transport.rs

  1use anyhow::{Context as _, Result, anyhow, bail};
  2#[cfg(any(test, feature = "test-support"))]
  3use async_pipe::{PipeReader, PipeWriter};
  4use dap_types::{
  5    ErrorResponse,
  6    messages::{Message, Response},
  7};
  8use futures::{AsyncRead, AsyncReadExt as _, AsyncWrite, FutureExt as _, channel::oneshot, select};
  9use gpui::{AppContext as _, AsyncApp, BackgroundExecutor, Task};
 10use parking_lot::Mutex;
 11use proto::ErrorExt;
 12use settings::Settings as _;
 13use smallvec::SmallVec;
 14use smol::{
 15    channel::{Receiver, Sender, unbounded},
 16    io::{AsyncBufReadExt as _, AsyncWriteExt, BufReader},
 17    net::{TcpListener, TcpStream},
 18};
 19use std::{
 20    collections::HashMap,
 21    net::{Ipv4Addr, SocketAddrV4},
 22    process::Stdio,
 23    sync::Arc,
 24    time::Duration,
 25};
 26use task::TcpArgumentsTemplate;
 27use util::ConnectionResult;
 28
 29use crate::{
 30    adapters::{DebugAdapterBinary, TcpArguments},
 31    client::DapMessageHandler,
 32    debugger_settings::DebuggerSettings,
 33};
 34
/// Text of a single logged message, as handed to an [`IoHandler`].
pub(crate) type IoMessage = str;
/// Name of the DAP command a logged message belongs to, when it has one.
pub(crate) type Command = str;
/// Callback invoked with the I/O kind, an optional command name, and the message text.
pub type IoHandler = Box<dyn Send + FnMut(IoKind, Option<&Command>, &IoMessage)>;
 38
 39#[derive(PartialEq, Eq, Clone, Copy)]
 40pub enum LogKind {
 41    Adapter,
 42    Rpc,
 43}
 44
 45#[derive(Clone, Copy)]
 46pub enum IoKind {
 47    StdIn,
 48    StdOut,
 49    StdErr,
 50}
 51
/// Response senders for in-flight requests, keyed by the request's sequence id.
type Requests = Arc<Mutex<HashMap<u64, oneshot::Sender<Result<Response>>>>>;
/// Shared list of registered log handlers, each tagged with the kind it listens to.
type LogHandlers = Arc<Mutex<SmallVec<[(LogKind, IoHandler); 2]>>>;
 54
/// A channel to a debug adapter.
///
/// Implementations in this file: [`TcpTransport`], [`StdioTransport`] and, in
/// test builds, `FakeTransport`.
pub trait Transport: Send + Sync {
    /// Whether this transport reports having adapter logs.
    fn has_adapter_logs(&self) -> bool;
    /// The TCP connection arguments in use, or `None` for non-TCP transports.
    fn tcp_arguments(&self) -> Option<TcpArguments>;
    /// Establishes the connection and yields the `(writer, reader)` pair used
    /// to send messages to and receive messages from the adapter.
    fn connect(
        &mut self,
    ) -> Task<
        Result<(
            Box<dyn AsyncWrite + Unpin + Send + 'static>,
            Box<dyn AsyncRead + Unpin + Send + 'static>,
        )>,
    >;
    /// Shuts the transport down (kills the adapter process for the
    /// process-backed implementations in this file).
    fn kill(&mut self);
    /// Returns this transport as a `FakeTransport`.
    ///
    /// The default implementation panics via `unreachable!()`; only the fake
    /// transport overrides it.
    #[cfg(any(test, feature = "test-support"))]
    fn as_fake(&self) -> &FakeTransport {
        unreachable!()
    }
}
 72
 73async fn start(
 74    binary: &DebugAdapterBinary,
 75    log_handlers: LogHandlers,
 76    cx: &mut AsyncApp,
 77) -> Result<Box<dyn Transport>> {
 78    #[cfg(any(test, feature = "test-support"))]
 79    if cfg!(any(test, feature = "test-support")) {
 80        return Ok(Box::new(FakeTransport::start(cx).await?));
 81    }
 82
 83    if binary.connection.is_some() {
 84        Ok(Box::new(
 85            TcpTransport::start(binary, log_handlers, cx).await?,
 86        ))
 87    } else {
 88        Ok(Box::new(
 89            StdioTransport::start(binary, log_handlers, cx).await?,
 90        ))
 91    }
 92}
 93
