lsp: Serialize LSP notifications on background threads (#39403)

Piotr Osiewicz created

This should reduce hiccups when opening large files.

Release Notes:

- N/A

Change summary

crates/collab/src/tests/editor_tests.rs                  | 12 
crates/collab/src/tests/integration_tests.rs             | 14 
crates/copilot/src/copilot.rs                            | 30 ++-
crates/editor/src/editor_tests.rs                        | 53 +++--
crates/editor/src/inlay_hint_cache.rs                    |  4 
crates/editor/src/test/editor_lsp_test_context.rs        |  2 
crates/json_schema_store/src/json_schema_store.rs        |  4 
crates/language_tools/src/lsp_log_view.rs                |  2 
crates/language_tools/src/lsp_log_view_tests.rs          |  2 
crates/lsp/src/lsp.rs                                    | 88 ++++++---
crates/project/src/lsp_store.rs                          | 73 +++----
crates/project/src/lsp_store/json_language_server_ext.rs |  4 
crates/project/src/lsp_store/rust_analyzer_ext.rs        | 29 +-
crates/project/src/project_tests.rs                      | 16 
14 files changed, 186 insertions(+), 147 deletions(-)

Detailed changes

crates/collab/src/tests/editor_tests.rs 🔗

@@ -1272,7 +1272,7 @@ async fn test_language_server_statuses(cx_a: &mut TestAppContext, cx_b: &mut Tes
     fake_language_server.start_progress("the-token").await;
 
     executor.advance_clock(SERVER_PROGRESS_THROTTLE_TIMEOUT);
-    fake_language_server.notify::<lsp::notification::Progress>(&lsp::ProgressParams {
+    fake_language_server.notify::<lsp::notification::Progress>(lsp::ProgressParams {
         token: lsp::NumberOrString::String("the-token".to_string()),
         value: lsp::ProgressParamsValue::WorkDone(lsp::WorkDoneProgress::Report(
             lsp::WorkDoneProgressReport {
@@ -1306,7 +1306,7 @@ async fn test_language_server_statuses(cx_a: &mut TestAppContext, cx_b: &mut Tes
     });
 
     executor.advance_clock(SERVER_PROGRESS_THROTTLE_TIMEOUT);
-    fake_language_server.notify::<lsp::notification::Progress>(&lsp::ProgressParams {
+    fake_language_server.notify::<lsp::notification::Progress>(lsp::ProgressParams {
         token: lsp::NumberOrString::String("the-token".to_string()),
         value: lsp::ProgressParamsValue::WorkDone(lsp::WorkDoneProgress::Report(
             lsp::WorkDoneProgressReport {
@@ -2848,7 +2848,7 @@ async fn test_lsp_pull_diagnostics(
     });
 
     fake_language_server.notify::<lsp::notification::PublishDiagnostics>(
-        &lsp::PublishDiagnosticsParams {
+        lsp::PublishDiagnosticsParams {
             uri: lsp::Uri::from_file_path(path!("/a/main.rs")).unwrap(),
             diagnostics: vec![lsp::Diagnostic {
                 range: lsp::Range {
@@ -2869,7 +2869,7 @@ async fn test_lsp_pull_diagnostics(
         },
     );
     fake_language_server.notify::<lsp::notification::PublishDiagnostics>(
-        &lsp::PublishDiagnosticsParams {
+        lsp::PublishDiagnosticsParams {
             uri: lsp::Uri::from_file_path(path!("/a/lib.rs")).unwrap(),
             diagnostics: vec![lsp::Diagnostic {
                 range: lsp::Range {
@@ -2891,7 +2891,7 @@ async fn test_lsp_pull_diagnostics(
     );
 
     if should_stream_workspace_diagnostic {
-        fake_language_server.notify::<lsp::notification::Progress>(&lsp::ProgressParams {
+        fake_language_server.notify::<lsp::notification::Progress>(lsp::ProgressParams {
             token: expected_workspace_diagnostic_token.clone(),
             value: lsp::ProgressParamsValue::WorkspaceDiagnostic(
                 lsp::WorkspaceDiagnosticReportResult::Report(lsp::WorkspaceDiagnosticReport {
@@ -3073,7 +3073,7 @@ async fn test_lsp_pull_diagnostics(
     });
 
     if should_stream_workspace_diagnostic {
-        fake_language_server.notify::<lsp::notification::Progress>(&lsp::ProgressParams {
+        fake_language_server.notify::<lsp::notification::Progress>(lsp::ProgressParams {
             token: expected_workspace_diagnostic_token.clone(),
             value: lsp::ProgressParamsValue::WorkspaceDiagnostic(
                 lsp::WorkspaceDiagnosticReportResult::Report(lsp::WorkspaceDiagnosticReport {

crates/collab/src/tests/integration_tests.rs 🔗

@@ -4077,7 +4077,7 @@ async fn test_collaborating_with_diagnostics(
         .receive_notification::<lsp::notification::DidOpenTextDocument>()
         .await;
     fake_language_server.notify::<lsp::notification::PublishDiagnostics>(
-        &lsp::PublishDiagnosticsParams {
+        lsp::PublishDiagnosticsParams {
             uri: lsp::Uri::from_file_path(path!("/a/a.rs")).unwrap(),
             version: None,
             diagnostics: vec![lsp::Diagnostic {
@@ -4097,7 +4097,7 @@ async fn test_collaborating_with_diagnostics(
         .await
         .unwrap();
     fake_language_server.notify::<lsp::notification::PublishDiagnostics>(
-        &lsp::PublishDiagnosticsParams {
+        lsp::PublishDiagnosticsParams {
             uri: lsp::Uri::from_file_path(path!("/a/a.rs")).unwrap(),
             version: None,
             diagnostics: vec![lsp::Diagnostic {
@@ -4171,7 +4171,7 @@ async fn test_collaborating_with_diagnostics(
 
     // Simulate a language server reporting more errors for a file.
     fake_language_server.notify::<lsp::notification::PublishDiagnostics>(
-        &lsp::PublishDiagnosticsParams {
+        lsp::PublishDiagnosticsParams {
             uri: lsp::Uri::from_file_path(path!("/a/a.rs")).unwrap(),
             version: None,
             diagnostics: vec![
@@ -4269,7 +4269,7 @@ async fn test_collaborating_with_diagnostics(
 
     // Simulate a language server reporting no errors for a file.
     fake_language_server.notify::<lsp::notification::PublishDiagnostics>(
-        &lsp::PublishDiagnosticsParams {
+        lsp::PublishDiagnosticsParams {
             uri: lsp::Uri::from_file_path(path!("/a/a.rs")).unwrap(),
             version: None,
             diagnostics: Vec::new(),
@@ -4365,7 +4365,7 @@ async fn test_collaborating_with_lsp_progress_updates_and_diagnostics_ordering(
         .await
         .into_response()
         .unwrap();
-    fake_language_server.notify::<lsp::notification::Progress>(&lsp::ProgressParams {
+    fake_language_server.notify::<lsp::notification::Progress>(lsp::ProgressParams {
         token: lsp::NumberOrString::String("the-disk-based-token".to_string()),
         value: lsp::ProgressParamsValue::WorkDone(lsp::WorkDoneProgress::Begin(
             lsp::WorkDoneProgressBegin {
@@ -4376,7 +4376,7 @@ async fn test_collaborating_with_lsp_progress_updates_and_diagnostics_ordering(
     });
     for file_name in file_names {
         fake_language_server.notify::<lsp::notification::PublishDiagnostics>(
-            &lsp::PublishDiagnosticsParams {
+            lsp::PublishDiagnosticsParams {
                 uri: lsp::Uri::from_file_path(Path::new(path!("/test")).join(file_name)).unwrap(),
                 version: None,
                 diagnostics: vec![lsp::Diagnostic {
@@ -4389,7 +4389,7 @@ async fn test_collaborating_with_lsp_progress_updates_and_diagnostics_ordering(
             },
         );
     }
-    fake_language_server.notify::<lsp::notification::Progress>(&lsp::ProgressParams {
+    fake_language_server.notify::<lsp::notification::Progress>(lsp::ProgressParams {
         token: lsp::NumberOrString::String("the-disk-based-token".to_string()),
         value: lsp::ProgressParamsValue::WorkDone(lsp::WorkDoneProgress::End(
             lsp::WorkDoneProgressEnd { message: None },

crates/copilot/src/copilot.rs 🔗

@@ -270,7 +270,7 @@ impl RegisteredBuffer {
                             server
                                 .lsp
                                 .notify::<lsp::notification::DidChangeTextDocument>(
-                                    &lsp::DidChangeTextDocumentParams {
+                                    lsp::DidChangeTextDocumentParams {
                                         text_document: lsp::VersionedTextDocumentIdentifier::new(
                                             buffer.uri.clone(),
                                             buffer.snapshot_version,
@@ -744,7 +744,7 @@ impl Copilot {
                 let snapshot = buffer.read(cx).snapshot();
                 server
                     .notify::<lsp::notification::DidOpenTextDocument>(
-                        &lsp::DidOpenTextDocumentParams {
+                        lsp::DidOpenTextDocumentParams {
                             text_document: lsp::TextDocumentItem {
                                 uri: uri.clone(),
                                 language_id: language_id.clone(),
@@ -792,13 +792,14 @@ impl Copilot {
                     server
                         .lsp
                         .notify::<lsp::notification::DidSaveTextDocument>(
-                            &lsp::DidSaveTextDocumentParams {
+                            lsp::DidSaveTextDocumentParams {
                                 text_document: lsp::TextDocumentIdentifier::new(
                                     registered_buffer.uri.clone(),
                                 ),
                                 text: None,
                             },
-                        )?;
+                        )
+                        .ok();
                 }
                 language::BufferEvent::FileHandleChanged
                 | language::BufferEvent::LanguageChanged => {
@@ -814,14 +815,15 @@ impl Copilot {
                         server
                             .lsp
                             .notify::<lsp::notification::DidCloseTextDocument>(
-                                &lsp::DidCloseTextDocumentParams {
+                                lsp::DidCloseTextDocumentParams {
                                     text_document: lsp::TextDocumentIdentifier::new(old_uri),
                                 },
-                            )?;
+                            )
+                            .ok();
                         server
                             .lsp
                             .notify::<lsp::notification::DidOpenTextDocument>(
-                                &lsp::DidOpenTextDocumentParams {
+                                lsp::DidOpenTextDocumentParams {
                                     text_document: lsp::TextDocumentItem::new(
                                         registered_buffer.uri.clone(),
                                         registered_buffer.language_id.clone(),
@@ -829,7 +831,8 @@ impl Copilot {
                                         registered_buffer.snapshot.text(),
                                     ),
                                 },
-                            )?;
+                            )
+                            .ok();
                     }
                 }
                 _ => {}
@@ -846,7 +849,7 @@ impl Copilot {
             server
                 .lsp
                 .notify::<lsp::notification::DidCloseTextDocument>(
-                    &lsp::DidCloseTextDocumentParams {
+                    lsp::DidCloseTextDocumentParams {
                         text_document: lsp::TextDocumentIdentifier::new(buffer.uri),
                     },
                 )
@@ -1151,9 +1154,12 @@ fn notify_did_change_config_to_server(
         }
     });
 
-    server.notify::<lsp::notification::DidChangeConfiguration>(&lsp::DidChangeConfigurationParams {
-        settings,
-    })
+    server
+        .notify::<lsp::notification::DidChangeConfiguration>(lsp::DidChangeConfigurationParams {
+            settings,
+        })
+        .ok();
+    Ok(())
 }
 
 async fn clear_copilot_dir() {

crates/editor/src/editor_tests.rs 🔗

@@ -12416,11 +12416,6 @@ async fn test_strip_whitespace_and_format_via_lsp(cx: &mut TestAppContext) {
         .join("\n"),
     );
 
-    // Submit a format request.
-    let format = cx
-        .update_editor(|editor, window, cx| editor.format(&Format, window, cx))
-        .unwrap();
-
     // Record which buffer changes have been sent to the language server
     let buffer_changes = Arc::new(Mutex::new(Vec::new()));
     cx.lsp
@@ -12441,28 +12436,29 @@ async fn test_strip_whitespace_and_format_via_lsp(cx: &mut TestAppContext) {
         .set_request_handler::<lsp::request::Formatting, _, _>({
             let buffer_changes = buffer_changes.clone();
             move |_, _| {
-                // When formatting is requested, trailing whitespace has already been stripped,
-                // and the trailing newline has already been added.
-                assert_eq!(
-                    &buffer_changes.lock()[1..],
-                    &[
-                        (
-                            lsp::Range::new(lsp::Position::new(0, 3), lsp::Position::new(0, 4)),
-                            "".into()
-                        ),
-                        (
-                            lsp::Range::new(lsp::Position::new(2, 5), lsp::Position::new(2, 6)),
-                            "".into()
-                        ),
-                        (
-                            lsp::Range::new(lsp::Position::new(3, 4), lsp::Position::new(3, 4)),
-                            "\n".into()
-                        ),
-                    ]
-                );
-
+                let buffer_changes = buffer_changes.clone();
                 // Insert blank lines between each line of the buffer.
                 async move {
+                    // When formatting is requested, trailing whitespace has already been stripped,
+                    // and the trailing newline has already been added.
+                    assert_eq!(
+                        &buffer_changes.lock()[1..],
+                        &[
+                            (
+                                lsp::Range::new(lsp::Position::new(0, 3), lsp::Position::new(0, 4)),
+                                "".into()
+                            ),
+                            (
+                                lsp::Range::new(lsp::Position::new(2, 5), lsp::Position::new(2, 6)),
+                                "".into()
+                            ),
+                            (
+                                lsp::Range::new(lsp::Position::new(3, 4), lsp::Position::new(3, 4)),
+                                "\n".into()
+                            ),
+                        ]
+                    );
+
                     Ok(Some(vec![
                         lsp::TextEdit {
                             range: lsp::Range::new(
@@ -12483,10 +12479,17 @@ async fn test_strip_whitespace_and_format_via_lsp(cx: &mut TestAppContext) {
             }
         });
 
+    // Submit a format request.
+    let format = cx
+        .update_editor(|editor, window, cx| editor.format(&Format, window, cx))
+        .unwrap();
+
+    cx.run_until_parked();
     // After formatting the buffer, the trailing whitespace is stripped,
     // a newline is appended, and the edits provided by the language server
     // have been applied.
     format.await.unwrap();
+
     cx.assert_editor_state(
         &[
             "one",   //

crates/editor/src/inlay_hint_cache.rs 🔗

@@ -1495,7 +1495,7 @@ pub mod tests {
             .into_response()
             .expect("work done progress create request failed");
         cx.executor().run_until_parked();
-        fake_server.notify::<lsp::notification::Progress>(&lsp::ProgressParams {
+        fake_server.notify::<lsp::notification::Progress>(lsp::ProgressParams {
             token: lsp::ProgressToken::String(progress_token.to_string()),
             value: lsp::ProgressParamsValue::WorkDone(lsp::WorkDoneProgress::Begin(
                 lsp::WorkDoneProgressBegin::default(),
@@ -1515,7 +1515,7 @@ pub mod tests {
             })
             .unwrap();
 
-        fake_server.notify::<lsp::notification::Progress>(&lsp::ProgressParams {
+        fake_server.notify::<lsp::notification::Progress>(lsp::ProgressParams {
             token: lsp::ProgressToken::String(progress_token.to_string()),
             value: lsp::ProgressParamsValue::WorkDone(lsp::WorkDoneProgress::End(
                 lsp::WorkDoneProgressEnd::default(),

crates/editor/src/test/editor_lsp_test_context.rs 🔗

@@ -440,7 +440,7 @@ impl EditorLspTestContext {
     }
 
     pub fn notify<T: notification::Notification>(&self, params: T::Params) {
-        self.lsp.notify::<T>(&params);
+        self.lsp.notify::<T>(params);
     }
 
     #[cfg(target_os = "windows")]

crates/language_tools/src/lsp_log_view_tests.rs 🔗

@@ -73,7 +73,7 @@ async fn test_lsp_log_view(cx: &mut TestAppContext) {
     let log_view = window.root(cx).unwrap();
     let mut cx = VisualTestContext::from_window(*window, cx);
 
-    language_server.notify::<lsp::notification::LogMessage>(&lsp::LogMessageParams {
+    language_server.notify::<lsp::notification::LogMessage>(lsp::LogMessageParams {
         message: "hello from the server".into(),
         typ: lsp::MessageType::INFO,
     });

crates/lsp/src/lsp.rs 🔗

@@ -80,11 +80,14 @@ pub struct LanguageServerBinaryOptions {
     pub pre_release: bool,
 }
 
+struct NotificationSerializer(Box<dyn FnOnce() -> String + Send + Sync>);
+
 /// A running language server process.
 pub struct LanguageServer {
     server_id: LanguageServerId,
     next_id: AtomicI32,
     outbound_tx: channel::Sender<String>,
+    notification_tx: channel::Sender<NotificationSerializer>,
     name: LanguageServerName,
     process_name: Arc<str>,
     binary: LanguageServerBinary,
@@ -477,9 +480,24 @@ impl LanguageServer {
         }
         .into();
 
+        let (notification_tx, notification_rx) = channel::unbounded::<NotificationSerializer>();
+        cx.background_spawn({
+            let outbound_tx = outbound_tx.clone();
+            async move {
+                while let Ok(serializer) = notification_rx.recv().await {
+                    let serialized = (serializer.0)();
+                    let Ok(_) = outbound_tx.send(serialized).await else {
+                        return;
+                    };
+                }
+                outbound_tx.close();
+            }
+        })
+        .detach();
         Self {
             server_id,
             notification_handlers,
+            notification_tx,
             response_handlers,
             io_handlers,
             name: server_name,
@@ -906,7 +924,7 @@ impl LanguageServer {
             self.capabilities = RwLock::new(response.capabilities);
             self.configuration = configuration;
 
-            self.notify::<notification::Initialized>(&InitializedParams {})?;
+            self.notify::<notification::Initialized>(InitializedParams {})?;
             Ok(Arc::new(self))
         })
     }
@@ -918,11 +936,13 @@ impl LanguageServer {
             let next_id = AtomicI32::new(self.next_id.load(SeqCst));
             let outbound_tx = self.outbound_tx.clone();
             let executor = self.executor.clone();
+            let notification_serializers = self.notification_tx.clone();
             let mut output_done = self.output_done_rx.lock().take().unwrap();
             let shutdown_request = Self::request_internal::<request::Shutdown>(
                 &next_id,
                 &response_handlers,
                 &outbound_tx,
+                &notification_serializers,
                 &executor,
                 (),
             );
@@ -956,8 +976,8 @@ impl LanguageServer {
                 }
 
                 response_handlers.lock().take();
-                Self::notify_internal::<notification::Exit>(&outbound_tx, &()).ok();
-                outbound_tx.close();
+                Self::notify_internal::<notification::Exit>(&notification_serializers, ()).ok();
+                notification_serializers.close();
                 output_done.recv().await;
                 server.lock().take().map(|mut child| child.kill());
                 drop(tasks);
@@ -1179,6 +1199,7 @@ impl LanguageServer {
             &self.next_id,
             &self.response_handlers,
             &self.outbound_tx,
+            &self.notification_tx,
             &self.executor,
             params,
         )
@@ -1200,6 +1221,7 @@ impl LanguageServer {
             &self.next_id,
             &self.response_handlers,
             &self.outbound_tx,
+            &self.notification_tx,
             &self.executor,
             timer,
             params,
@@ -1210,6 +1232,7 @@ impl LanguageServer {
         next_id: &AtomicI32,
         response_handlers: &Mutex<Option<HashMap<RequestId, ResponseHandler>>>,
         outbound_tx: &channel::Sender<String>,
+        notification_serializers: &channel::Sender<NotificationSerializer>,
         executor: &BackgroundExecutor,
         timer: U,
         params: T::Params,
@@ -1261,7 +1284,7 @@ impl LanguageServer {
             .try_send(message)
             .context("failed to write to language server's stdin");
 
-        let outbound_tx = outbound_tx.downgrade();
+        let notification_serializers = notification_serializers.downgrade();
         let started = Instant::now();
         LspRequest::new(id, async move {
             if let Err(e) = handle_response {
@@ -1272,10 +1295,10 @@ impl LanguageServer {
             }
 
             let cancel_on_drop = util::defer(move || {
-                if let Some(outbound_tx) = outbound_tx.upgrade() {
+                if let Some(notification_serializers) = notification_serializers.upgrade() {
                     Self::notify_internal::<notification::Cancel>(
-                        &outbound_tx,
-                        &CancelParams {
+                        &notification_serializers,
+                        CancelParams {
                             id: NumberOrString::Number(id),
                         },
                     )
@@ -1310,6 +1333,7 @@ impl LanguageServer {
         next_id: &AtomicI32,
         response_handlers: &Mutex<Option<HashMap<RequestId, ResponseHandler>>>,
         outbound_tx: &channel::Sender<String>,
+        notification_serializers: &channel::Sender<NotificationSerializer>,
         executor: &BackgroundExecutor,
         params: T::Params,
     ) -> impl LspRequestFuture<T::Result> + use<T>
@@ -1321,6 +1345,7 @@ impl LanguageServer {
             next_id,
             response_handlers,
             outbound_tx,
+            notification_serializers,
             executor,
             Self::default_request_timer(executor.clone()),
             params,
@@ -1336,21 +1361,25 @@ impl LanguageServer {
     /// Sends a RPC notification to the language server.
     ///
     /// [LSP Specification](https://microsoft.github.io/language-server-protocol/specifications/lsp/3.17/specification/#notificationMessage)
-    pub fn notify<T: notification::Notification>(&self, params: &T::Params) -> Result<()> {
-        Self::notify_internal::<T>(&self.outbound_tx, params)
+    pub fn notify<T: notification::Notification>(&self, params: T::Params) -> Result<()> {
+        let outbound = self.notification_tx.clone();
+        Self::notify_internal::<T>(&outbound, params)
     }
 
     fn notify_internal<T: notification::Notification>(
-        outbound_tx: &channel::Sender<String>,
-        params: &T::Params,
+        outbound_tx: &channel::Sender<NotificationSerializer>,
+        params: T::Params,
     ) -> Result<()> {
-        let message = serde_json::to_string(&Notification {
-            jsonrpc: JSON_RPC_VERSION,
-            method: T::METHOD,
-            params,
-        })
-        .unwrap();
-        outbound_tx.try_send(message)?;
+        let serializer = NotificationSerializer(Box::new(move || {
+            serde_json::to_string(&Notification {
+                jsonrpc: JSON_RPC_VERSION,
+                method: T::METHOD,
+                params,
+            })
+            .unwrap()
+        }));
+
+        outbound_tx.send_blocking(serializer)?;
         Ok(())
     }
 
@@ -1385,7 +1414,7 @@ impl LanguageServer {
                     removed: vec![],
                 },
             };
-            self.notify::<DidChangeWorkspaceFolders>(&params).ok();
+            self.notify::<DidChangeWorkspaceFolders>(params).ok();
         }
     }
 
@@ -1419,7 +1448,7 @@ impl LanguageServer {
                     }],
                 },
             };
-            self.notify::<DidChangeWorkspaceFolders>(&params).ok();
+            self.notify::<DidChangeWorkspaceFolders>(params).ok();
         }
     }
     pub fn set_workspace_folders(&self, folders: BTreeSet<Uri>) {
@@ -1451,7 +1480,7 @@ impl LanguageServer {
             let params = DidChangeWorkspaceFoldersParams {
                 event: WorkspaceFoldersChangeEvent { added, removed },
             };
-            self.notify::<DidChangeWorkspaceFolders>(&params).ok();
+            self.notify::<DidChangeWorkspaceFolders>(params).ok();
         }
     }
 
@@ -1469,14 +1498,14 @@ impl LanguageServer {
         version: i32,
         initial_text: String,
     ) {
-        self.notify::<notification::DidOpenTextDocument>(&DidOpenTextDocumentParams {
+        self.notify::<notification::DidOpenTextDocument>(DidOpenTextDocumentParams {
             text_document: TextDocumentItem::new(uri, language_id, version, initial_text),
         })
         .ok();
     }
 
     pub fn unregister_buffer(&self, uri: Uri) {
-        self.notify::<notification::DidCloseTextDocument>(&DidCloseTextDocumentParams {
+        self.notify::<notification::DidCloseTextDocument>(DidCloseTextDocumentParams {
             text_document: TextDocumentIdentifier::new(uri),
         })
         .ok();
@@ -1692,7 +1721,7 @@ impl LanguageServer {
 #[cfg(any(test, feature = "test-support"))]
 impl FakeLanguageServer {
     /// See [`LanguageServer::notify`].
-    pub fn notify<T: notification::Notification>(&self, params: &T::Params) {
+    pub fn notify<T: notification::Notification>(&self, params: T::Params) {
         self.server.notify::<T>(params).ok();
     }
 
@@ -1801,7 +1830,7 @@ impl FakeLanguageServer {
         .await
         .into_response()
         .unwrap();
-        self.notify::<notification::Progress>(&ProgressParams {
+        self.notify::<notification::Progress>(ProgressParams {
             token: NumberOrString::String(token),
             value: ProgressParamsValue::WorkDone(WorkDoneProgress::Begin(progress)),
         });
@@ -1809,7 +1838,7 @@ impl FakeLanguageServer {
 
     /// Simulate that the server has completed work and notifies about that with the specified token.
     pub fn end_progress(&self, token: impl Into<String>) {
-        self.notify::<notification::Progress>(&ProgressParams {
+        self.notify::<notification::Progress>(ProgressParams {
             token: NumberOrString::String(token.into()),
             value: ProgressParamsValue::WorkDone(WorkDoneProgress::End(Default::default())),
         });
@@ -1868,7 +1897,7 @@ mod tests {
             .await
             .unwrap();
         server
-            .notify::<notification::DidOpenTextDocument>(&DidOpenTextDocumentParams {
+            .notify::<notification::DidOpenTextDocument>(DidOpenTextDocumentParams {
                 text_document: TextDocumentItem::new(
                     Uri::from_str("file://a/b").unwrap(),
                     "rust".to_string(),
@@ -1886,11 +1915,11 @@ mod tests {
             "file://a/b"
         );
 
-        fake.notify::<notification::ShowMessage>(&ShowMessageParams {
+        fake.notify::<notification::ShowMessage>(ShowMessageParams {
             typ: MessageType::ERROR,
             message: "ok".to_string(),
         });
-        fake.notify::<notification::PublishDiagnostics>(&PublishDiagnosticsParams {
+        fake.notify::<notification::PublishDiagnostics>(PublishDiagnosticsParams {
             uri: Uri::from_str("file://b/c").unwrap(),
             version: Some(5),
             diagnostics: vec![],
@@ -1904,6 +1933,7 @@ mod tests {
         fake.set_request_handler::<request::Shutdown, _, _>(|_, _| async move { Ok(()) });
 
         drop(server);
+        cx.run_until_parked();
         fake.receive_notification::<notification::Exit>().await;
     }
 

crates/project/src/lsp_store.rs 🔗

@@ -406,15 +406,14 @@ impl LocalLspStore {
                         adapter.clone(),
                     );
 
-                    let did_change_configuration_params =
-                        Arc::new(lsp::DidChangeConfigurationParams {
-                            settings: workspace_config,
-                        });
+                    let did_change_configuration_params = lsp::DidChangeConfigurationParams {
+                        settings: workspace_config,
+                    };
                     let language_server = cx
                         .update(|cx| {
                             language_server.initialize(
                                 initialization_params,
-                                did_change_configuration_params.clone(),
+                                Arc::new(did_change_configuration_params.clone()),
                                 cx,
                             )
                         })?
@@ -430,11 +429,9 @@ impl LocalLspStore {
                             }
                         })?;
 
-                    language_server
-                        .notify::<lsp::notification::DidChangeConfiguration>(
-                            &did_change_configuration_params,
-                        )
-                        .ok();
+                    language_server.notify::<lsp::notification::DidChangeConfiguration>(
+                        did_change_configuration_params,
+                    )?;
 
                     anyhow::Ok(language_server)
                 }
@@ -7206,7 +7203,7 @@ impl LspStore {
 
             language_server
                 .notify::<lsp::notification::DidChangeTextDocument>(
-                    &lsp::DidChangeTextDocumentParams {
+                    lsp::DidChangeTextDocumentParams {
                         text_document: lsp::VersionedTextDocumentIdentifier::new(
                             uri.clone(),
                             next_version,
@@ -7243,7 +7240,7 @@ impl LspStore {
                 };
                 server
                     .notify::<lsp::notification::DidSaveTextDocument>(
-                        &lsp::DidSaveTextDocumentParams {
+                        lsp::DidSaveTextDocumentParams {
                             text_document: text_document.clone(),
                             text,
                         },
@@ -7314,7 +7311,7 @@ impl LspStore {
                                             .ok()?;
                                         server
                                             .notify::<lsp::notification::DidChangeConfiguration>(
-                                                &lsp::DidChangeConfigurationParams { settings },
+                                                lsp::DidChangeConfigurationParams { settings },
                                             )
                                             .ok()?;
                                         Some(())
@@ -8536,15 +8533,16 @@ impl LspStore {
         cx: AsyncApp,
     ) -> Result<proto::Ack> {
         let server_id = LanguageServerId(envelope.payload.language_server_id as usize);
-        lsp_store.read_with(&cx, |lsp_store, _| {
+        let task = lsp_store.read_with(&cx, |lsp_store, _| {
             if let Some(server) = lsp_store.language_server_for_id(server_id) {
-                server
-                    .notify::<lsp_store::lsp_ext_command::LspExtCancelFlycheck>(&())
-                    .context("handling lsp ext cancel flycheck")
+                Some(server.notify::<lsp_store::lsp_ext_command::LspExtCancelFlycheck>(()))
             } else {
-                anyhow::Ok(())
+                None
             }
-        })??;
+        })?;
+        if let Some(task) = task {
+            task.context("handling lsp ext cancel flycheck")?;
+        }
 
         Ok(proto::Ack {})
     }
@@ -8578,14 +8576,11 @@ impl LspStore {
                 } else {
                     None
                 };
-                server
-                    .notify::<lsp_store::lsp_ext_command::LspExtRunFlycheck>(
-                        &lsp_store::lsp_ext_command::RunFlycheckParams { text_document },
-                    )
-                    .context("handling lsp ext run flycheck")
-            } else {
-                anyhow::Ok(())
+                server.notify::<lsp_store::lsp_ext_command::LspExtRunFlycheck>(
+                    lsp_store::lsp_ext_command::RunFlycheckParams { text_document },
+                )?;
             }
+            anyhow::Ok(())
         })??;
 
         Ok(proto::Ack {})
@@ -8597,15 +8592,15 @@ impl LspStore {
         cx: AsyncApp,
     ) -> Result<proto::Ack> {
         let server_id = LanguageServerId(envelope.payload.language_server_id as usize);
-        lsp_store.read_with(&cx, |lsp_store, _| {
-            if let Some(server) = lsp_store.language_server_for_id(server_id) {
-                server
-                    .notify::<lsp_store::lsp_ext_command::LspExtClearFlycheck>(&())
-                    .context("handling lsp ext clear flycheck")
-            } else {
-                anyhow::Ok(())
-            }
-        })??;
+        lsp_store
+            .read_with(&cx, |lsp_store, _| {
+                if let Some(server) = lsp_store.language_server_for_id(server_id) {
+                    Some(server.notify::<lsp_store::lsp_ext_command::LspExtClearFlycheck>(()))
+                } else {
+                    None
+                }
+            })
+            .context("handling lsp ext clear flycheck")?;
 
         Ok(proto::Ack {})
     }
@@ -8744,7 +8739,7 @@ impl LspStore {
 
                 if filter.should_send_did_rename(&old_uri, is_dir) {
                     language_server
-                        .notify::<DidRenameFiles>(&RenameFilesParams {
+                        .notify::<DidRenameFiles>(RenameFilesParams {
                             files: vec![FileRename {
                                 old_uri: old_uri.clone(),
                                 new_uri: new_uri.clone(),
@@ -8858,7 +8853,7 @@ impl LspStore {
             if !changes.is_empty() {
                 server
                     .notify::<lsp::notification::DidChangeWatchedFiles>(
-                        &lsp::DidChangeWatchedFilesParams { changes },
+                        lsp::DidChangeWatchedFilesParams { changes },
                     )
                     .ok();
             }
@@ -10668,7 +10663,7 @@ impl LspStore {
                     if progress.is_cancellable {
                         server
                             .notify::<lsp::notification::WorkDoneProgressCancel>(
-                                &WorkDoneProgressCancelParams {
+                                WorkDoneProgressCancelParams {
                                     token: lsp::NumberOrString::String(token.clone()),
                                 },
                             )
@@ -10799,7 +10794,7 @@ impl LspStore {
                 };
                 if !params.changes.is_empty() {
                     server
-                        .notify::<lsp::notification::DidChangeWatchedFiles>(&params)
+                        .notify::<lsp::notification::DidChangeWatchedFiles>(params)
                         .ok();
                 }
             }

crates/project/src/lsp_store/json_language_server_ext.rs 🔗

@@ -42,7 +42,7 @@ impl lsp::notification::Notification for SchemaContentsChanged {
     type Params = String;
 }
 
-pub fn notify_schema_changed(lsp_store: Entity<LspStore>, uri: &String, cx: &App) {
+pub fn notify_schema_changed(lsp_store: Entity<LspStore>, uri: String, cx: &App) {
     zlog::trace!(LOGGER => "Notifying schema changed for URI: {:?}", uri);
     let servers = lsp_store.read_with(cx, |lsp_store, _| {
         let mut servers = Vec::new();
@@ -65,7 +65,7 @@ pub fn notify_schema_changed(lsp_store: Entity<LspStore>, uri: &String, cx: &App
     for server in servers {
         zlog::trace!(LOGGER => "Notifying server {:?} of schema change for URI: {:?}", server.server_id(), &uri);
         // TODO: handle errors
-        server.notify::<SchemaContentsChanged>(uri).ok();
+        server.notify::<SchemaContentsChanged>(uri.clone()).ok();
     }
 }
 

crates/project/src/lsp_store/rust_analyzer_ext.rs 🔗

@@ -119,11 +119,12 @@ pub fn cancel_flycheck(
             lsp_store
                 .read_with(cx, |lsp_store, _| {
                     if let Some(server) = lsp_store.language_server_for_id(rust_analyzer_server) {
-                        server.notify::<lsp_store::lsp_ext_command::LspExtCancelFlycheck>(&())?;
+                        server.notify::<lsp_store::lsp_ext_command::LspExtCancelFlycheck>(())
+                    } else {
+                        Ok(())
                     }
-                    anyhow::Ok(())
-                })?
-                .context("lsp ext cancel flycheck")?;
+                })
+                .context("lsp ext cancel flycheck")??;
         };
         anyhow::Ok(())
     })
@@ -173,14 +174,15 @@ pub fn run_flycheck(
                 .read_with(cx, |lsp_store, _| {
                     if let Some(server) = lsp_store.language_server_for_id(rust_analyzer_server) {
                         server.notify::<lsp_store::lsp_ext_command::LspExtRunFlycheck>(
-                            &lsp_store::lsp_ext_command::RunFlycheckParams {
+                            lsp_store::lsp_ext_command::RunFlycheckParams {
                                 text_document: None,
                             },
-                        )?;
+                        )
+                    } else {
+                        Ok(())
                     }
-                    anyhow::Ok(())
-                })?
-                .context("lsp ext run flycheck")?;
+                })
+                .context("lsp ext run flycheck")??;
         };
         anyhow::Ok(())
     })
@@ -224,11 +226,12 @@ pub fn clear_flycheck(
             lsp_store
                 .read_with(cx, |lsp_store, _| {
                     if let Some(server) = lsp_store.language_server_for_id(rust_analyzer_server) {
-                        server.notify::<lsp_store::lsp_ext_command::LspExtClearFlycheck>(&())?;
+                        server.notify::<lsp_store::lsp_ext_command::LspExtClearFlycheck>(())
+                    } else {
+                        Ok(())
                     }
-                    anyhow::Ok(())
-                })?
-                .context("lsp ext clear flycheck")?;
+                })
+                .context("lsp ext clear flycheck")??;
         };
         anyhow::Ok(())
     })

crates/project/src/project_tests.rs 🔗

@@ -1820,7 +1820,7 @@ async fn test_disk_based_diagnostics_progress(cx: &mut gpui::TestAppContext) {
         }
     );
 
-    fake_server.notify::<lsp::notification::PublishDiagnostics>(&lsp::PublishDiagnosticsParams {
+    fake_server.notify::<lsp::notification::PublishDiagnostics>(lsp::PublishDiagnosticsParams {
         uri: Uri::from_file_path(path!("/dir/a.rs")).unwrap(),
         version: None,
         diagnostics: vec![lsp::Diagnostic {
@@ -1873,7 +1873,7 @@ async fn test_disk_based_diagnostics_progress(cx: &mut gpui::TestAppContext) {
     });
 
     // Ensure publishing empty diagnostics twice only results in one update event.
-    fake_server.notify::<lsp::notification::PublishDiagnostics>(&lsp::PublishDiagnosticsParams {
+    fake_server.notify::<lsp::notification::PublishDiagnostics>(lsp::PublishDiagnosticsParams {
         uri: Uri::from_file_path(path!("/dir/a.rs")).unwrap(),
         version: None,
         diagnostics: Default::default(),
@@ -1886,7 +1886,7 @@ async fn test_disk_based_diagnostics_progress(cx: &mut gpui::TestAppContext) {
         }
     );
 
-    fake_server.notify::<lsp::notification::PublishDiagnostics>(&lsp::PublishDiagnosticsParams {
+    fake_server.notify::<lsp::notification::PublishDiagnostics>(lsp::PublishDiagnosticsParams {
         uri: Uri::from_file_path(path!("/dir/a.rs")).unwrap(),
         version: None,
         diagnostics: Default::default(),
@@ -2018,7 +2018,7 @@ async fn test_restarting_server_with_diagnostics_published(cx: &mut gpui::TestAp
 
     // Publish diagnostics
     let fake_server = fake_servers.next().await.unwrap();
-    fake_server.notify::<lsp::notification::PublishDiagnostics>(&lsp::PublishDiagnosticsParams {
+    fake_server.notify::<lsp::notification::PublishDiagnostics>(lsp::PublishDiagnosticsParams {
         uri: Uri::from_file_path(path!("/dir/a.rs")).unwrap(),
         version: None,
         diagnostics: vec![lsp::Diagnostic {
@@ -2099,7 +2099,7 @@ async fn test_restarted_server_reporting_invalid_buffer_version(cx: &mut gpui::T
 
     // Before restarting the server, report diagnostics with an unknown buffer version.
     let fake_server = fake_servers.next().await.unwrap();
-    fake_server.notify::<lsp::notification::PublishDiagnostics>(&lsp::PublishDiagnosticsParams {
+    fake_server.notify::<lsp::notification::PublishDiagnostics>(lsp::PublishDiagnosticsParams {
         uri: lsp::Uri::from_file_path(path!("/dir/a.rs")).unwrap(),
         version: Some(10000),
         diagnostics: Vec::new(),
@@ -2350,7 +2350,7 @@ async fn test_transforming_diagnostics(cx: &mut gpui::TestAppContext) {
     assert!(change_notification_1.text_document.version > open_notification.text_document.version);
 
     // Report some diagnostics for the initial version of the buffer
-    fake_server.notify::<lsp::notification::PublishDiagnostics>(&lsp::PublishDiagnosticsParams {
+    fake_server.notify::<lsp::notification::PublishDiagnostics>(lsp::PublishDiagnosticsParams {
         uri: lsp::Uri::from_file_path(path!("/dir/a.rs")).unwrap(),
         version: Some(open_notification.text_document.version),
         diagnostics: vec![
@@ -2438,7 +2438,7 @@ async fn test_transforming_diagnostics(cx: &mut gpui::TestAppContext) {
     });
 
     // Ensure overlapping diagnostics are highlighted correctly.
-    fake_server.notify::<lsp::notification::PublishDiagnostics>(&lsp::PublishDiagnosticsParams {
+    fake_server.notify::<lsp::notification::PublishDiagnostics>(lsp::PublishDiagnosticsParams {
         uri: lsp::Uri::from_file_path(path!("/dir/a.rs")).unwrap(),
         version: Some(open_notification.text_document.version),
         diagnostics: vec![
@@ -2532,7 +2532,7 @@ async fn test_transforming_diagnostics(cx: &mut gpui::TestAppContext) {
     );
 
     // Handle out-of-order diagnostics
-    fake_server.notify::<lsp::notification::PublishDiagnostics>(&lsp::PublishDiagnosticsParams {
+    fake_server.notify::<lsp::notification::PublishDiagnostics>(lsp::PublishDiagnosticsParams {
         uri: lsp::Uri::from_file_path(path!("/dir/a.rs")).unwrap(),
         version: Some(change_notification_2.text_document.version),
         diagnostics: vec![