Do not error on debugger server connection close (#31795)

Kirill Bulatov created

Start to show islands of logs in a successful debugger runs for Rust:

<img width="1728" alt="1"
src="https://github.com/user-attachments/assets/7400201b-b900-4b20-8adf-21850ae59671"
/>

<img width="1728" alt="2"
src="https://github.com/user-attachments/assets/e75cc5f1-1f74-41d6-b7aa-697a4b2f055b"
/>

Release Notes:

- N/A

Change summary

crates/dap/src/transport.rs | 334 +++++++++++++++++++++-----------------
1 file changed, 188 insertions(+), 146 deletions(-)

Detailed changes

crates/dap/src/transport.rs 🔗

@@ -4,7 +4,7 @@ use dap_types::{
     messages::{Message, Response},
 };
 use futures::{AsyncRead, AsyncReadExt as _, AsyncWrite, FutureExt as _, channel::oneshot, select};
-use gpui::AsyncApp;
+use gpui::{AppContext as _, AsyncApp, Task};
 use settings::Settings as _;
 use smallvec::SmallVec;
 use smol::{
@@ -22,7 +22,7 @@ use std::{
     time::Duration,
 };
 use task::TcpArgumentsTemplate;
-use util::{ResultExt as _, TryFutureExt};
+use util::{ConnectionResult, ResultExt as _};
 
 use crate::{adapters::DebugAdapterBinary, debugger_settings::DebuggerSettings};
 
@@ -126,7 +126,7 @@ pub(crate) struct TransportDelegate {
     pending_requests: Requests,
     transport: Transport,
     server_tx: Arc<Mutex<Option<Sender<Message>>>>,
-    _tasks: Vec<gpui::Task<Option<()>>>,
+    _tasks: Vec<Task<()>>,
 }
 
 impl TransportDelegate {
@@ -141,7 +141,7 @@ impl TransportDelegate {
             log_handlers: Default::default(),
             current_requests: Default::default(),
             pending_requests: Default::default(),
-            _tasks: Default::default(),
+            _tasks: Vec::new(),
         };
         let messages = this.start_handlers(transport_pipes, cx).await?;
         Ok((messages, this))
@@ -166,45 +166,76 @@ impl TransportDelegate {
             None
         };
 
+        let adapter_log_handler = log_handler.clone();
         cx.update(|cx| {
             if let Some(stdout) = params.stdout.take() {
-                self._tasks.push(
-                    cx.background_executor()
-                        .spawn(Self::handle_adapter_log(stdout, log_handler.clone()).log_err()),
-                );
+                self._tasks.push(cx.background_spawn(async move {
+                    match Self::handle_adapter_log(stdout, adapter_log_handler).await {
+                        ConnectionResult::Timeout => {
+                            log::error!("Timed out when handling debugger log");
+                        }
+                        ConnectionResult::ConnectionReset => {
+                            log::info!("Debugger logs connection closed");
+                        }
+                        ConnectionResult::Result(Ok(())) => {}
+                        ConnectionResult::Result(Err(e)) => {
+                            log::error!("Error handling debugger log: {e}");
+                        }
+                    }
+                }));
             }
 
-            self._tasks.push(
-                cx.background_executor().spawn(
-                    Self::handle_output(
-                        params.output,
-                        client_tx,
-                        self.pending_requests.clone(),
-                        log_handler.clone(),
-                    )
-                    .log_err(),
-                ),
-            );
+            let pending_requests = self.pending_requests.clone();
+            let output_log_handler = log_handler.clone();
+            self._tasks.push(cx.background_spawn(async move {
+                match Self::handle_output(
+                    params.output,
+                    client_tx,
+                    pending_requests,
+                    output_log_handler,
+                )
+                .await
+                {
+                    Ok(()) => {}
+                    Err(e) => log::error!("Error handling debugger output: {e}"),
+                }
+            }));
 
             if let Some(stderr) = params.stderr.take() {
-                self._tasks.push(
-                    cx.background_executor()
-                        .spawn(Self::handle_error(stderr, self.log_handlers.clone()).log_err()),
-                );
+                let log_handlers = self.log_handlers.clone();
+                self._tasks.push(cx.background_spawn(async move {
+                    match Self::handle_error(stderr, log_handlers).await {
+                        ConnectionResult::Timeout => {
+                            log::error!("Timed out reading debugger error stream")
+                        }
+                        ConnectionResult::ConnectionReset => {
+                            log::info!("Debugger closed its error stream")
+                        }
+                        ConnectionResult::Result(Ok(())) => {}
+                        ConnectionResult::Result(Err(e)) => {
+                            log::error!("Error handling debugger error: {e}")
+                        }
+                    }
+                }));
             }
 
-            self._tasks.push(
-                cx.background_executor().spawn(
-                    Self::handle_input(
-                        params.input,
-                        client_rx,
-                        self.current_requests.clone(),
-                        self.pending_requests.clone(),
-                        log_handler.clone(),
-                    )
-                    .log_err(),
-                ),
-            );
+            let current_requests = self.current_requests.clone();
+            let pending_requests = self.pending_requests.clone();
+            let log_handler = log_handler.clone();
+            self._tasks.push(cx.background_spawn(async move {
+                match Self::handle_input(
+                    params.input,
+                    client_rx,
+                    current_requests,
+                    pending_requests,
+                    log_handler,
+                )
+                .await
+                {
+                    Ok(()) => {}
+                    Err(e) => log::error!("Error handling debugger input: {e}"),
+                }
+            }));
         })?;
 
         {
@@ -235,7 +266,7 @@ impl TransportDelegate {
     async fn handle_adapter_log<Stdout>(
         stdout: Stdout,
         log_handlers: Option<LogHandlers>,
-    ) -> Result<()>
+    ) -> ConnectionResult<()>
     where
         Stdout: AsyncRead + Unpin + Send + 'static,
     {
@@ -245,13 +276,14 @@ impl TransportDelegate {
         let result = loop {
             line.truncate(0);
 
-            let bytes_read = match reader.read_line(&mut line).await {
-                Ok(bytes_read) => bytes_read,
-                Err(e) => break Err(e.into()),
-            };
-
-            if bytes_read == 0 {
-                anyhow::bail!("Debugger log stream closed");
+            match reader
+                .read_line(&mut line)
+                .await
+                .context("reading adapter log line")
+            {
+                Ok(0) => break ConnectionResult::ConnectionReset,
+                Ok(_) => {}
+                Err(e) => break ConnectionResult::Result(Err(e)),
             }
 
             if let Some(log_handlers) = log_handlers.as_ref() {
@@ -337,35 +369,35 @@ impl TransportDelegate {
         let mut reader = BufReader::new(server_stdout);
 
         let result = loop {
-            let message =
-                Self::receive_server_message(&mut reader, &mut recv_buffer, log_handlers.as_ref())
-                    .await;
-
-            match message {
-                Ok(Message::Response(res)) => {
+            match Self::receive_server_message(&mut reader, &mut recv_buffer, log_handlers.as_ref())
+                .await
+            {
+                ConnectionResult::Timeout => anyhow::bail!("Timed out when connecting to debugger"),
+                ConnectionResult::ConnectionReset => {
+                    log::info!("Debugger closed the connection");
+                    return Ok(());
+                }
+                ConnectionResult::Result(Ok(Message::Response(res))) => {
                     if let Some(tx) = pending_requests.lock().await.remove(&res.request_seq) {
                         if let Err(e) = tx.send(Self::process_response(res)) {
                             log::trace!("Did not send response `{:?}` for a cancelled", e);
                         }
                     } else {
                         client_tx.send(Message::Response(res)).await?;
-                    };
-                }
-                Ok(message) => {
-                    client_tx.send(message).await?;
+                    }
                 }
-                Err(e) => break Err(e),
+                ConnectionResult::Result(Ok(message)) => client_tx.send(message).await?,
+                ConnectionResult::Result(Err(e)) => break Err(e),
             }
         };
 
         drop(client_tx);
-
         log::debug!("Handle adapter output dropped");
 
         result
     }
 
-    async fn handle_error<Stderr>(stderr: Stderr, log_handlers: LogHandlers) -> Result<()>
+    async fn handle_error<Stderr>(stderr: Stderr, log_handlers: LogHandlers) -> ConnectionResult<()>
     where
         Stderr: AsyncRead + Unpin + Send + 'static,
     {
@@ -375,8 +407,12 @@ impl TransportDelegate {
         let mut reader = BufReader::new(stderr);
 
         let result = loop {
-            match reader.read_line(&mut buffer).await {
-                Ok(0) => anyhow::bail!("debugger error stream closed"),
+            match reader
+                .read_line(&mut buffer)
+                .await
+                .context("reading error log line")
+            {
+                Ok(0) => break ConnectionResult::ConnectionReset,
                 Ok(_) => {
                     for (kind, log_handler) in log_handlers.lock().iter_mut() {
                         if matches!(kind, LogKind::Adapter) {
@@ -386,7 +422,7 @@ impl TransportDelegate {
 
                     buffer.truncate(0);
                 }
-                Err(error) => break Err(error.into()),
+                Err(error) => break ConnectionResult::Result(Err(error)),
             }
         };
 
@@ -420,7 +456,7 @@ impl TransportDelegate {
         reader: &mut BufReader<Stdout>,
         buffer: &mut String,
         log_handlers: Option<&LogHandlers>,
-    ) -> Result<Message>
+    ) -> ConnectionResult<Message>
     where
         Stdout: AsyncRead + Unpin + Send + 'static,
     {
@@ -428,48 +464,58 @@ impl TransportDelegate {
         loop {
             buffer.truncate(0);
 
-            if reader
+            match reader
                 .read_line(buffer)
                 .await
-                .with_context(|| "reading a message from server")?
-                == 0
+                .with_context(|| "reading a message from server")
             {
-                anyhow::bail!("debugger reader stream closed, last string output: '{buffer}'");
+                Ok(0) => return ConnectionResult::ConnectionReset,
+                Ok(_) => {}
+                Err(e) => return ConnectionResult::Result(Err(e)),
             };
 
             if buffer == "\r\n" {
                 break;
             }
 
-            let parts = buffer.trim().split_once(": ");
-
-            match parts {
-                Some(("Content-Length", value)) => {
-                    content_length = Some(value.parse().context("invalid content length")?);
+            if let Some(("Content-Length", value)) = buffer.trim().split_once(": ") {
+                match value.parse().context("invalid content length") {
+                    Ok(length) => content_length = Some(length),
+                    Err(e) => return ConnectionResult::Result(Err(e)),
                 }
-                _ => {}
             }
         }
 
-        let content_length = content_length.context("missing content length")?;
+        let content_length = match content_length.context("missing content length") {
+            Ok(length) => length,
+            Err(e) => return ConnectionResult::Result(Err(e)),
+        };
 
         let mut content = vec![0; content_length];
-        reader
+        if let Err(e) = reader
             .read_exact(&mut content)
             .await
-            .with_context(|| "reading after a loop")?;
+            .with_context(|| "reading after a loop")
+        {
+            return ConnectionResult::Result(Err(e));
+        }
 
-        let message = std::str::from_utf8(&content).context("invalid utf8 from server")?;
+        let message_str = match std::str::from_utf8(&content).context("invalid utf8 from server") {
+            Ok(str) => str,
+            Err(e) => return ConnectionResult::Result(Err(e)),
+        };
 
         if let Some(log_handlers) = log_handlers {
             for (kind, log_handler) in log_handlers.lock().iter_mut() {
                 if matches!(kind, LogKind::Rpc) {
-                    log_handler(IoKind::StdOut, &message);
+                    log_handler(IoKind::StdOut, message_str);
                 }
             }
         }
 
-        Ok(serde_json::from_str::<Message>(message)?)
+        ConnectionResult::Result(
+            serde_json::from_str::<Message>(message_str).context("deserializing server message"),
+        )
     }
 
     pub async fn shutdown(&self) -> Result<()> {
@@ -777,73 +823,55 @@ impl FakeTransport {
         let response_handlers = this.response_handlers.clone();
         let stdout_writer = Arc::new(Mutex::new(stdout_writer));
 
-        cx.background_executor()
-            .spawn(async move {
-                let mut reader = BufReader::new(stdin_reader);
-                let mut buffer = String::new();
+        cx.background_spawn(async move {
+            let mut reader = BufReader::new(stdin_reader);
+            let mut buffer = String::new();
 
-                loop {
-                    let message =
-                        TransportDelegate::receive_server_message(&mut reader, &mut buffer, None)
-                            .await;
+            loop {
+                match TransportDelegate::receive_server_message(&mut reader, &mut buffer, None)
+                    .await
+                {
+                    ConnectionResult::Timeout => {
+                        anyhow::bail!("Timed out when connecting to debugger");
+                    }
+                    ConnectionResult::ConnectionReset => {
+                        log::info!("Debugger closed the connection");
+                        break Ok(());
+                    }
+                    ConnectionResult::Result(Err(e)) => break Err(e),
+                    ConnectionResult::Result(Ok(message)) => {
+                        match message {
+                            Message::Request(request) => {
+                                // redirect reverse requests to stdout writer/reader
+                                if request.command == RunInTerminal::COMMAND
+                                    || request.command == StartDebugging::COMMAND
+                                {
+                                    let message =
+                                        serde_json::to_string(&Message::Request(request)).unwrap();
 
-                    match message {
-                        Err(error) => {
-                            break anyhow::anyhow!(error);
-                        }
-                        Ok(message) => {
-                            match message {
-                                Message::Request(request) => {
-                                    // redirect reverse requests to stdout writer/reader
-                                    if request.command == RunInTerminal::COMMAND
-                                        || request.command == StartDebugging::COMMAND
+                                    let mut writer = stdout_writer.lock().await;
+                                    writer
+                                        .write_all(
+                                            TransportDelegate::build_rpc_message(message)
+                                                .as_bytes(),
+                                        )
+                                        .await
+                                        .unwrap();
+                                    writer.flush().await.unwrap();
+                                } else {
+                                    let response = if let Some(handle) =
+                                        request_handlers.lock().get_mut(request.command.as_str())
                                     {
-                                        let message =
-                                            serde_json::to_string(&Message::Request(request))
-                                                .unwrap();
-
-                                        let mut writer = stdout_writer.lock().await;
-                                        writer
-                                            .write_all(
-                                                TransportDelegate::build_rpc_message(message)
-                                                    .as_bytes(),
-                                            )
-                                            .await
-                                            .unwrap();
-                                        writer.flush().await.unwrap();
+                                        handle(request.seq, request.arguments.unwrap_or(json!({})))
                                     } else {
-                                        let response = if let Some(handle) = request_handlers
-                                            .lock()
-                                            .get_mut(request.command.as_str())
-                                        {
-                                            handle(
-                                                request.seq,
-                                                request.arguments.unwrap_or(json!({})),
-                                            )
-                                        } else {
-                                            panic!("No request handler for {}", request.command);
-                                        };
-                                        let message =
-                                            serde_json::to_string(&Message::Response(response))
-                                                .unwrap();
-
-                                        let mut writer = stdout_writer.lock().await;
-
-                                        writer
-                                            .write_all(
-                                                TransportDelegate::build_rpc_message(message)
-                                                    .as_bytes(),
-                                            )
-                                            .await
-                                            .unwrap();
-                                        writer.flush().await.unwrap();
-                                    }
-                                }
-                                Message::Event(event) => {
+                                        panic!("No request handler for {}", request.command);
+                                    };
                                     let message =
-                                        serde_json::to_string(&Message::Event(event)).unwrap();
+                                        serde_json::to_string(&Message::Response(response))
+                                            .unwrap();
 
                                     let mut writer = stdout_writer.lock().await;
+
                                     writer
                                         .write_all(
                                             TransportDelegate::build_rpc_message(message)
@@ -853,21 +881,35 @@ impl FakeTransport {
                                         .unwrap();
                                     writer.flush().await.unwrap();
                                 }
-                                Message::Response(response) => {
-                                    if let Some(handle) =
-                                        response_handlers.lock().get(response.command.as_str())
-                                    {
-                                        handle(response);
-                                    } else {
-                                        log::error!("No response handler for {}", response.command);
-                                    }
+                            }
+                            Message::Event(event) => {
+                                let message =
+                                    serde_json::to_string(&Message::Event(event)).unwrap();
+
+                                let mut writer = stdout_writer.lock().await;
+                                writer
+                                    .write_all(
+                                        TransportDelegate::build_rpc_message(message).as_bytes(),
+                                    )
+                                    .await
+                                    .unwrap();
+                                writer.flush().await.unwrap();
+                            }
+                            Message::Response(response) => {
+                                if let Some(handle) =
+                                    response_handlers.lock().get(response.command.as_str())
+                                {
+                                    handle(response);
+                                } else {
+                                    log::error!("No response handler for {}", response.command);
                                 }
                             }
                         }
                     }
                 }
-            })
-            .detach();
+            }
+        })
+        .detach();
 
         Ok((
             TransportPipe::new(Box::new(stdin_writer), Box::new(stdout_reader), None, None),