/// Owns a [`Transport`] together with the state needed to exchange DAP
/// messages over it.
pub(crate) struct TransportDelegate {
    /// Handlers registered through `add_log_handler`.
    log_handlers: LogHandlers,
    /// Senders awaiting a response, keyed by request sequence id.
    pub(crate) pending_requests: Requests,
    /// The underlying transport; killed when the delegate is dropped.
    pub(crate) transport: Mutex<Box<dyn Transport>>,
    /// Sending half of the outgoing-message channel; populated by `connect`.
    pub(crate) server_tx: smol::lock::Mutex<Option<Sender<Message>>>,
    /// Background tasks spawned by `connect`.
    tasks: Mutex<Vec<Task<()>>>,
}
101
102impl Drop for TransportDelegate {
103    fn drop(&mut self) {
104        self.transport.lock().kill()
105    }
106}
107
108impl TransportDelegate {
109    pub(crate) async fn start(binary: &DebugAdapterBinary, cx: &mut AsyncApp) -> Result<Self> {
110        let log_handlers: LogHandlers = Default::default();
111        let transport = start(binary, log_handlers.clone(), cx).await?;
112        Ok(Self {
113            transport: Mutex::new(transport),
114            log_handlers,
115            server_tx: Default::default(),
116            pending_requests: Default::default(),
117            tasks: Default::default(),
118        })
119    }
120
121    pub async fn connect(
122        &self,
123        message_handler: DapMessageHandler,
124        cx: &mut AsyncApp,
125    ) -> Result<()> {
126        let (server_tx, client_rx) = unbounded::<Message>();
127        self.tasks.lock().clear();
128
129        let log_dap_communications =
130            cx.update(|cx| DebuggerSettings::get_global(cx).log_dap_communications)
131                .with_context(|| "Failed to get Debugger Setting log dap communications error in transport::start_handlers. Defaulting to false")
132                .unwrap_or(false);
133
134        let connect = self.transport.lock().connect();
135        let (input, output) = connect.await?;
136
137        let log_handler = if log_dap_communications {
138            Some(self.log_handlers.clone())
139        } else {
140            None
141        };
142
143        let pending_requests = self.pending_requests.clone();
144        let output_log_handler = log_handler.clone();
145        {
146            let mut tasks = self.tasks.lock();
147            tasks.push(cx.background_spawn(async move {
148                match Self::recv_from_server(
149                    output,
150                    message_handler,
151                    pending_requests.clone(),
152                    output_log_handler,
153                )
154                .await
155                {
156                    Ok(()) => {
157                        pending_requests.lock().drain().for_each(|(_, request)| {
158                            request
159                                .send(Err(anyhow!("debugger shutdown unexpectedly")))
160                                .ok();
161                        });
162                    }
163                    Err(e) => {
164                        pending_requests.lock().drain().for_each(|(_, request)| {
165                            request.send(Err(e.cloned())).ok();
166                        });
167                    }
168                }
169            }));
170
171            tasks.push(cx.background_spawn(async move {
172                match Self::send_to_server(input, client_rx, log_handler).await {
173                    Ok(()) => {}
174                    Err(e) => log::error!("Error handling debugger input: {e}"),
175                }
176            }));
177        }
178
179        {
180            let mut lock = self.server_tx.lock().await;
181            *lock = Some(server_tx.clone());
182        }
183
184        Ok(())
185    }
186
187    pub(crate) fn tcp_arguments(&self) -> Option<TcpArguments> {
188        self.transport.lock().tcp_arguments()
189    }
190
191    pub(crate) fn add_pending_request(
192        &self,
193        sequence_id: u64,
194        request: oneshot::Sender<Result<Response>>,
195    ) {
196        let mut pending_requests = self.pending_requests.lock();
197        pending_requests.insert(sequence_id, request);
198    }
199
200    pub(crate) async fn send_message(&self, message: Message) -> Result<()> {
201        if let Some(server_tx) = self.server_tx.lock().await.as_ref() {
202            server_tx.send(message).await.context("sending message")
203        } else {
204            anyhow::bail!("Server tx already dropped")
205        }
206    }
207
208    async fn handle_adapter_log(
209        stdout: impl AsyncRead + Unpin + Send + 'static,
210        iokind: IoKind,
211        log_handlers: LogHandlers,
212    ) {
213        let mut reader = BufReader::new(stdout);
214        let mut line = String::new();
215
216        loop {
217            line.truncate(0);
218
219            match reader.read_line(&mut line).await {
220                Ok(0) => break,
221                Ok(_) => {}
222                Err(e) => {
223                    log::debug!("handle_adapter_log: {}", e);
224                    break;
225                }
226            }
227
228            for (kind, handler) in log_handlers.lock().iter_mut() {
229                if matches!(kind, LogKind::Adapter) {
230                    handler(iokind, None, line.as_str());
231                }
232            }
233        }
234    }
235
236    fn build_rpc_message(message: String) -> String {
237        format!("Content-Length: {}\r\n\r\n{}", message.len(), message)
238    }
239
240    async fn send_to_server<Stdin>(
241        mut server_stdin: Stdin,
242        client_rx: Receiver<Message>,
243        log_handlers: Option<LogHandlers>,
244    ) -> Result<()>
245    where
246        Stdin: AsyncWrite + Unpin + Send + 'static,
247    {
248        let result = loop {
249            match client_rx.recv().await {
250                Ok(message) => {
251                    let command = match &message {
252                        Message::Request(request) => Some(request.command.as_str()),
253                        Message::Response(response) => Some(response.command.as_str()),
254                        _ => None,
255                    };
256
257                    let message = match serde_json::to_string(&message) {
258                        Ok(message) => message,
259                        Err(e) => break Err(e.into()),
260                    };
261
262                    if let Some(log_handlers) = log_handlers.as_ref() {
263                        for (kind, log_handler) in log_handlers.lock().iter_mut() {
264                            if matches!(kind, LogKind::Rpc) {
265                                log_handler(IoKind::StdIn, command, &message);
266                            }
267                        }
268                    }
269
270                    if let Err(e) = server_stdin
271                        .write_all(Self::build_rpc_message(message).as_bytes())
272                        .await
273                    {
274                        break Err(e.into());
275                    }
276
277                    if let Err(e) = server_stdin.flush().await {
278                        break Err(e.into());
279                    }
280                }
281                Err(error) => break Err(error.into()),
282            }
283        };
284
285        log::debug!("Handle adapter input dropped");
286
287        result
288    }
289
290    async fn recv_from_server<Stdout>(
291        server_stdout: Stdout,
292        mut message_handler: DapMessageHandler,
293        pending_requests: Requests,
294        log_handlers: Option<LogHandlers>,
295    ) -> Result<()>
296    where
297        Stdout: AsyncRead + Unpin + Send + 'static,
298    {
299        let mut recv_buffer = String::new();
300        let mut reader = BufReader::new(server_stdout);
301
302        let result = loop {
303            match Self::receive_server_message(&mut reader, &mut recv_buffer, log_handlers.as_ref())
304                .await
305            {
306                ConnectionResult::Timeout => anyhow::bail!("Timed out when connecting to debugger"),
307                ConnectionResult::ConnectionReset => {
308                    log::info!("Debugger closed the connection");
309                    return Ok(());
310                }
311                ConnectionResult::Result(Ok(Message::Response(res))) => {
312                    let tx = pending_requests.lock().remove(&res.request_seq);
313                    if let Some(tx) = tx {
314                        if let Err(e) = tx.send(Self::process_response(res)) {
315                            log::trace!("Did not send response `{:?}` for a cancelled", e);
316                        }
317                    } else {
318                        message_handler(Message::Response(res))
319                    }
320                }
321                ConnectionResult::Result(Ok(message)) => message_handler(message),
322                ConnectionResult::Result(Err(e)) => break Err(e),
323            }
324        };
325
326        log::debug!("Handle adapter output dropped");
327
328        result
329    }
330
331    fn process_response(response: Response) -> Result<Response> {
332        if response.success {
333            Ok(response)
334        } else {
335            if let Some(error_message) = response
336                .body
337                .clone()
338                .and_then(|body| serde_json::from_value::<ErrorResponse>(body).ok())
339                .and_then(|response| response.error.map(|msg| msg.format))
340                .or_else(|| response.message.clone())
341            {
342                anyhow::bail!(error_message);
343            };
344
345            anyhow::bail!(
346                "Received error response from adapter. Response: {:?}",
347                response
348            );
349        }
350    }
351
352    async fn receive_server_message<Stdout>(
353        reader: &mut BufReader<Stdout>,
354        buffer: &mut String,
355        log_handlers: Option<&LogHandlers>,
356    ) -> ConnectionResult<Message>
357    where
358        Stdout: AsyncRead + Unpin + Send + 'static,
359    {
360        let mut content_length = None;
361        loop {
362            buffer.truncate(0);
363            match reader.read_line(buffer).await {
364                Ok(0) => return ConnectionResult::ConnectionReset,
365                Ok(_) => {}
366                Err(e) => return ConnectionResult::Result(Err(e.into())),
367            };
368
369            if buffer == "\r\n" {
370                break;
371            }
372
373            if let Some(("Content-Length", value)) = buffer.trim().split_once(": ") {
374                match value.parse().context("invalid content length") {
375                    Ok(length) => content_length = Some(length),
376                    Err(e) => return ConnectionResult::Result(Err(e)),
377                }
378            }
379        }
380
381        let content_length = match content_length.context("missing content length") {
382            Ok(length) => length,
383            Err(e) => return ConnectionResult::Result(Err(e)),
384        };
385
386        let mut content = vec![0; content_length];
387        if let Err(e) = reader
388            .read_exact(&mut content)
389            .await
390            .with_context(|| "reading after a loop")
391        {
392            return ConnectionResult::Result(Err(e));
393        }
394
395        let message_str = match std::str::from_utf8(&content).context("invalid utf8 from server") {
396            Ok(str) => str,
397            Err(e) => return ConnectionResult::Result(Err(e)),
398        };
399
400        let message =
401            serde_json::from_str::<Message>(message_str).context("deserializing server message");
402
403        if let Some(log_handlers) = log_handlers {
404            let command = match &message {
405                Ok(Message::Request(request)) => Some(request.command.as_str()),
406                Ok(Message::Response(response)) => Some(response.command.as_str()),
407                _ => None,
408            };
409
410            for (kind, log_handler) in log_handlers.lock().iter_mut() {
411                if matches!(kind, LogKind::Rpc) {
412                    log_handler(IoKind::StdOut, command, message_str);
413                }
414            }
415        }
416
417        ConnectionResult::Result(message)
418    }
419
420    pub fn has_adapter_logs(&self) -> bool {
421        self.transport.lock().has_adapter_logs()
422    }
423
424    pub fn add_log_handler<F>(&self, f: F, kind: LogKind)
425    where
426        F: 'static + Send + FnMut(IoKind, Option<&Command>, &IoMessage),
427    {
428        let mut log_handlers = self.log_handlers.lock();
429        log_handlers.push((kind, Box::new(f)));
430    }
431}
432
/// Transport that talks to a debug adapter over a TCP socket, optionally
/// spawning the adapter process itself.
pub struct TcpTransport {
    /// Executor used to run the connection attempt and its timers.
    executor: BackgroundExecutor,
    /// Port the adapter is expected to listen on.
    pub port: u16,
    /// Host the adapter is expected to listen on.
    pub host: Ipv4Addr,
    /// Connection timeout in milliseconds.
    pub timeout: u64,
    /// The spawned adapter process, if this transport started one.
    process: Arc<Mutex<Option<Child>>>,
    /// Task forwarding the adapter's stderr to the log handlers.
    _stderr_task: Option<Task<()>>,
    /// Task forwarding the adapter's stdout to the log handlers.
    _stdout_task: Option<Task<()>>,
}
442
443impl TcpTransport {
444    /// Get an open port to use with the tcp client when not supplied by debug config
445    pub async fn port(host: &TcpArgumentsTemplate) -> Result<u16> {
446        if let Some(port) = host.port {
447            Ok(port)
448        } else {
449            Self::unused_port(host.host()).await
450        }
451    }
452
453    pub async fn unused_port(host: Ipv4Addr) -> Result<u16> {
454        Ok(TcpListener::bind(SocketAddrV4::new(host, 0))
455            .await?
456            .local_addr()?
457            .port())
458    }
459
460    async fn start(
461        binary: &DebugAdapterBinary,
462        log_handlers: LogHandlers,
463        cx: &mut AsyncApp,
464    ) -> Result<Self> {
465        let connection_args = binary
466            .connection
467            .as_ref()
468            .context("No connection arguments provided")?;
469
470        let host = connection_args.host;
471        let port = connection_args.port;
472
473        let mut process = None;
474        let mut stdout_task = None;
475        let mut stderr_task = None;
476
477        if let Some(command) = &binary.command {
478            let mut command = util::command::new_std_command(&command);
479
480            if let Some(cwd) = &binary.cwd {
481                command.current_dir(cwd);
482            }
483
484            command.args(&binary.arguments);
485            command.envs(&binary.envs);
486
487            let mut p = Child::spawn(command, Stdio::null())
488                .with_context(|| "failed to start debug adapter.")?;
489
490            stdout_task = p.stdout.take().map(|stdout| {
491                cx.background_executor()
492                    .spawn(TransportDelegate::handle_adapter_log(
493                        stdout,
494                        IoKind::StdOut,
495                        log_handlers.clone(),
496                    ))
497            });
498            stderr_task = p.stderr.take().map(|stderr| {
499                cx.background_executor()
500                    .spawn(TransportDelegate::handle_adapter_log(
501                        stderr,
502                        IoKind::StdErr,
503                        log_handlers,
504                    ))
505            });
506            process = Some(p);
507        };
508
509        let timeout = connection_args.timeout.unwrap_or_else(|| {
510            cx.update(|cx| DebuggerSettings::get_global(cx).timeout)
511                .unwrap_or(20000u64)
512        });
513
514        log::info!(
515            "Debug adapter has connected to TCP server {}:{}",
516            host,
517            port
518        );
519
520        let this = Self {
521            executor: cx.background_executor().clone(),
522            port,
523            host,
524            process: Arc::new(Mutex::new(process)),
525            timeout,
526            _stdout_task: stdout_task,
527            _stderr_task: stderr_task,
528        };
529
530        Ok(this)
531    }
532}
533
534impl Transport for TcpTransport {
535    fn has_adapter_logs(&self) -> bool {
536        true
537    }
538
539    fn kill(&mut self) {
540        if let Some(process) = &mut *self.process.lock() {
541            process.kill();
542        }
543    }
544
545    fn tcp_arguments(&self) -> Option<TcpArguments> {
546        Some(TcpArguments {
547            host: self.host,
548            port: self.port,
549            timeout: Some(self.timeout),
550        })
551    }
552
553    fn connect(
554        &mut self,
555    ) -> Task<
556        Result<(
557            Box<dyn AsyncWrite + Unpin + Send + 'static>,
558            Box<dyn AsyncRead + Unpin + Send + 'static>,
559        )>,
560    > {
561        let executor = self.executor.clone();
562        let timeout = self.timeout;
563        let address = SocketAddrV4::new(self.host, self.port);
564        let process = self.process.clone();
565        executor.clone().spawn(async move {
566            select! {
567                _ = executor.timer(Duration::from_millis(timeout)).fuse() => {
568                    anyhow::bail!("Connection to TCP DAP timeout {address}");
569                },
570                result = executor.clone().spawn(async move {
571                    loop {
572                        match TcpStream::connect(address).await {
573                            Ok(stream) => {
574                                let (read, write) = stream.split();
575                                return Ok((Box::new(write) as _, Box::new(read) as _))
576                            },
577                            Err(_) => {
578                                let has_process = process.lock().is_some();
579                                if has_process {
580                                    let status = process.lock().as_mut().unwrap().try_status();
581                                    if let Ok(Some(_)) = status {
582                                        let process = process.lock().take().unwrap().into_inner();
583                                        let output = process.output().await?;
584                                        let output = if output.stderr.is_empty() {
585                                            String::from_utf8_lossy(&output.stdout).to_string()
586                                        } else {
587                                            String::from_utf8_lossy(&output.stderr).to_string()
588                                        };
589                                        anyhow::bail!("{output}\nerror: process exited before debugger attached.");
590                                    }
591                                }
592
593                                executor.timer(Duration::from_millis(100)).await;
594                            }
595                        }
596                    }
597                }).fuse() => result
598            }
599        })
600    }
601}
602
603impl Drop for TcpTransport {
604    fn drop(&mut self) {
605        if let Some(mut p) = self.process.lock().take() {
606            p.kill()
607        }
608    }
609}
610
/// Transport that talks to a spawned debug adapter over its stdin/stdout.
pub struct StdioTransport {
    /// The spawned adapter process.
    process: Mutex<Option<Child>>,
    /// Task forwarding the adapter's stderr to the log handlers.
    _stderr_task: Option<Task<()>>,
}
615
616impl StdioTransport {
617    // #[allow(dead_code, reason = "This is used in non test builds of Zed")]
618    async fn start(
619        binary: &DebugAdapterBinary,
620        log_handlers: LogHandlers,
621        cx: &mut AsyncApp,
622    ) -> Result<Self> {
623        let Some(binary_command) = &binary.command else {
624            bail!(
625                "When using the `stdio` transport, the path to a debug adapter binary must be set by Zed."
626            );
627        };
628        let mut command = util::command::new_std_command(&binary_command);
629
630        if let Some(cwd) = &binary.cwd {
631            command.current_dir(cwd);
632        }
633
634        command.args(&binary.arguments);
635        command.envs(&binary.envs);
636
637        let mut process = Child::spawn(command, Stdio::piped()).with_context(|| {
638            format!(
639                "failed to spawn command `{} {}`.",
640                binary_command,
641                binary.arguments.join(" ")
642            )
643        })?;
644
645        let err_task = process.stderr.take().map(|stderr| {
646            cx.background_spawn(TransportDelegate::handle_adapter_log(
647                stderr,
648                IoKind::StdErr,
649                log_handlers,
650            ))
651        });
652
653        let process = Mutex::new(Some(process));
654
655        Ok(Self {
656            process,
657            _stderr_task: err_task,
658        })
659    }
660}
661
662impl Transport for StdioTransport {
663    fn has_adapter_logs(&self) -> bool {
664        false
665    }
666
667    fn kill(&mut self) {
668        if let Some(process) = &mut *self.process.lock() {
669            process.kill();
670        }
671    }
672
673    fn connect(
674        &mut self,
675    ) -> Task<
676        Result<(
677            Box<dyn AsyncWrite + Unpin + Send + 'static>,
678            Box<dyn AsyncRead + Unpin + Send + 'static>,
679        )>,
680    > {
681        let result = util::maybe!({
682            let mut guard = self.process.lock();
683            let process = guard.as_mut().context("oops")?;
684            Ok((
685                Box::new(process.stdin.take().context("Cannot reconnect")?) as _,
686                Box::new(process.stdout.take().context("Cannot reconnect")?) as _,
687            ))
688        });
689        Task::ready(result)
690    }
691
692    fn tcp_arguments(&self) -> Option<TcpArguments> {
693        None
694    }
695}
696
697impl Drop for StdioTransport {
698    fn drop(&mut self) {
699        if let Some(process) = &mut *self.process.lock() {
700            process.kill();
701        }
702    }
703}
704
/// Fake-adapter handler for a request: receives the request's sequence number
/// and raw JSON arguments and produces the response to send back.
#[cfg(any(test, feature = "test-support"))]
type RequestHandler =
    Box<dyn Send + FnMut(u64, serde_json::Value) -> dap_types::messages::Response>;

