Don't send notifications or requests until LSP is initialized

Antonio Scandurra created

Change summary

Cargo.lock            |  1 
crates/lsp/Cargo.toml |  1 
crates/lsp/src/lib.rs | 88 +++++++++++++++++++++++++++++++++++---------
3 files changed, 71 insertions(+), 19 deletions(-)

Detailed changes

Cargo.lock 🔗

@@ -2958,6 +2958,7 @@ dependencies = [
  "gpui",
  "lsp-types",
  "parking_lot",
+ "postage",
  "serde 1.0.125",
  "serde_json 1.0.64",
  "smol",

crates/lsp/Cargo.toml 🔗

@@ -10,6 +10,7 @@ anyhow = "1.0"
 futures = "0.3"
 lsp-types = "0.91"
 parking_lot = "0.11"
+postage = { version = "0.4.1", features = ["futures-traits"] }
 serde = { version = "1.0", features = ["derive"] }
 serde_json = { version = "1.0", features = ["raw_value"] }
 smol = "1.2"

crates/lsp/src/lib.rs 🔗

@@ -1,6 +1,7 @@
 use anyhow::{anyhow, Context, Result};
 use gpui::{executor, AppContext, Task};
 use parking_lot::Mutex;
+use postage::{barrier, prelude::Stream};
 use serde::{Deserialize, Serialize};
 use serde_json::value::RawValue;
 use smol::{
@@ -29,6 +30,7 @@ pub struct LanguageServer {
     response_handlers: Arc<Mutex<HashMap<usize, ResponseHandler>>>,
     _input_task: Task<Option<()>>,
     _output_task: Task<Option<()>>,
+    initialized: barrier::Receiver,
 }
 
 type ResponseHandler = Box<dyn Send + FnOnce(Result<&str, Error>)>;
@@ -151,32 +153,51 @@ impl LanguageServer {
             .log_err(),
         );
 
+        let (initialized_tx, initialized_rx) = barrier::channel();
         let this = Arc::new(Self {
             response_handlers,
             next_id: Default::default(),
             outbound_tx,
             _input_task,
             _output_task,
+            initialized: initialized_rx,
         });
-        background.spawn(this.clone().init().log_err()).detach();
+
+        background
+            .spawn({
+                let this = this.clone();
+                async move {
+                    this.init().log_err().await;
+                    drop(initialized_tx);
+                }
+            })
+            .detach();
 
         Ok(this)
     }
 
     async fn init(self: Arc<Self>) -> Result<()> {
-        self.request::<lsp_types::request::Initialize>(lsp_types::InitializeParams {
-            process_id: Default::default(),
-            root_path: Default::default(),
-            root_uri: Default::default(),
-            initialization_options: Default::default(),
-            capabilities: Default::default(),
-            trace: Default::default(),
-            workspace_folders: Default::default(),
-            client_info: Default::default(),
-            locale: Default::default(),
-        })
+        let res = self
+            .request_internal::<lsp_types::request::Initialize>(
+                lsp_types::InitializeParams {
+                    process_id: Default::default(),
+                    root_path: Default::default(),
+                    root_uri: Default::default(),
+                    initialization_options: Default::default(),
+                    capabilities: Default::default(),
+                    trace: Default::default(),
+                    workspace_folders: Default::default(),
+                    client_info: Default::default(),
+                    locale: Default::default(),
+                },
+                false,
+            )
+            .await?;
+        self.notify_internal::<lsp_types::notification::Initialized>(
+            lsp_types::InitializedParams {},
+            false,
+        )
         .await?;
-        self.notify::<lsp_types::notification::Initialized>(lsp_types::InitializedParams {})?;
         Ok(())
     }
 
@@ -184,6 +205,17 @@ impl LanguageServer {
         self: &Arc<Self>,
         params: T::Params,
     ) -> impl Future<Output = Result<T::Result>>
+    where
+        T::Result: 'static + Send,
+    {
+        self.request_internal::<T>(params, true)
+    }
+
+    fn request_internal<T: lsp_types::request::Request>(
+        self: &Arc<Self>,
+        params: T::Params,
+        wait_for_initialization: bool,
+    ) -> impl Future<Output = Result<T::Result>>
     where
         T::Result: 'static + Send,
     {
@@ -210,25 +242,43 @@ impl LanguageServer {
             }),
         );
 
-        let outbound_tx = self.outbound_tx.clone();
+        let this = self.clone();
         async move {
-            outbound_tx.send(message).await?;
+            if wait_for_initialization {
+                this.initialized.clone().recv().await;
+            }
+            this.outbound_tx.send(message).await?;
             rx.recv().await?
         }
     }
 
     pub fn notify<T: lsp_types::notification::Notification>(
-        &self,
+        self: &Arc<Self>,
+        params: T::Params,
+    ) -> impl Future<Output = Result<()>> {
+        self.notify_internal::<T>(params, true)
+    }
+
+    fn notify_internal<T: lsp_types::notification::Notification>(
+        self: &Arc<Self>,
         params: T::Params,
-    ) -> Result<()> {
+        wait_for_initialization: bool,
+    ) -> impl Future<Output = Result<()>> {
         let message = serde_json::to_vec(&OutboundNotification {
             jsonrpc: JSON_RPC_VERSION,
             method: T::METHOD,
             params,
         })
         .unwrap();
-        smol::block_on(self.outbound_tx.send(message))?;
-        Ok(())
+
+        let this = self.clone();
+        async move {
+            if wait_for_initialization {
+                this.initialized.clone().recv().await;
+            }
+            this.outbound_tx.send(message).await?;
+            Ok(())
+        }
     }
 }