Revert "lsp: Parse LSP messages on background thread - again (#23122)" (#23301)

Conrad Irwin created

This reverts commit 1b3b825c7f646f2fb34e2bceb6be33f310dfeb59.

When debugging git diffs we found that this introduced a re-ordering of
messages sent to the LSP:

* User hits "format"
* Zed adjusts spacing, and sends "spaces changed" to the LSP
* Zed sends "format" to LSP

With the async approach here, the format request can now arrive before
the space changed request.

You can reproduce this with `test_strip_whitespace_and_format_via_lsp`
under some conditions.

Release Notes:

- N/A

Change summary

crates/collab/src/tests/integration_tests.rs      |   1 
crates/editor/src/test/editor_lsp_test_context.rs |   4 
crates/lsp/src/lsp.rs                             | 140 +++++++---------
3 files changed, 62 insertions(+), 83 deletions(-)

Detailed changes

crates/collab/src/tests/integration_tests.rs 🔗

@@ -4197,7 +4197,6 @@ async fn test_collaborating_with_lsp_progress_updates_and_diagnostics_ordering(
                 }],
             },
         );
-        executor.run_until_parked();
     }
     fake_language_server.notify::<lsp::notification::Progress>(&lsp::ProgressParams {
         token: lsp::NumberOrString::String("the-disk-based-token".to_string()),

crates/editor/src/test/editor_lsp_test_context.rs 🔗

@@ -315,12 +315,12 @@ impl EditorLspTestContext {
 
     pub fn handle_request<T, F, Fut>(
         &self,
-        handler: F,
+        mut handler: F,
     ) -> futures::channel::mpsc::UnboundedReceiver<()>
     where
         T: 'static + request::Request,
         T::Params: 'static + Send,
-        F: 'static + Send + Sync + Fn(lsp::Url, T::Params, gpui::AsyncAppContext) -> Fut,
+        F: 'static + Send + FnMut(lsp::Url, T::Params, gpui::AsyncAppContext) -> Fut,
         Fut: 'static + Send + Future<Output = Result<T::Result>>,
     {
         let url = self.buffer_lsp_url.clone();

crates/lsp/src/lsp.rs 🔗

@@ -45,7 +45,7 @@ const CONTENT_LEN_HEADER: &str = "Content-Length: ";
 const LSP_REQUEST_TIMEOUT: Duration = Duration::from_secs(60 * 2);
 const SERVER_SHUTDOWN_TIMEOUT: Duration = Duration::from_secs(5);
 
-type NotificationHandler = Arc<dyn Send + Sync + Fn(Option<RequestId>, Value, AsyncAppContext)>;
+type NotificationHandler = Box<dyn Send + FnMut(Option<RequestId>, Value, AsyncAppContext)>;
 type ResponseHandler = Box<dyn Send + FnOnce(Result<String, Error>)>;
 type IoHandler = Box<dyn Send + FnMut(IoKind, &str)>;
 
@@ -890,7 +890,7 @@ impl LanguageServer {
     pub fn on_notification<T, F>(&self, f: F) -> Subscription
     where
         T: notification::Notification,
-        F: 'static + Send + Sync + Fn(T::Params, AsyncAppContext),
+        F: 'static + Send + FnMut(T::Params, AsyncAppContext),
     {
         self.on_custom_notification(T::METHOD, f)
     }
@@ -903,7 +903,7 @@ impl LanguageServer {
     where
         T: request::Request,
         T::Params: 'static + Send,
-        F: 'static + Fn(T::Params, AsyncAppContext) -> Fut + Send + Sync,
+        F: 'static + FnMut(T::Params, AsyncAppContext) -> Fut + Send,
         Fut: 'static + Future<Output = Result<T::Result>>,
     {
         self.on_custom_request(T::METHOD, f)
@@ -939,27 +939,17 @@ impl LanguageServer {
     }
 
     #[must_use]
-    fn on_custom_notification<Params, F>(&self, method: &'static str, f: F) -> Subscription
+    fn on_custom_notification<Params, F>(&self, method: &'static str, mut f: F) -> Subscription
     where
-        F: 'static + Fn(Params, AsyncAppContext) + Send + Sync,
-        Params: DeserializeOwned + Send + 'static,
+        F: 'static + FnMut(Params, AsyncAppContext) + Send,
+        Params: DeserializeOwned,
     {
-        let callback = Arc::new(f);
         let prev_handler = self.notification_handlers.lock().insert(
             method,
-            Arc::new(move |_, params, cx| {
-                let callback = callback.clone();
-
-                cx.spawn(move |cx| async move {
-                    if let Some(params) = cx
-                        .background_executor()
-                        .spawn(async move { serde_json::from_value(params).log_err() })
-                        .await
-                    {
-                        callback(params, cx);
-                    }
-                })
-                .detach();
+            Box::new(move |_, params, cx| {
+                if let Some(params) = serde_json::from_value(params).log_err() {
+                    f(params, cx);
+                }
             }),
         );
         assert!(
@@ -973,74 +963,64 @@ impl LanguageServer {
     }
 
     #[must_use]
-    fn on_custom_request<Params, Res, Fut, F>(&self, method: &'static str, f: F) -> Subscription
+    fn on_custom_request<Params, Res, Fut, F>(&self, method: &'static str, mut f: F) -> Subscription
     where
-        F: 'static + Fn(Params, AsyncAppContext) -> Fut + Send + Sync,
+        F: 'static + FnMut(Params, AsyncAppContext) -> Fut + Send,
         Fut: 'static + Future<Output = Result<Res>>,
         Params: DeserializeOwned + Send + 'static,
         Res: Serialize,
     {
         let outbound_tx = self.outbound_tx.clone();
-        let f = Arc::new(f);
         let prev_handler = self.notification_handlers.lock().insert(
             method,
-            Arc::new(move |id, params, cx| {
+            Box::new(move |id, params, cx| {
                 if let Some(id) = id {
-                    let f = f.clone();
-                    let deserialized_params = cx
-                        .background_executor()
-                        .spawn(async move { serde_json::from_value(params) });
-
-                    cx.spawn({
-                        let outbound_tx = outbound_tx.clone();
-                        move |cx| async move {
-                            match deserialized_params.await {
-                                Ok(params) => {
-                                    let response = f(params, cx.clone());
-                                    let response = match response.await {
-                                        Ok(result) => Response {
-                                            jsonrpc: JSON_RPC_VERSION,
-                                            id,
-                                            value: LspResult::Ok(Some(result)),
-                                        },
-                                        Err(error) => Response {
-                                            jsonrpc: JSON_RPC_VERSION,
-                                            id,
-                                            value: LspResult::Error(Some(Error {
-                                                message: error.to_string(),
-                                            })),
-                                        },
-                                    };
-                                    if let Some(response) =
-                                        serde_json::to_string(&response).log_err()
-                                    {
-                                        outbound_tx.try_send(response).ok();
-                                    }
-                                }
-                                Err(error) => {
-                                    log::error!(
-                                        "error deserializing {} request: {:?}",
-                                        method,
-                                        error
-                                    );
-                                    let response = AnyResponse {
-                                        jsonrpc: JSON_RPC_VERSION,
-                                        id,
-                                        result: None,
-                                        error: Some(Error {
-                                            message: error.to_string(),
-                                        }),
-                                    };
-                                    if let Some(response) =
-                                        serde_json::to_string(&response).log_err()
-                                    {
-                                        outbound_tx.try_send(response).ok();
+                    match serde_json::from_value(params) {
+                        Ok(params) => {
+                            let response = f(params, cx.clone());
+                            cx.foreground_executor()
+                                .spawn({
+                                    let outbound_tx = outbound_tx.clone();
+                                    async move {
+                                        let response = match response.await {
+                                            Ok(result) => Response {
+                                                jsonrpc: JSON_RPC_VERSION,
+                                                id,
+                                                value: LspResult::Ok(Some(result)),
+                                            },
+                                            Err(error) => Response {
+                                                jsonrpc: JSON_RPC_VERSION,
+                                                id,
+                                                value: LspResult::Error(Some(Error {
+                                                    message: error.to_string(),
+                                                })),
+                                            },
+                                        };
+                                        if let Some(response) =
+                                            serde_json::to_string(&response).log_err()
+                                        {
+                                            outbound_tx.try_send(response).ok();
+                                        }
                                     }
-                                }
+                                })
+                                .detach();
+                        }
+
+                        Err(error) => {
+                            log::error!("error deserializing {} request: {:?}", method, error);
+                            let response = AnyResponse {
+                                jsonrpc: JSON_RPC_VERSION,
+                                id,
+                                result: None,
+                                error: Some(Error {
+                                    message: error.to_string(),
+                                }),
+                            };
+                            if let Some(response) = serde_json::to_string(&response).log_err() {
+                                outbound_tx.try_send(response).ok();
                             }
                         }
-                    })
-                    .detach();
+                    }
                 }
             }),
         );
@@ -1445,12 +1425,12 @@ impl FakeLanguageServer {
     /// Registers a handler for a specific kind of request. Removes any existing handler for specified request type.
     pub fn handle_request<T, F, Fut>(
         &self,
-        handler: F,
+        mut handler: F,
     ) -> futures::channel::mpsc::UnboundedReceiver<()>
     where
         T: 'static + request::Request,
         T::Params: 'static + Send,
-        F: 'static + Send + Sync + Fn(T::Params, gpui::AsyncAppContext) -> Fut,
+        F: 'static + Send + FnMut(T::Params, gpui::AsyncAppContext) -> Fut,
         Fut: 'static + Send + Future<Output = Result<T::Result>>,
     {
         let (responded_tx, responded_rx) = futures::channel::mpsc::unbounded();
@@ -1474,12 +1454,12 @@ impl FakeLanguageServer {
     /// Registers a handler for a specific kind of notification. Removes any existing handler for specified notification type.
     pub fn handle_notification<T, F>(
         &self,
-        handler: F,
+        mut handler: F,
     ) -> futures::channel::mpsc::UnboundedReceiver<()>
     where
         T: 'static + notification::Notification,
         T::Params: 'static + Send,
-        F: 'static + Send + Sync + Fn(T::Params, gpui::AsyncAppContext),
+        F: 'static + Send + FnMut(T::Params, gpui::AsyncAppContext),
     {
         let (handled_tx, handled_rx) = futures::channel::mpsc::unbounded();
         self.server.remove_notification_handler::<T>();