/// Fake-adapter handler invoked with a response the client sent to the adapter.
#[cfg(any(test, feature = "test-support"))]
type ResponseHandler = Box<dyn Send + Fn(Response)>;
711
/// In-memory transport used in tests; it plays the role of the debug adapter.
#[cfg(any(test, feature = "test-support"))]
pub struct FakeTransport {
    // for sending fake response back from adapter side
    request_handlers: Arc<Mutex<HashMap<&'static str, RequestHandler>>>,
    // for reverse request responses
    response_handlers: Arc<Mutex<HashMap<&'static str, ResponseHandler>>>,

    // Pipe ends handed out by `connect`; `None` once they have been taken.
    stdin_writer: Option<PipeWriter>,
    stdout_reader: Option<PipeReader>,
    // Background task that processes messages written to the fake adapter.
    message_handler: Option<Task<Result<()>>>,
}
723
724#[cfg(any(test, feature = "test-support"))]
725impl FakeTransport {
726    pub fn on_request<R: dap_types::requests::Request, F>(&self, mut handler: F)
727    where
728        F: 'static + Send + FnMut(u64, R::Arguments) -> Result<R::Response, ErrorResponse>,
729    {
730        self.request_handlers.lock().insert(
731            R::COMMAND,
732            Box::new(move |seq, args| {
733                let result = handler(seq, serde_json::from_value(args).unwrap());
734                let response = match result {
735                    Ok(response) => Response {
736                        seq: seq + 1,
737                        request_seq: seq,
738                        success: true,
739                        command: R::COMMAND.into(),
740                        body: Some(serde_json::to_value(response).unwrap()),
741                        message: None,
742                    },
743                    Err(response) => Response {
744                        seq: seq + 1,
745                        request_seq: seq,
746                        success: false,
747                        command: R::COMMAND.into(),
748                        body: Some(serde_json::to_value(response).unwrap()),
749                        message: None,
750                    },
751                };
752                response
753            }),
754        );
755    }
756
757    pub fn on_response<R: dap_types::requests::Request, F>(&self, handler: F)
758    where
759        F: 'static + Send + Fn(Response),
760    {
761        self.response_handlers
762            .lock()
763            .insert(R::COMMAND, Box::new(handler));
764    }
765
766    async fn start(cx: &mut AsyncApp) -> Result<Self> {
767        use dap_types::requests::{Request, RunInTerminal, StartDebugging};
768        use serde_json::json;
769
770        let (stdin_writer, stdin_reader) = async_pipe::pipe();
771        let (stdout_writer, stdout_reader) = async_pipe::pipe();
772
773        let mut this = Self {
774            request_handlers: Arc::new(Mutex::new(HashMap::default())),
775            response_handlers: Arc::new(Mutex::new(HashMap::default())),
776            stdin_writer: Some(stdin_writer),
777            stdout_reader: Some(stdout_reader),
778            message_handler: None,
779        };
780
781        let request_handlers = this.request_handlers.clone();
782        let response_handlers = this.response_handlers.clone();
783        let stdout_writer = Arc::new(smol::lock::Mutex::new(stdout_writer));
784
785        this.message_handler = Some(cx.background_spawn(async move {
786            let mut reader = BufReader::new(stdin_reader);
787            let mut buffer = String::new();
788
789            loop {
790                match TransportDelegate::receive_server_message(&mut reader, &mut buffer, None)
791                    .await
792                {
793                    ConnectionResult::Timeout => {
794                        anyhow::bail!("Timed out when connecting to debugger");
795                    }
796                    ConnectionResult::ConnectionReset => {
797                        log::info!("Debugger closed the connection");
798                        break Ok(());
799                    }
800                    ConnectionResult::Result(Err(e)) => break Err(e),
801                    ConnectionResult::Result(Ok(message)) => {
802                        match message {
803                            Message::Request(request) => {
804                                // redirect reverse requests to stdout writer/reader
805                                if request.command == RunInTerminal::COMMAND
806                                    || request.command == StartDebugging::COMMAND
807                                {
808                                    let message =
809                                        serde_json::to_string(&Message::Request(request)).unwrap();
810
811                                    let mut writer = stdout_writer.lock().await;
812                                    writer
813                                        .write_all(
814                                            TransportDelegate::build_rpc_message(message)
815                                                .as_bytes(),
816                                        )
817                                        .await
818                                        .unwrap();
819                                    writer.flush().await.unwrap();
820                                } else {
821                                    let response = if let Some(handle) =
822                                        request_handlers.lock().get_mut(request.command.as_str())
823                                    {
824                                        handle(request.seq, request.arguments.unwrap_or(json!({})))
825                                    } else {
826                                        panic!("No request handler for {}", request.command);
827                                    };
828                                    let message =
829                                        serde_json::to_string(&Message::Response(response))
830                                            .unwrap();
831
832                                    let mut writer = stdout_writer.lock().await;
833                                    writer
834                                        .write_all(
835                                            TransportDelegate::build_rpc_message(message)
836                                                .as_bytes(),
837                                        )
838                                        .await
839                                        .unwrap();
840                                    writer.flush().await.unwrap();
841                                }
842                            }
843                            Message::Event(event) => {
844                                let message =
845                                    serde_json::to_string(&Message::Event(event)).unwrap();
846
847                                let mut writer = stdout_writer.lock().await;
848                                writer
849                                    .write_all(
850                                        TransportDelegate::build_rpc_message(message).as_bytes(),
851                                    )
852                                    .await
853                                    .unwrap();
854                                writer.flush().await.unwrap();
855                            }
856                            Message::Response(response) => {
857                                if let Some(handle) =
858                                    response_handlers.lock().get(response.command.as_str())
859                                {
860                                    handle(response);
861                                } else {
862                                    log::error!("No response handler for {}", response.command);
863                                }
864                            }
865                        }
866                    }
867                }
868            }
869        }));
870
871        Ok(this)
872    }
873}
874
#[cfg(any(test, feature = "test-support"))]
impl Transport for FakeTransport {
    /// The fake transport has no TCP arguments.
    fn tcp_arguments(&self) -> Option<TcpArguments> {
        None
    }

