Split language server initialization from construction

Antonio Scandurra created

This gives clients a chance to register to notifications.

Change summary

crates/language/src/language.rs |  43 +++---
crates/lsp/src/lsp.rs           | 217 ++++++++++++++------------------
crates/project/src/project.rs   | 226 ++++++++++++++++++----------------
crates/server/src/rpc.rs        |  78 ++++++-----
4 files changed, 277 insertions(+), 287 deletions(-)

Detailed changes

crates/language/src/language.rs 🔗

@@ -245,7 +245,7 @@ impl LanguageRegistry {
         root_path: Arc<Path>,
         http_client: Arc<dyn HttpClient>,
         cx: &mut MutableAppContext,
-    ) -> Option<Task<Result<Arc<lsp::LanguageServer>>>> {
+    ) -> Option<Task<Result<lsp::LanguageServer>>> {
         #[cfg(any(test, feature = "test-support"))]
         if language
             .config
@@ -264,23 +264,26 @@ impl LanguageRegistry {
                     .fake_config
                     .as_ref()
                     .unwrap();
-                let (server, mut fake_server) = cx
-                    .update(|cx| {
-                        lsp::LanguageServer::fake_with_capabilities(
-                            fake_config.capabilities.clone(),
-                            cx,
-                        )
-                    })
-                    .await;
-                if let Some(initalizer) = &fake_config.initializer {
-                    initalizer(&mut fake_server);
+                let (server, mut fake_server) = cx.update(|cx| {
+                    lsp::LanguageServer::fake_with_capabilities(
+                        fake_config.capabilities.clone(),
+                        cx,
+                    )
+                });
+                if let Some(initializer) = &fake_config.initializer {
+                    initializer(&mut fake_server);
                 }
-                fake_config
-                    .servers_tx
-                    .clone()
-                    .unbounded_send(fake_server)
-                    .ok();
-                Ok(server.clone())
+
+                let servers_tx = fake_config.servers_tx.clone();
+                cx.background()
+                    .spawn(async move {
+                        fake_server
+                            .receive_notification::<lsp::notification::Initialized>()
+                            .await;
+                        servers_tx.unbounded_send(fake_server).ok();
+                    })
+                    .detach();
+                Ok(server)
             }));
         }
 
@@ -316,15 +319,13 @@ impl LanguageRegistry {
 
             let server_binary_path = server_binary_path.await?;
             let server_args = adapter.server_args();
-            let server = lsp::LanguageServer::new(
+            lsp::LanguageServer::new(
                 &server_binary_path,
                 server_args,
-                adapter.initialization_options(),
                 &root_path,
+                adapter.initialization_options(),
                 background,
             )
-            .await?;
-            Ok(server)
         }))
     }
 

crates/lsp/src/lsp.rs 🔗

