Fix remaining language server hangs on shutdown

Max Brunsfeld and Nathan Sobo created

* Use fork of async-pipe library that handles closed pipes correctly.
* Clear response handlers map when terminating output task, so as
  to wake any pending request futures.

Co-Authored-By: Nathan Sobo <nathan@zed.dev>

Change summary

Cargo.lock            |  3 +
crates/lsp/Cargo.toml |  5 +-
crates/lsp/src/lsp.rs | 77 ++++++++++++--------------------------------
3 files changed, 26 insertions(+), 59 deletions(-)

Detailed changes

Cargo.lock 🔗

@@ -336,7 +336,7 @@ dependencies = [
 [[package]]
 name = "async-pipe"
 version = "0.1.3"
-source = "git+https://github.com/routerify/async-pipe-rs?rev=feeb77e83142a9ff837d0767652ae41bfc5d8e47#feeb77e83142a9ff837d0767652ae41bfc5d8e47"
+source = "git+https://github.com/zed-industries/async-pipe-rs?rev=82d00a04211cf4e1236029aa03e6b6ce2a74c553#82d00a04211cf4e1236029aa03e6b6ce2a74c553"
 dependencies = [
  "futures",
  "log",
@@ -2827,6 +2827,7 @@ version = "0.1.0"
 dependencies = [
  "anyhow",
  "async-pipe",
+ "collections",
  "ctor",
  "env_logger",
  "futures",

crates/lsp/Cargo.toml 🔗

@@ -10,10 +10,11 @@ path = "src/lsp.rs"
 test-support = ["async-pipe"]
 
 [dependencies]
+collections = { path = "../collections" }
 gpui = { path = "../gpui" }
 util = { path = "../util" }
 anyhow = "1.0"
-async-pipe = { git = "https://github.com/routerify/async-pipe-rs", rev = "feeb77e83142a9ff837d0767652ae41bfc5d8e47", optional = true }
+async-pipe = { git = "https://github.com/zed-industries/async-pipe-rs", rev = "82d00a04211cf4e1236029aa03e6b6ce2a74c553", optional = true }
 futures = "0.3"
 log = "0.4"
 lsp-types = "0.91"
@@ -26,7 +27,7 @@ smol = "1.2"
 [dev-dependencies]
 gpui = { path = "../gpui", features = ["test-support"] }
 util = { path = "../util", features = ["test-support"] }
-async-pipe = { git = "https://github.com/routerify/async-pipe-rs", rev = "feeb77e83142a9ff837d0767652ae41bfc5d8e47" }
+async-pipe = { git = "https://github.com/zed-industries/async-pipe-rs", rev = "82d00a04211cf4e1236029aa03e6b6ce2a74c553" }
 ctor = "0.1"
 env_logger = "0.8"
 unindent = "0.1.7"

crates/lsp/src/lsp.rs 🔗

@@ -1,6 +1,6 @@
 use anyhow::{anyhow, Context, Result};
-use futures::channel::oneshot;
-use futures::{io::BufWriter, AsyncRead, AsyncWrite};
+use collections::HashMap;
+use futures::{channel::oneshot, io::BufWriter, AsyncRead, AsyncWrite};
 use gpui::{executor, Task};
 use parking_lot::{Mutex, RwLock};
 use postage::{barrier, prelude::Stream, watch};
@@ -12,7 +12,6 @@ use smol::{
     process::Command,
 };
 use std::{
-    collections::HashMap,
     future::Future,
     io::Write,
     str::FromStr,
@@ -129,14 +128,15 @@ impl LanguageServer {
         let mut stdin = BufWriter::new(stdin);
         let mut stdout = BufReader::new(stdout);
         let (outbound_tx, outbound_rx) = channel::unbounded::<Vec<u8>>();
-        let notification_handlers = Arc::new(RwLock::new(HashMap::<_, NotificationHandler>::new()));
-        let response_handlers = Arc::new(Mutex::new(HashMap::<_, ResponseHandler>::new()));
+        let notification_handlers =
+            Arc::new(RwLock::new(HashMap::<_, NotificationHandler>::default()));
+        let response_handlers = Arc::new(Mutex::new(HashMap::<_, ResponseHandler>::default()));
         let input_task = executor.spawn(
             {
                 let notification_handlers = notification_handlers.clone();
                 let response_handlers = response_handlers.clone();
                 async move {
-                    let _clear_response_channels = ClearResponseChannels(response_handlers.clone());
+                    let _clear_response_handlers = ClearResponseHandlers(response_handlers.clone());
                     let mut buffer = Vec::new();
                     loop {
                         buffer.clear();
@@ -190,8 +190,10 @@ impl LanguageServer {
             .log_err(),
         );
         let (output_done_tx, output_done_rx) = barrier::channel();
-        let output_task = executor.spawn(
+        let output_task = executor.spawn({
+            let response_handlers = response_handlers.clone();
             async move {
+                let _clear_response_handlers = ClearResponseHandlers(response_handlers);
                 let mut content_len_buffer = Vec::new();
                 while let Ok(message) = outbound_rx.recv().await {
                     content_len_buffer.clear();
@@ -205,8 +207,8 @@ impl LanguageServer {
                 drop(output_done_tx);
                 Ok(())
             }
-            .log_err(),
-        );
+            .log_err()
+        });
 
         let (initialized_tx, initialized_rx) = barrier::channel();
         let (mut capabilities_tx, capabilities_rx) = watch::channel();
@@ -408,9 +410,13 @@ impl LanguageServer {
             params,
         })
         .unwrap();
-        let mut response_handlers = response_handlers.lock();
+
+        let send = outbound_tx
+            .try_send(message)
+            .context("failed to write to language server's stdin");
+
         let (tx, rx) = oneshot::channel();
-        response_handlers.insert(
+        response_handlers.lock().insert(
             id,
             Box::new(move |result| {
                 let response = match result {
@@ -423,9 +429,6 @@ impl LanguageServer {
             }),
         );
 
-        let send = outbound_tx
-            .try_send(message)
-            .context("failed to write to language server's stdin");
         async move {
             send?;
             rx.await?
@@ -581,7 +584,7 @@ impl FakeLanguageServer {
         });
 
         let output_task = cx.background().spawn(async move {
-            let mut stdout = smol::io::BufWriter::new(PipeWriterCloseOnDrop(stdout));
+            let mut stdout = smol::io::BufWriter::new(stdout);
             while let Some(message) = outgoing_rx.next().await {
                 stdout
                     .write_all(CONTENT_LEN_HEADER.as_bytes())
@@ -694,7 +697,7 @@ impl FakeLanguageServer {
         let message_len: usize = std::str::from_utf8(buffer)
             .unwrap()
             .strip_prefix(CONTENT_LEN_HEADER)
-            .unwrap()
+            .ok_or_else(|| anyhow!("invalid content length header"))?
             .trim_end()
             .parse()
             .unwrap();
@@ -704,47 +707,9 @@ impl FakeLanguageServer {
     }
 }
 
-struct PipeWriterCloseOnDrop(async_pipe::PipeWriter);
-
-impl Drop for PipeWriterCloseOnDrop {
-    fn drop(&mut self) {
-        self.0.close().ok();
-    }
-}
-
-impl AsyncWrite for PipeWriterCloseOnDrop {
-    fn poll_write(
-        mut self: std::pin::Pin<&mut Self>,
-        cx: &mut std::task::Context<'_>,
-        buf: &[u8],
-    ) -> std::task::Poll<std::io::Result<usize>> {
-        let pipe = &mut self.0;
-        smol::pin!(pipe);
-        pipe.poll_write(cx, buf)
-    }
-
-    fn poll_flush(
-        mut self: std::pin::Pin<&mut Self>,
-        cx: &mut std::task::Context<'_>,
-    ) -> std::task::Poll<std::io::Result<()>> {
-        let pipe = &mut self.0;
-        smol::pin!(pipe);
-        pipe.poll_flush(cx)
-    }
-
-    fn poll_close(
-        mut self: std::pin::Pin<&mut Self>,
-        cx: &mut std::task::Context<'_>,
-    ) -> std::task::Poll<std::io::Result<()>> {
-        let pipe = &mut self.0;
-        smol::pin!(pipe);
-        pipe.poll_close(cx)
-    }
-}
-
-struct ClearResponseChannels(Arc<Mutex<HashMap<usize, ResponseHandler>>>);
+struct ClearResponseHandlers(Arc<Mutex<HashMap<usize, ResponseHandler>>>);
 
-impl Drop for ClearResponseChannels {
+impl Drop for ClearResponseHandlers {
     fn drop(&mut self) {
         self.0.lock().clear();
     }