    /// Hands out the in-memory pipe ends; fails with "Cannot reconnect" once
    /// they have already been taken.
    fn connect(
        &mut self,
    ) -> Task<
        Result<(
            Box<dyn AsyncWrite + Unpin + Send + 'static>,
            Box<dyn AsyncRead + Unpin + Send + 'static>,
        )>,
    > {
        let io = self
            .stdin_writer
            .take()
            .context("Cannot reconnect")
            .and_then(|writer| {
                let reader = self.stdout_reader.take().context("Cannot reconnect")?;
                Ok((Box::new(writer) as _, Box::new(reader) as _))
            });
        Task::ready(io)
    }

    /// Always `false` for the fake transport.
    fn has_adapter_logs(&self) -> bool {
        false
    }

    /// Drops the background message-handling task.
    fn kill(&mut self) {
        self.message_handler = None;
    }

    #[cfg(any(test, feature = "test-support"))]
    fn as_fake(&self) -> &FakeTransport {
        self
    }
}
911
/// Wrapper around `smol::process::Child` that supplies this file's own
/// platform-specific `spawn` and `kill`.
struct Child {
    process: smol::process::Child,
}
915
/// Gives read access to the wrapped `smol::process::Child`.
impl std::ops::Deref for Child {
    type Target = smol::process::Child;