@@ -14,6 +14,7 @@ use smol::{
 use std::{
     future::Future,
     io::Write,
+    path::PathBuf,
     str::FromStr,
     sync::{
         atomic::{AtomicUsize, Ordering::SeqCst},
@@ -40,6 +41,8 @@ pub struct LanguageServer {
     executor: Arc<executor::Background>,
     io_tasks: Mutex<Option<(Task<Option<()>>, Task<Option<()>>)>>,
     output_done_rx: Mutex<Option<barrier::Receiver>>,
+    root_path: PathBuf,
+    options: Option<Value>,
 }
 
 pub struct Subscription {
@@ -99,13 +102,13 @@ struct Error {
 }
 
 impl LanguageServer {
-    pub async fn new(
+    pub fn new(
         binary_path: &Path,
         args: &[&str],
-        options: Option<Value>,
         root_path: &Path,
+        options: Option<Value>,
         background: Arc<executor::Background>,
-    ) -> Result<Arc<Self>> {
+    ) -> Result<Self> {
         let mut server = Command::new(binary_path)
             .current_dir(root_path)
             .args(args)
@@ -115,16 +118,18 @@ impl LanguageServer {
             .spawn()?;
         let stdin = server.stdin.take().unwrap();
         let stdout = server.stdout.take().unwrap();
-        Self::new_internal(stdin, stdout, root_path, options, background).await
+        Ok(Self::new_internal(
+            stdin, stdout, root_path, options, background,
+        ))
     }
 
-    async fn new_internal<Stdin, Stdout>(
+    fn new_internal<Stdin, Stdout>(
         stdin: Stdin,
         stdout: Stdout,
         root_path: &Path,
         options: Option<Value>,
         executor: Arc<executor::Background>,
-    ) -> Result<Arc<Self>>
+    ) -> Self
     where
         Stdin: AsyncWrite + Unpin + Send + 'static,
         Stdout: AsyncRead + Unpin + Send + 'static,
@@ -214,7 +219,7 @@ impl LanguageServer {
             .log_err()
         });
 
-        let mut this = Arc::new(Self {
+        Self {
             notification_handlers,
             response_handlers,
             capabilities: Default::default(),
@@ -223,80 +228,73 @@ impl LanguageServer {
             executor: executor.clone(),
             io_tasks: Mutex::new(Some((input_task, output_task))),
             output_done_rx: Mutex::new(Some(output_done_rx)),
-        });
+            root_path: root_path.to_path_buf(),
+            options,
+        }
+    }
 
-        let root_uri = Url::from_file_path(root_path).map_err(|_| anyhow!("invalid root path"))?;
-
-        executor
-            .spawn(async move {
-                #[allow(deprecated)]
-                let params = InitializeParams {
-                    process_id: Default::default(),
-                    root_path: Default::default(),
-                    root_uri: Some(root_uri),
-                    initialization_options: options,
-                    capabilities: ClientCapabilities {
-                        text_document: Some(TextDocumentClientCapabilities {
-                            definition: Some(GotoCapability {
-                                link_support: Some(true),
-                                ..Default::default()
-                            }),
-                            code_action: Some(CodeActionClientCapabilities {
-                                code_action_literal_support: Some(CodeActionLiteralSupport {
-                                    code_action_kind: CodeActionKindLiteralSupport {
-                                        value_set: vec![
-                                            CodeActionKind::REFACTOR.as_str().into(),
-                                            CodeActionKind::QUICKFIX.as_str().into(),
-                                        ],
-                                    },
-                                }),
-                                data_support: Some(true),
-                                resolve_support: Some(CodeActionCapabilityResolveSupport {
-                                    properties: vec!["edit".to_string()],
-                                }),
-                                ..Default::default()
-                            }),
-                            completion: Some(CompletionClientCapabilities {
-                                completion_item: Some(CompletionItemCapability {
-                                    snippet_support: Some(true),
-                                    resolve_support: Some(CompletionItemCapabilityResolveSupport {
-                                        properties: vec!["additionalTextEdits".to_string()],
-                                    }),
-                                    ..Default::default()
-                                }),
-                                ..Default::default()
-                            }),
-                            ..Default::default()
+    pub async fn initialize(mut self) -> Result<Arc<Self>> {
+        let options = self.options.take();
+        let mut this = Arc::new(self);
+        let root_uri = Url::from_file_path(&this.root_path).unwrap();
+        #[allow(deprecated)]
+        let params = InitializeParams {
+            process_id: Default::default(),
+            root_path: Default::default(),
+            root_uri: Some(root_uri),
+            initialization_options: options,
+            capabilities: ClientCapabilities {
+                text_document: Some(TextDocumentClientCapabilities {
+                    definition: Some(GotoCapability {
+                        link_support: Some(true),
+                        ..Default::default()
+                    }),
+                    code_action: Some(CodeActionClientCapabilities {
+                        code_action_literal_support: Some(CodeActionLiteralSupport {
+                            code_action_kind: CodeActionKindLiteralSupport {
+                                value_set: vec![
+                                    CodeActionKind::REFACTOR.as_str().into(),
+                                    CodeActionKind::QUICKFIX.as_str().into(),
+                                ],
+                            },
+                        }),
+                        data_support: Some(true),
+                        resolve_support: Some(CodeActionCapabilityResolveSupport {
+                            properties: vec!["edit".to_string()],
                         }),
-                        experimental: Some(json!({
-                            "serverStatusNotification": true,
-                        })),
-                        window: Some(WindowClientCapabilities {
-                            work_done_progress: Some(true),
+                        ..Default::default()
+                    }),
+                    completion: Some(CompletionClientCapabilities {
+                        completion_item: Some(CompletionItemCapability {
+                            snippet_support: Some(true),
+                            resolve_support: Some(CompletionItemCapabilityResolveSupport {
+                                properties: vec!["additionalTextEdits".to_string()],
+                            }),
                             ..Default::default()
                         }),
                         ..Default::default()
-                    },
-                    trace: Default::default(),
-                    workspace_folders: Default::default(),
-                    client_info: Default::default(),
-                    locale: Default::default(),
-                };
-
-                let request = Self::request_internal::<request::Initialize>(
-                    &this.next_id,
-                    &this.response_handlers,
-                    &this.outbound_tx,
-                    params,
-                );
-                Arc::get_mut(&mut this).unwrap().capabilities = request.await?.capabilities;
-                Self::notify_internal::<notification::Initialized>(
-                    &this.outbound_tx,
-                    InitializedParams {},
-                )?;
-                Ok(this)
-            })
-            .await
+                    }),
+                    ..Default::default()
+                }),
+                experimental: Some(json!({
+                    "serverStatusNotification": true,
+                })),
+                window: Some(WindowClientCapabilities {
+                    work_done_progress: Some(true),
+                    ..Default::default()
+                }),
+                ..Default::default()
+            },
+            trace: Default::default(),
+            workspace_folders: Default::default(),
+            client_info: Default::default(),
+            locale: Default::default(),
+        };
+
+        let response = this.request::<request::Initialize>(params).await?;
+        Arc::get_mut(&mut this).unwrap().capabilities = response.capabilities;
+        this.notify::<notification::Initialized>(InitializedParams {})?;
+        Ok(this)
     }
 
     pub fn shutdown(&self) -> Option<impl 'static + Send + Future<Output = Option<()>>> {
@@ -331,7 +329,7 @@ impl LanguageServer {
         }
     }
 
-    pub fn on_notification<T, F>(&self, mut f: F) -> Subscription
+    pub fn on_notification<T, F>(&mut self, mut f: F) -> Subscription
     where
         T: notification::Notification,
         F: 'static + Send + Sync + FnMut(T::Params),
@@ -357,7 +355,7 @@ impl LanguageServer {
         }
     }
 
-    pub fn capabilities(&self) -> &ServerCapabilities {
+    pub fn capabilities<'a>(self: &'a Arc<Self>) -> &'a ServerCapabilities {
         &self.capabilities
     }
 
@@ -368,16 +366,12 @@ impl LanguageServer {
     where
         T::Result: 'static + Send,
     {
-        let this = self.clone();
-        async move {
-            Self::request_internal::<T>(
-                &this.next_id,
-                &this.response_handlers,
-                &this.outbound_tx,
-                params,
-            )
-            .await
-        }
+        Self::request_internal::<T>(
+            &self.next_id,
+            &self.response_handlers,
+            &self.outbound_tx,
+            params,
+        )
     }
 
     fn request_internal<T: request::Request>(
@@ -492,16 +486,14 @@ impl LanguageServer {
         }
     }
 
-    pub fn fake(
-        cx: &mut gpui::MutableAppContext,
-    ) -> impl Future<Output = (Arc<Self>, FakeLanguageServer)> {
+    pub fn fake(cx: &mut gpui::MutableAppContext) -> (Self, FakeLanguageServer) {
         Self::fake_with_capabilities(Self::full_capabilities(), cx)
     }
 
     pub fn fake_with_capabilities(
         capabilities: ServerCapabilities,
         cx: &mut gpui::MutableAppContext,
-    ) -> impl Future<Output = (Arc<Self>, FakeLanguageServer)> {
+    ) -> (Self, FakeLanguageServer) {
         let (stdin_writer, stdin_reader) = async_pipe::pipe();
         let (stdout_writer, stdout_reader) = async_pipe::pipe();
 
@@ -515,14 +507,9 @@ impl LanguageServer {
         });
 
         let executor = cx.background().clone();
-        async move {
-            let server =
-                Self::new_internal(stdin_writer, stdout_reader, Path::new("/"), None, executor)
-                    .await
-                    .unwrap();
-
-            (server, fake)
-        }
+        let server =
+            Self::new_internal(stdin_writer, stdout_reader, Path::new("/"), None, executor);
+        (server, fake)
     }
 }
 
@@ -547,6 +534,7 @@ impl FakeLanguageServer {
                 let mut stdin = smol::io::BufReader::new(stdin);
                 while Self::receive(&mut stdin, &mut buffer).await.is_ok() {
                     cx.background().simulate_random_delay().await;
+
                     if let Ok(request) = serde_json::from_slice::<AnyRequest>(&buffer) {
                         assert_eq!(request.jsonrpc, JSON_RPC_VERSION);
 
@@ -602,7 +590,7 @@ impl FakeLanguageServer {
         }
     }
 
-    pub async fn notify<T: notification::Notification>(&mut self, params: T::Params) {
+    pub fn notify<T: notification::Notification>(&mut self, params: T::Params) {
         let message = serde_json::to_vec(&Notification {
             jsonrpc: JSON_RPC_VERSION,
             method: T::METHOD,
@@ -667,16 +655,14 @@ impl FakeLanguageServer {
         self.notify::<notification::Progress>(ProgressParams {
             token: NumberOrString::String(token.into()),
             value: ProgressParamsValue::WorkDone(WorkDoneProgress::Begin(Default::default())),
-        })
-        .await;
+        });
     }
 
     pub async fn end_progress(&mut self, token: impl Into<String>) {
         self.notify::<notification::Progress>(ProgressParams {
             token: NumberOrString::String(token.into()),
             value: ProgressParamsValue::WorkDone(WorkDoneProgress::End(Default::default())),
-        })
-        .await;
+        });
     }
 
     async fn receive(
@@ -721,7 +707,7 @@ mod tests {
 
     #[gpui::test]
     async fn test_fake(cx: &mut TestAppContext) {
-        let (server, mut fake) = cx.update(LanguageServer::fake).await;
+        let (mut server, mut fake) = cx.update(LanguageServer::fake);
 
         let (message_tx, message_rx) = channel::unbounded();
         let (diagnostics_tx, diagnostics_rx) = channel::unbounded();
@@ -736,6 +722,7 @@ mod tests {
             })
             .detach();
 
+        let server = server.initialize().await.unwrap();
         server
             .notify::<notification::DidOpenTextDocument>(DidOpenTextDocumentParams {
                 text_document: TextDocumentItem::new(
@@ -758,14 +745,12 @@ mod tests {
         fake.notify::<notification::ShowMessage>(ShowMessageParams {
             typ: MessageType::ERROR,
             message: "ok".to_string(),
-        })
-        .await;
+        });
         fake.notify::<notification::PublishDiagnostics>(PublishDiagnosticsParams {
             uri: Url::from_str("file://b/c").unwrap(),
             version: Some(5),
             diagnostics: vec![],
-        })
-        .await;
+        });
         assert_eq!(message_rx.recv().await.unwrap().message, "ok");
         assert_eq!(
             diagnostics_rx.recv().await.unwrap().uri.as_str(),
@@ -777,16 +762,4 @@ mod tests {
         drop(server);
         fake.receive_notification::<notification::Exit>().await;
     }
-
-    pub enum ServerStatusNotification {}
-
-    impl notification::Notification for ServerStatusNotification {
-        type Params = ServerStatusParams;
-        const METHOD: &'static str = "experimental/serverStatus";
-    }
-
-    #[derive(Deserialize, Serialize, PartialEq, Eq, Clone)]
-    pub struct ServerStatusParams {
-        pub quiescent: bool,
-    }
 }

crates/project/src/project.rs 🔗

@@ -1173,67 +1173,8 @@ impl Project {
                 );
                 let rpc = self.client.clone();
                 cx.spawn_weak(|this, mut cx| async move {
-                    let language_server = language_server?.await.log_err()?;
+                    let mut language_server = language_server?.await.log_err()?;
                     let this = this.upgrade(&cx)?;
-                    this.update(&mut cx, |this, cx| {
-                        this.language_servers
-                            .insert(key.clone(), language_server.clone());
-
-                        // Tell the language server about every open buffer in the worktree that matches the language.
-                        for buffer in this.opened_buffers.values() {
-                            if let Some(buffer_handle) = buffer.upgrade(cx) {
-                                let buffer = buffer_handle.read(cx);
-                                let file = if let Some(file) = File::from_dyn(buffer.file()) {
-                                    file
-                                } else {
-                                    continue;
-                                };
-                                let language = if let Some(language) = buffer.language() {
-                                    language
-                                } else {
-                                    continue;
-                                };
-                                if (file.worktree.read(cx).id(), language.name()) != key {
-                                    continue;
-                                }
-
-                                let file = file.as_local()?;
-                                let versions = this
-                                    .buffer_snapshots
-                                    .entry(buffer.remote_id())
-                                    .or_insert_with(|| vec![(0, buffer.text_snapshot())]);
-                                let (version, initial_snapshot) = versions.last().unwrap();
-                                let uri = lsp::Url::from_file_path(file.abs_path(cx)).unwrap();
-                                language_server
-                                    .notify::<lsp::notification::DidOpenTextDocument>(
-                                        lsp::DidOpenTextDocumentParams {
-                                            text_document: lsp::TextDocumentItem::new(
-                                                uri,
-                                                Default::default(),
-                                                *version,
-                                                initial_snapshot.text(),
-                                            ),
-                                        },
-                                    )
-                                    .log_err()?;
-                                buffer_handle.update(cx, |buffer, cx| {
-                                    buffer.set_completion_triggers(
-                                        language_server
-                                            .capabilities()
-                                            .completion_provider
-                                            .as_ref()
-                                            .and_then(|provider| {
-                                                provider.trigger_characters.clone()
-                                            })
-                                            .unwrap_or(Vec::new()),
-                                        cx,
-                                    )
-                                });
-                            }
-                        }
-
-                        Some(())
-                    });
 
                     let disk_based_sources = language
                         .disk_based_diagnostic_sources()
@@ -1305,45 +1246,112 @@ impl Project {
                         .detach();
 
                     // Process all the LSP events.
-                    let this = this.downgrade();
-                    cx.spawn(|mut cx| async move {
-                        while let Ok(message) = diagnostics_rx.recv().await {
-                            let this = this.upgrade(&cx)?;
-                            match message {
-                                LspEvent::DiagnosticsStart => {
-                                    this.update(&mut cx, |this, cx| {
-                                        this.disk_based_diagnostics_started(cx);
-                                        if let Some(project_id) = this.remote_id() {
-                                            rpc.send(proto::DiskBasedDiagnosticsUpdating {
-                                                project_id,
-                                            })
+                    cx.spawn(|mut cx| {
+                        let this = this.downgrade();
+                        async move {
+                            while let Ok(message) = diagnostics_rx.recv().await {
+                                let this = this.upgrade(&cx)?;
+                                match message {
+                                    LspEvent::DiagnosticsStart => {
+                                        this.update(&mut cx, |this, cx| {
+                                            this.disk_based_diagnostics_started(cx);
+                                            if let Some(project_id) = this.remote_id() {
+                                                rpc.send(proto::DiskBasedDiagnosticsUpdating {
+                                                    project_id,
+                                                })
+                                                .log_err();
+                                            }
+                                        });
+                                    }
+                                    LspEvent::DiagnosticsUpdate(mut params) => {
+                                        language.process_diagnostics(&mut params);
+                                        this.update(&mut cx, |this, cx| {
+                                            this.update_diagnostics(
+                                                params,
+                                                &disk_based_sources,
+                                                cx,
+                                            )
                                             .log_err();
-                                        }
-                                    });
+                                        });
+                                    }
+                                    LspEvent::DiagnosticsFinish => {
+                                        this.update(&mut cx, |this, cx| {
+                                            this.disk_based_diagnostics_finished(cx);
+                                            if let Some(project_id) = this.remote_id() {
+                                                rpc.send(proto::DiskBasedDiagnosticsUpdated {
+                                                    project_id,
+                                                })
+                                                .log_err();
+                                            }
+                                        });
+                                    }
                                 }
-                                LspEvent::DiagnosticsUpdate(mut params) => {
-                                    language.process_diagnostics(&mut params);
-                                    this.update(&mut cx, |this, cx| {
-                                        this.update_diagnostics(params, &disk_based_sources, cx)
-                                            .log_err();
-                                    });
+                            }
+                            Some(())
+                        }
+                    })
+                    .detach();
+
+                    let language_server = language_server.initialize().await.log_err()?;
+                    this.update(&mut cx, |this, cx| {
+                        this.language_servers
+                            .insert(key.clone(), language_server.clone());
+
+                        // Tell the language server about every open buffer in the worktree that matches the language.
+                        for buffer in this.opened_buffers.values() {
+                            if let Some(buffer_handle) = buffer.upgrade(cx) {
+                                let buffer = buffer_handle.read(cx);
+                                let file = if let Some(file) = File::from_dyn(buffer.file()) {
+                                    file
+                                } else {
+                                    continue;
+                                };
+                                let language = if let Some(language) = buffer.language() {
+                                    language
+                                } else {
+                                    continue;
+                                };
+                                if (file.worktree.read(cx).id(), language.name()) != key {
+                                    continue;
                                 }
-                                LspEvent::DiagnosticsFinish => {
-                                    this.update(&mut cx, |this, cx| {
-                                        this.disk_based_diagnostics_finished(cx);
-                                        if let Some(project_id) = this.remote_id() {
-                                            rpc.send(proto::DiskBasedDiagnosticsUpdated {
-                                                project_id,
+
+                                let file = file.as_local()?;
+                                let versions = this
+                                    .buffer_snapshots
+                                    .entry(buffer.remote_id())
+                                    .or_insert_with(|| vec![(0, buffer.text_snapshot())]);
+                                let (version, initial_snapshot) = versions.last().unwrap();
+                                let uri = lsp::Url::from_file_path(file.abs_path(cx)).unwrap();
+                                language_server
+                                    .notify::<lsp::notification::DidOpenTextDocument>(
+                                        lsp::DidOpenTextDocumentParams {
+                                            text_document: lsp::TextDocumentItem::new(
+                                                uri,
+                                                Default::default(),
+                                                *version,
+                                                initial_snapshot.text(),
+                                            ),
+                                        },
+                                    )
+                                    .log_err()?;
+                                buffer_handle.update(cx, |buffer, cx| {
+                                    buffer.set_completion_triggers(
+                                        language_server
+                                            .capabilities()
+                                            .completion_provider
+                                            .as_ref()
+                                            .and_then(|provider| {
+                                                provider.trigger_characters.clone()
                                             })
-                                            .log_err();
-                                        }
-                                    });
-                                }
+                                            .unwrap_or(Vec::new()),
+                                        cx,
+                                    )
+                                });
                             }
                         }
+
                         Some(())
-                    })
-                    .detach();
+                    });
 
                     Some(language_server)
                 })
@@ -2654,7 +2662,7 @@ impl Project {
             {
                 let lsp_params = request.to_lsp(&file.abs_path(cx), cx);
                 return cx.spawn(|this, cx| async move {
-                    if !request.check_capabilities(language_server.capabilities()) {
+                    if !request.check_capabilities(&language_server.capabilities()) {
                         return Ok(Default::default());
                     }
 
@@ -4516,8 +4524,8 @@ mod tests {
         fake_server.end_progress(&progress_token).await;
         fake_server.start_progress(&progress_token).await;
 
-        fake_server
-            .notify::<lsp::notification::PublishDiagnostics>(lsp::PublishDiagnosticsParams {
+        fake_server.notify::<lsp::notification::PublishDiagnostics>(
+            lsp::PublishDiagnosticsParams {
                 uri: Url::from_file_path("/dir/a.rs").unwrap(),
                 version: None,
                 diagnostics: vec![lsp::Diagnostic {
@@ -4526,8 +4534,8 @@ mod tests {
                     message: "undefined variable 'A'".to_string(),
                     ..Default::default()
                 }],
-            })
-            .await;
+            },
+        );
         assert_eq!(
             events.next().await.unwrap(),
             Event::DiagnosticsUpdated((worktree_id, Path::new("a.rs")).into())
@@ -4632,8 +4640,8 @@ mod tests {
         );
 
         // Report some diagnostics for the initial version of the buffer
-        fake_server
-            .notify::<lsp::notification::PublishDiagnostics>(lsp::PublishDiagnosticsParams {
+        fake_server.notify::<lsp::notification::PublishDiagnostics>(
+            lsp::PublishDiagnosticsParams {
                 uri: lsp::Url::from_file_path("/dir/a.rs").unwrap(),
                 version: Some(open_notification.text_document.version),
                 diagnostics: vec![
@@ -4659,8 +4667,8 @@ mod tests {
                         ..Default::default()
                     },
                 ],
-            })
-            .await;
+            },
+        );
 
         // The diagnostics have moved down since they were created.
         buffer.next_notification(cx).await;
@@ -4718,8 +4726,8 @@ mod tests {
         });
 
         // Ensure overlapping diagnostics are highlighted correctly.
-        fake_server
-            .notify::<lsp::notification::PublishDiagnostics>(lsp::PublishDiagnosticsParams {
+        fake_server.notify::<lsp::notification::PublishDiagnostics>(
+            lsp::PublishDiagnosticsParams {
                 uri: lsp::Url::from_file_path("/dir/a.rs").unwrap(),
                 version: Some(open_notification.text_document.version),
                 diagnostics: vec![
@@ -4738,8 +4746,8 @@ mod tests {
                         ..Default::default()
                     },
                 ],
-            })
-            .await;
+            },
+        );
 
         buffer.next_notification(cx).await;
         buffer.read_with(cx, |buffer, _| {
@@ -4805,8 +4813,8 @@ mod tests {
         );
 
         // Handle out-of-order diagnostics
-        fake_server
-            .notify::<lsp::notification::PublishDiagnostics>(lsp::PublishDiagnosticsParams {
+        fake_server.notify::<lsp::notification::PublishDiagnostics>(
+            lsp::PublishDiagnosticsParams {
                 uri: lsp::Url::from_file_path("/dir/a.rs").unwrap(),
                 version: Some(open_notification.text_document.version),
                 diagnostics: vec![
@@ -4825,8 +4833,8 @@ mod tests {
                         ..Default::default()
                     },
                 ],
-            })
-            .await;
+            },
+        );
 
         buffer.next_notification(cx).await;
         buffer.read_with(cx, |buffer, _| {

crates/server/src/rpc.rs 🔗

@@ -1948,8 +1948,8 @@ mod tests {
         fake_language_server
             .receive_notification::<lsp::notification::DidOpenTextDocument>()
             .await;
-        fake_language_server
-            .notify::<lsp::notification::PublishDiagnostics>(lsp::PublishDiagnosticsParams {
+        fake_language_server.notify::<lsp::notification::PublishDiagnostics>(
+            lsp::PublishDiagnosticsParams {
                 uri: lsp::Url::from_file_path("/a/a.rs").unwrap(),
                 version: None,
                 diagnostics: vec![lsp::Diagnostic {
@@ -1958,8 +1958,8 @@ mod tests {
                     message: "message 1".to_string(),
                     ..Default::default()
                 }],
-            })
-            .await;
+            },
+        );
 
         // Wait for server to see the diagnostics update.
         server
@@ -2008,8 +2008,8 @@ mod tests {
         });
 
         // Simulate a language server reporting more errors for a file.
-        fake_language_server
-            .notify::<lsp::notification::PublishDiagnostics>(lsp::PublishDiagnosticsParams {
+        fake_language_server.notify::<lsp::notification::PublishDiagnostics>(
+            lsp::PublishDiagnosticsParams {
                 uri: lsp::Url::from_file_path("/a/a.rs").unwrap(),
                 version: None,
                 diagnostics: vec![
@@ -2029,8 +2029,8 @@ mod tests {
                         ..Default::default()
                     },
                 ],
-            })
-            .await;
+            },
+        );
 
         // Client b gets the updated summaries
         project_b
@@ -2374,10 +2374,6 @@ mod tests {
             .await
             .unwrap();
 
-        let format = project_b.update(cx_b, |project, cx| {
-            project.format(HashSet::from_iter([buffer_b.clone()]), true, cx)
-        });
-
         let mut fake_language_server = fake_language_servers.next().await.unwrap();
         fake_language_server.handle_request::<lsp::request::Formatting, _>(|_, _| {
             Some(vec![
@@ -2392,7 +2388,12 @@ mod tests {
             ])
         });
 
-        format.await.unwrap();
+        project_b
+            .update(cx_b, |project, cx| {
+                project.format(HashSet::from_iter([buffer_b.clone()]), true, cx)
+            })
+            .await
+            .unwrap();
         assert_eq!(
             buffer_b.read_with(cx_b, |buffer, _| buffer.text()),
             "let honey = two"
@@ -2482,8 +2483,6 @@ mod tests {
             .unwrap();
 
         // Request the definition of a symbol as the guest.
-        let definitions_1 = project_b.update(cx_b, |p, cx| p.definition(&buffer_b, 23, cx));
-
         let mut fake_language_server = fake_language_servers.next().await.unwrap();
         fake_language_server.handle_request::<lsp::request::GotoDefinition, _>(|_, _| {
             Some(lsp::GotoDefinitionResponse::Scalar(lsp::Location::new(
@@ -2492,7 +2491,10 @@ mod tests {
             )))
         });
 
-        let definitions_1 = definitions_1.await.unwrap();
+        let definitions_1 = project_b
+            .update(cx_b, |p, cx| p.definition(&buffer_b, 23, cx))
+            .await
+            .unwrap();
         cx_b.read(|cx| {
             assert_eq!(definitions_1.len(), 1);
             assert_eq!(project_b.read(cx).worktrees(cx).count(), 2);
@@ -2509,7 +2511,6 @@ mod tests {
 
         // Try getting more definitions for the same buffer, ensuring the buffer gets reused from
         // the previous call to `definition`.
-        let definitions_2 = project_b.update(cx_b, |p, cx| p.definition(&buffer_b, 33, cx));
         fake_language_server.handle_request::<lsp::request::GotoDefinition, _>(|_, _| {
             Some(lsp::GotoDefinitionResponse::Scalar(lsp::Location::new(
                 lsp::Url::from_file_path("/root-2/b.rs").unwrap(),
@@ -2517,7 +2518,10 @@ mod tests {
             )))
         });
 
-        let definitions_2 = definitions_2.await.unwrap();
+        let definitions_2 = project_b
+            .update(cx_b, |p, cx| p.definition(&buffer_b, 33, cx))
+            .await
+            .unwrap();
         cx_b.read(|cx| {
             assert_eq!(definitions_2.len(), 1);
             assert_eq!(project_b.read(cx).worktrees(cx).count(), 2);
@@ -2618,8 +2622,6 @@ mod tests {
             .unwrap();
 
         // Request references to a symbol as the guest.
-        let references = project_b.update(cx_b, |p, cx| p.references(&buffer_b, 7, cx));
-
         let mut fake_language_server = fake_language_servers.next().await.unwrap();
         fake_language_server.handle_request::<lsp::request::References, _>(|params, _| {
             assert_eq!(
@@ -2642,7 +2644,10 @@ mod tests {
             ])
         });
 
-        let references = references.await.unwrap();
+        let references = project_b
+            .update(cx_b, |p, cx| p.references(&buffer_b, 7, cx))
+            .await
+            .unwrap();
         cx_b.read(|cx| {
             assert_eq!(references.len(), 3);
             assert_eq!(project_b.read(cx).worktrees(cx).count(), 2);
@@ -2846,8 +2851,6 @@ mod tests {
             .unwrap();
 
         // Request document highlights as the guest.
-        let highlights = project_b.update(cx_b, |p, cx| p.document_highlights(&buffer_b, 34, cx));
-
         let mut fake_language_server = fake_language_servers.next().await.unwrap();
         fake_language_server.handle_request::<lsp::request::DocumentHighlightRequest, _>(
             |params, _| {
@@ -2889,7 +2892,10 @@ mod tests {
             },
         );
 
-        let highlights = highlights.await.unwrap();
+        let highlights = project_b
+            .update(cx_b, |p, cx| p.document_highlights(&buffer_b, 34, cx))
+            .await
+            .unwrap();
         buffer_b.read_with(cx_b, |buffer, _| {
             let snapshot = buffer.snapshot();
 
@@ -2991,8 +2997,6 @@ mod tests {
             .await
             .unwrap();
 
-        // Request the definition of a symbol as the guest.
-        let symbols = project_b.update(cx_b, |p, cx| p.symbols("two", cx));
         let mut fake_language_server = fake_language_servers.next().await.unwrap();
         fake_language_server.handle_request::<lsp::request::WorkspaceSymbol, _>(|_, _| {
             #[allow(deprecated)]
@@ -3009,7 +3013,11 @@ mod tests {
             }])
         });
 
-        let symbols = symbols.await.unwrap();
+        // Request the definition of a symbol as the guest.
+        let symbols = project_b
+            .update(cx_b, |p, cx| p.symbols("two", cx))
+            .await
+            .unwrap();
         assert_eq!(symbols.len(), 1);
         assert_eq!(symbols[0].name, "TWO");
 
@@ -3120,6 +3128,14 @@ mod tests {
             .await
             .unwrap();
 
+        let mut fake_language_server = fake_language_servers.next().await.unwrap();
+        fake_language_server.handle_request::<lsp::request::GotoDefinition, _>(|_, _| {
+            Some(lsp::GotoDefinitionResponse::Scalar(lsp::Location::new(
+                lsp::Url::from_file_path("/root/b.rs").unwrap(),
+                lsp::Range::new(lsp::Position::new(0, 6), lsp::Position::new(0, 9)),
+            )))
+        });
+
         let definitions;
         let buffer_b2;
         if rng.gen() {
@@ -3130,14 +3146,6 @@ mod tests {
             definitions = project_b.update(cx_b, |p, cx| p.definition(&buffer_b1, 23, cx));
         }
 
-        let mut fake_language_server = fake_language_servers.next().await.unwrap();
-        fake_language_server.handle_request::<lsp::request::GotoDefinition, _>(|_, _| {
-            Some(lsp::GotoDefinitionResponse::Scalar(lsp::Location::new(
-                lsp::Url::from_file_path("/root/b.rs").unwrap(),
-                lsp::Range::new(lsp::Position::new(0, 6), lsp::Position::new(0, 9)),
-            )))
-        });
-
         let buffer_b2 = buffer_b2.await.unwrap();
         let definitions = definitions.await.unwrap();
         assert_eq!(definitions.len(), 1);