Prevent hangs in lsp requests made while server is shutting down

Max Brunsfeld created

* Avoid postage::oneshot, since receiver is not woken when sender is dropped.
* Clear the response channels when an IO task exits.

Change summary

crates/lsp/src/lsp.rs | 21 +++++++++++++++++----
1 file changed, 17 insertions(+), 4 deletions(-)

Detailed changes

crates/lsp/src/lsp.rs 🔗

@@ -1,8 +1,9 @@
 use anyhow::{anyhow, Context, Result};
+use futures::channel::oneshot;
 use futures::{io::BufWriter, AsyncRead, AsyncWrite};
 use gpui::{executor, Task};
 use parking_lot::{Mutex, RwLock};
-use postage::{barrier, oneshot, prelude::Stream, sink::Sink, watch};
+use postage::{barrier, prelude::Stream, watch};
 use serde::{Deserialize, Serialize};
 use serde_json::{json, value::RawValue, Value};
 use smol::{
@@ -135,6 +136,7 @@ impl LanguageServer {
                 let notification_handlers = notification_handlers.clone();
                 let response_handlers = response_handlers.clone();
                 async move {
+                    let _clear_response_channels = ClearResponseChannels(response_handlers.clone());
                     let mut buffer = Vec::new();
                     loop {
                         buffer.clear();
@@ -323,9 +325,12 @@ impl LanguageServer {
             outbound_tx.close();
             Some(
                 async move {
+                    log::debug!("language server shutdown started");
                     shutdown_request.await?;
+                    response_handlers.lock().clear();
                     exit?;
                     output_done.recv().await;
+                    log::debug!("language server shutdown finished");
                     drop(tasks);
                     Ok(())
                 }
@@ -404,7 +409,7 @@ impl LanguageServer {
         })
         .unwrap();
         let mut response_handlers = response_handlers.lock();
-        let (mut tx, mut rx) = oneshot::channel();
+        let (tx, rx) = oneshot::channel();
         response_handlers.insert(
             id,
             Box::new(move |result| {
@@ -414,7 +419,7 @@ impl LanguageServer {
                     }
                     Err(error) => Err(anyhow!("{}", error.message)),
                 };
-                let _ = tx.try_send(response);
+                let _ = tx.send(response);
             }),
         );
 
@@ -423,7 +428,7 @@ impl LanguageServer {
             .context("failed to write to language server's stdin");
         async move {
             send?;
-            rx.recv().await.unwrap()
+            rx.await?
         }
     }
 
@@ -737,6 +742,14 @@ impl AsyncWrite for PipeWriterCloseOnDrop {
     }
 }
 
+struct ClearResponseChannels(Arc<Mutex<HashMap<usize, ResponseHandler>>>);
+
+impl Drop for ClearResponseChannels {
+    fn drop(&mut self) {
+        self.0.lock().clear();
+    }
+}
+
 #[cfg(test)]
 mod tests {
     use super::*;