Remove unnecessary waiting during completion RPC requests

Max Brunsfeld and Nathan Sobo created

Also, add completion requests to the randomized collaboration integration test,
to demonstrate that this is valid.

Co-Authored-By: Nathan Sobo <nathan@zed.dev>

Change summary

crates/language/src/language.rs | 28 +++++++---
crates/lsp/src/lsp.rs           |  4 +
crates/project/src/project.rs   | 29 +++++++----
crates/rpc/src/peer.rs          |  2 
crates/server/src/rpc.rs        | 89 ++++++++++++++++++++++------------
5 files changed, 99 insertions(+), 53 deletions(-)

Detailed changes

crates/language/src/language.rs 🔗

@@ -11,6 +11,7 @@ use collections::HashSet;
 use gpui::AppContext;
 use highlight_map::HighlightMap;
 use lazy_static::lazy_static;
+use lsp::FakeLanguageServer;
 use parking_lot::Mutex;
 use postage::prelude::Stream;
 use serde::Deserialize;
@@ -90,6 +91,7 @@ pub struct LanguageServerConfig {
 struct FakeLanguageServerConfig {
     servers_tx: mpsc::UnboundedSender<lsp::FakeLanguageServer>,
     capabilities: lsp::ServerCapabilities,
+    initializer: Option<Box<dyn 'static + Send + Sync + Fn(&mut lsp::FakeLanguageServer)>>,
 }
 
 #[derive(Clone, Debug, Deserialize)]
@@ -235,11 +237,15 @@ impl Language {
         if let Some(config) = &self.config.language_server {
             #[cfg(any(test, feature = "test-support"))]
             if let Some(fake_config) = &config.fake_config {
-                let (server, fake_server) = lsp::LanguageServer::fake_with_capabilities(
+                let (server, mut fake_server) = lsp::LanguageServer::fake_with_capabilities(
                     fake_config.capabilities.clone(),
                     cx.background().clone(),
                 );
 
+                if let Some(initalizer) = &fake_config.initializer {
+                    initalizer(&mut fake_server);
+                }
+
                 let servers_tx = fake_config.servers_tx.clone();
                 let mut initialized = server.capabilities();
                 cx.background()
@@ -381,18 +387,13 @@ impl CompletionLabel {
 #[cfg(any(test, feature = "test-support"))]
 impl LanguageServerConfig {
     pub fn fake() -> (Self, mpsc::UnboundedReceiver<lsp::FakeLanguageServer>) {
-        Self::fake_with_capabilities(Default::default())
-    }
-
-    pub fn fake_with_capabilities(
-        capabilities: lsp::ServerCapabilities,
-    ) -> (Self, mpsc::UnboundedReceiver<lsp::FakeLanguageServer>) {
         let (servers_tx, servers_rx) = mpsc::unbounded();
         (
             Self {
                 fake_config: Some(FakeLanguageServerConfig {
                     servers_tx,
-                    capabilities,
+                    capabilities: Default::default(),
+                    initializer: None,
                 }),
                 disk_based_diagnostics_progress_token: Some("fakeServer/check".to_string()),
                 ..Default::default()
@@ -400,6 +401,17 @@ impl LanguageServerConfig {
             servers_rx,
         )
     }
+
+    pub fn set_fake_capabilities(&mut self, capabilities: lsp::ServerCapabilities) {
+        self.fake_config.as_mut().unwrap().capabilities = capabilities;
+    }
+
+    pub fn set_fake_initializer(
+        &mut self,
+        initializer: impl 'static + Send + Sync + Fn(&mut FakeLanguageServer),
+    ) {
+        self.fake_config.as_mut().unwrap().initializer = Some(Box::new(initializer));
+    }
 }
 
 impl ToLspPosition for PointUtf16 {

crates/lsp/src/lsp.rs 🔗

@@ -420,7 +420,9 @@ impl LanguageServer {
                 anyhow!("tried to send a request to a language server that has been shut down")
             })
             .and_then(|outbound_tx| {
-                outbound_tx.try_send(message)?;
+                outbound_tx
+                    .try_send(message)
+                    .context("failed to write to language server's stdin")?;
                 Ok(())
             });
         async move {

crates/project/src/project.rs 🔗

@@ -2,7 +2,7 @@ pub mod fs;
 mod ignore;
 pub mod worktree;
 
-use anyhow::{anyhow, Result};
+use anyhow::{anyhow, Context, Result};
 use client::{proto, Client, PeerId, TypedEnvelope, User, UserStore};
 use clock::ReplicaId;
 use collections::{hash_map, HashMap, HashSet};
@@ -1347,7 +1347,8 @@ impl Project {
                         work_done_progress_params: Default::default(),
                         partial_result_params: Default::default(),
                     })
-                    .await?;
+                    .await
+                    .context("lsp completion request failed")?;
 
                 let completions = if let Some(completions) = completions {
                     match completions {
@@ -1400,13 +1401,16 @@ impl Project {
                 position: Some(language::proto::serialize_anchor(&anchor)),
                 version: (&source_buffer.version()).into(),
             };
-            cx.spawn_weak(|_, mut cx| async move {
+            cx.spawn_weak(|_, cx| async move {
                 let response = rpc.request(message).await?;
-                source_buffer_handle
-                    .update(&mut cx, |buffer, _| {
-                        buffer.wait_for_version(response.version.into())
-                    })
-                    .await;
+
+                if !source_buffer_handle
+                    .read_with(&cx, |buffer, _| buffer.version())
+                    .observed_all(&response.version.into())
+                {
+                    Err(anyhow!("completion response depends on unreceived edits"))?;
+                }
+
                 response
                     .completions
                     .into_iter()
@@ -2352,9 +2356,12 @@ impl Project {
                 .and_then(|shared_buffers| shared_buffers.get(&envelope.payload.buffer_id).cloned())
                 .ok_or_else(|| anyhow!("unknown buffer id {}", envelope.payload.buffer_id))
         })?;
-        buffer
-            .update(&mut cx, |buffer, _| buffer.wait_for_version(version))
-            .await;
+        if !buffer
+            .read_with(&cx, |buffer, _| buffer.version())
+            .observed_all(&version)
+        {
+            Err(anyhow!("completion request depends on unreceived edits"))?;
+        }
         let version = buffer.read_with(&cx, |buffer, _| buffer.version());
         let completions = this
             .update(&mut cx, |this, cx| this.completions(&buffer, position, cx))

crates/rpc/src/peer.rs 🔗

@@ -265,7 +265,7 @@ impl Peer {
                 .await
                 .ok_or_else(|| anyhow!("connection was closed"))?;
             if let Some(proto::envelope::Payload::Error(error)) = &response.payload {
-                Err(anyhow!("request failed").context(error.message.clone()))
+                Err(anyhow!("RPC request failed - {}", error.message))
             } else {
                 T::Response::from_envelope(response)
                     .ok_or_else(|| anyhow!("received response of the wrong type"))

crates/server/src/rpc.rs 🔗

@@ -1093,7 +1093,6 @@ mod tests {
     };
     use ::rpc::Peer;
     use collections::BTreeMap;
-    use futures::channel::mpsc::UnboundedReceiver;
     use gpui::{executor, ModelHandle, TestAppContext};
     use parking_lot::Mutex;
     use postage::{mpsc, watch};
@@ -1127,7 +1126,7 @@ mod tests {
             tree_sitter_rust, AnchorRangeExt, Diagnostic, DiagnosticEntry, Language,
             LanguageConfig, LanguageRegistry, LanguageServerConfig, Point,
         },
-        lsp::{self, FakeLanguageServer},
+        lsp,
         project::{DiagnosticSummary, Project, ProjectPath},
         workspace::{Workspace, WorkspaceParams},
     };
@@ -2218,14 +2217,14 @@ mod tests {
         let fs = Arc::new(FakeFs::new(cx_a.background()));
 
         // Set up a fake language server.
-        let (language_server_config, mut fake_language_servers) =
-            LanguageServerConfig::fake_with_capabilities(lsp::ServerCapabilities {
-                completion_provider: Some(lsp::CompletionOptions {
-                    trigger_characters: Some(vec![".".to_string()]),
-                    ..Default::default()
-                }),
+        let (mut language_server_config, mut fake_language_servers) = LanguageServerConfig::fake();
+        language_server_config.set_fake_capabilities(lsp::ServerCapabilities {
+            completion_provider: Some(lsp::CompletionOptions {
+                trigger_characters: Some(vec![".".to_string()]),
                 ..Default::default()
-            });
+            }),
+            ..Default::default()
+        });
         Arc::get_mut(&mut lang_registry)
             .unwrap()
             .add(Arc::new(Language::new(
@@ -3612,7 +3611,19 @@ mod tests {
         let guest_lang_registry = Arc::new(LanguageRegistry::new());
 
         // Set up a fake language server.
-        let (language_server_config, fake_language_servers) = LanguageServerConfig::fake();
+        let (mut language_server_config, _fake_language_servers) = LanguageServerConfig::fake();
+        language_server_config.set_fake_initializer(|fake_server| {
+            fake_server.handle_request::<lsp::request::Completion, _>(|_| {
+                Some(lsp::CompletionResponse::Array(vec![lsp::CompletionItem {
+                    text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit {
+                        range: lsp::Range::new(lsp::Position::new(0, 0), lsp::Position::new(0, 0)),
+                        new_text: "the-new-text".to_string(),
+                    })),
+                    ..Default::default()
+                }]))
+            });
+        });
+
         Arc::get_mut(&mut host_lang_registry)
             .unwrap()
             .add(Arc::new(Language::new(
@@ -3677,7 +3688,6 @@ mod tests {
 
         clients.push(cx.foreground().spawn(host.simulate_host(
             host_project.clone(),
-            fake_language_servers,
             operations.clone(),
             max_operations,
             rng.clone(),
@@ -4002,7 +4012,6 @@ mod tests {
         async fn simulate_host(
             mut self,
             project: ModelHandle<Project>,
-            fake_language_servers: UnboundedReceiver<FakeLanguageServer>,
             operations: Rc<Cell<usize>>,
             max_operations: usize,
             rng: Rc<RefCell<StdRng>>,
@@ -4158,27 +4167,43 @@ mod tests {
                         .clone()
                 };
 
-                if rng.borrow_mut().gen_bool(0.1) {
-                    cx.update(|cx| {
-                        log::info!(
-                            "Guest {}: dropping buffer {:?}",
-                            guest_id,
-                            buffer.read(cx).file().unwrap().full_path(cx)
-                        );
-                        self.buffers.remove(&buffer);
-                        drop(buffer);
-                    });
-                } else {
-                    buffer.update(&mut cx, |buffer, cx| {
-                        log::info!(
-                            "Guest {}: updating buffer {:?}",
-                            guest_id,
-                            buffer.file().unwrap().full_path(cx)
-                        );
-                        buffer.randomly_edit(&mut *rng.borrow_mut(), 5, cx)
-                    });
+                let choice = rng.borrow_mut().gen_range(0..100);
+                match choice {
+                    0..=9 => {
+                        cx.update(|cx| {
+                            log::info!(
+                                "Guest {}: dropping buffer {:?}",
+                                guest_id,
+                                buffer.read(cx).file().unwrap().full_path(cx)
+                            );
+                            self.buffers.remove(&buffer);
+                            drop(buffer);
+                        });
+                    }
+                    10..=19 => {
+                        project
+                            .update(&mut cx, |project, cx| {
+                                log::info!(
+                                    "Guest {}: requesting completions for buffer {:?}",
+                                    guest_id,
+                                    buffer.read(cx).file().unwrap().full_path(cx)
+                                );
+                                project.completions(&buffer, 0, cx)
+                            })
+                            .await
+                            .expect("completion request failed");
+                    }
+                    _ => {
+                        buffer.update(&mut cx, |buffer, cx| {
+                            log::info!(
+                                "Guest {}: updating buffer {:?}",
+                                guest_id,
+                                buffer.file().unwrap().full_path(cx)
+                            );
+                            buffer.randomly_edit(&mut *rng.borrow_mut(), 5, cx)
+                        });
+                    }
                 }
-
                 cx.background().simulate_random_delay().await;
             }