    fn deref(&self) -> &Self::Target {
        &self.process
    }
}
923
/// Gives mutable access to the wrapped `smol::process::Child`
/// (e.g. for taking its stdio handles).
impl std::ops::DerefMut for Child {
    fn deref_mut(&mut self) -> &mut Self::Target {
        &mut self.process
    }
}
929
930impl Child {
931    fn into_inner(self) -> smol::process::Child {
932        self.process
933    }
934
935    #[cfg(not(windows))]
936    fn spawn(mut command: std::process::Command, stdin: Stdio) -> Result<Self> {
937        util::set_pre_exec_to_start_new_session(&mut command);
938        let process = smol::process::Command::from(command)
939            .stdin(stdin)
940            .stdout(Stdio::piped())
941            .stderr(Stdio::piped())
942            .spawn()?;
943        Ok(Self { process })
944    }
945
946    #[cfg(windows)]
947    fn spawn(command: std::process::Command, stdin: Stdio) -> Result<Self> {
948        // TODO(windows): create a job object and add the child process handle to it,
949        // see https://learn.microsoft.com/en-us/windows/win32/procthread/job-objects
950        let process = smol::process::Command::from(command)
951            .stdin(stdin)
952            .stdout(Stdio::piped())
953            .stderr(Stdio::piped())
954            .spawn()?;
955        Ok(Self { process })
956    }
957
958    #[cfg(not(windows))]
959    fn kill(&mut self) {
960        let pid = self.process.id();
961        unsafe {
962            libc::killpg(pid as i32, libc::SIGKILL);
963        }
964    }
965
966    #[cfg(windows)]
967    fn kill(&mut self) {
968        // TODO(windows): terminate the job object in kill
969        let _ = self.process.kill();
970    }
971}