From 1b3b825c7f646f2fb34e2bceb6be33f310dfeb59 Mon Sep 17 00:00:00 2001 From: Piotr Osiewicz <24362066+osiewicz@users.noreply.github.com> Date: Tue, 14 Jan 2025 14:50:54 +0100 Subject: [PATCH] lsp: Parse LSP messages on background thread - again (#23122) This is a follow-up to #12640. While profiling latency of working with a project with 8192 diagnostics I've noticed that while we're parsing the LSP messages into a generic message struct on a background thread, we can still block the main thread as the conversion between that generic message struct and the actual LSP message (for use by callback) is still happening on the main thread. This PR significantly constrains what a message callback can use, so that it can be executed on any thread; we also send off message conversion to the background thread. In practice new callback constraints were already satisfied by all call sites, so no code outside of the lsp crate had to be adjusted. This has improved throughput of my 8192-benchmark from 40s to send out all diagnostics after saving to ~20s. Now main thread is spending most of the time updating our diagnostics sets, which can probably be improved too. Closes #ISSUE Release Notes: - Improved app responsiveness with huge # of diagnostics. --- crates/collab/src/tests/integration_tests.rs | 1 + .../src/test/editor_lsp_test_context.rs | 4 +- crates/lsp/src/lsp.rs | 140 ++++++++++-------- 3 files changed, 83 insertions(+), 62 deletions(-) diff --git a/crates/collab/src/tests/integration_tests.rs b/crates/collab/src/tests/integration_tests.rs index 30c4cedacbb125dfd8ca33a418daa891c1181e8f..3dfd9b676c08af79adfc49d165b55205594c130f 100644 --- a/crates/collab/src/tests/integration_tests.rs +++ b/crates/collab/src/tests/integration_tests.rs @@ -4206,6 +4206,7 @@ async fn test_collaborating_with_lsp_progress_updates_and_diagnostics_ordering( }], }, ); + executor.run_until_parked(); } fake_language_server.notify::(&lsp::ProgressParams { token: lsp::NumberOrString::String("the-disk-based-token".to_string()), diff --git a/crates/editor/src/test/editor_lsp_test_context.rs b/crates/editor/src/test/editor_lsp_test_context.rs index 23e37a1267bdbf04d691bc47660862403601e14c..87be96afc7f1ccc893624c84f32f499a735b4532 100644 --- a/crates/editor/src/test/editor_lsp_test_context.rs +++ b/crates/editor/src/test/editor_lsp_test_context.rs @@ -315,12 +315,12 @@ impl EditorLspTestContext { pub fn handle_request( &self, - mut handler: F, + handler: F, ) -> futures::channel::mpsc::UnboundedReceiver<()> where T: 'static + request::Request, T::Params: 'static + Send, - F: 'static + Send + FnMut(lsp::Url, T::Params, gpui::AsyncAppContext) -> Fut, + F: 'static + Send + Sync + Fn(lsp::Url, T::Params, gpui::AsyncAppContext) -> Fut, Fut: 'static + Send + Future>, { let url = self.buffer_lsp_url.clone(); diff --git a/crates/lsp/src/lsp.rs b/crates/lsp/src/lsp.rs index e9fa1caac239892e2f1a3aed1c923fbdcb9763aa..b445c3bb95960ab42c25adf7f3f0bd06f3e2df78 100644 --- a/crates/lsp/src/lsp.rs +++ b/crates/lsp/src/lsp.rs @@ -45,7 +45,7 @@ const CONTENT_LEN_HEADER: &str = "Content-Length: "; const LSP_REQUEST_TIMEOUT: Duration = Duration::from_secs(60 * 2); const SERVER_SHUTDOWN_TIMEOUT: Duration = Duration::from_secs(5); -type NotificationHandler = Box, Value, AsyncAppContext)>; +type NotificationHandler = Arc, Value, AsyncAppContext)>; type ResponseHandler = Box)>; type IoHandler = Box; @@ -890,7 +890,7 @@ impl LanguageServer { pub fn on_notification(&self, f: F) -> Subscription where T: notification::Notification, - F: 'static + Send + FnMut(T::Params, AsyncAppContext), + F: 'static + Send + Sync + Fn(T::Params, AsyncAppContext), { self.on_custom_notification(T::METHOD, f) } @@ -903,7 +903,7 @@ impl LanguageServer { where T: request::Request, T::Params: 'static + Send, - F: 'static + FnMut(T::Params, AsyncAppContext) -> Fut + Send, + F: 'static + Fn(T::Params, AsyncAppContext) -> Fut + Send + Sync, Fut: 'static + Future>, { self.on_custom_request(T::METHOD, f) @@ -939,17 +939,27 @@ impl LanguageServer { } #[must_use] - fn on_custom_notification(&self, method: &'static str, mut f: F) -> Subscription + fn on_custom_notification(&self, method: &'static str, f: F) -> Subscription where - F: 'static + FnMut(Params, AsyncAppContext) + Send, - Params: DeserializeOwned, + F: 'static + Fn(Params, AsyncAppContext) + Send + Sync, + Params: DeserializeOwned + Send + 'static, { + let callback = Arc::new(f); let prev_handler = self.notification_handlers.lock().insert( method, - Box::new(move |_, params, cx| { - if let Some(params) = serde_json::from_value(params).log_err() { - f(params, cx); - } + Arc::new(move |_, params, cx| { + let callback = callback.clone(); + + cx.spawn(move |cx| async move { + if let Some(params) = cx + .background_executor() + .spawn(async move { serde_json::from_value(params).log_err() }) + .await + { + callback(params, cx); + } + }) + .detach(); }), ); assert!( @@ -963,64 +973,74 @@ impl LanguageServer { } #[must_use] - fn on_custom_request(&self, method: &'static str, mut f: F) -> Subscription + fn on_custom_request(&self, method: &'static str, f: F) -> Subscription where - F: 'static + FnMut(Params, AsyncAppContext) -> Fut + Send, + F: 'static + Fn(Params, AsyncAppContext) -> Fut + Send + Sync, Fut: 'static + Future>, Params: DeserializeOwned + Send + 'static, Res: Serialize, { let outbound_tx = self.outbound_tx.clone(); + let f = Arc::new(f); let prev_handler = self.notification_handlers.lock().insert( method, - Box::new(move |id, params, cx| { + Arc::new(move |id, params, cx| { if let Some(id) = id { - match serde_json::from_value(params) { - Ok(params) => { - let response = f(params, cx.clone()); - cx.foreground_executor() - .spawn({ - let outbound_tx = outbound_tx.clone(); - async move { - let response = match response.await { - Ok(result) => Response { - jsonrpc: JSON_RPC_VERSION, - id, - value: LspResult::Ok(Some(result)), - }, - Err(error) => Response { - jsonrpc: JSON_RPC_VERSION, - id, - value: LspResult::Error(Some(Error { - message: error.to_string(), - })), - }, - }; - if let Some(response) = - serde_json::to_string(&response).log_err() - { - outbound_tx.try_send(response).ok(); - } + let f = f.clone(); + let deserialized_params = cx + .background_executor() + .spawn(async move { serde_json::from_value(params) }); + + cx.spawn({ + let outbound_tx = outbound_tx.clone(); + move |cx| async move { + match deserialized_params.await { + Ok(params) => { + let response = f(params, cx.clone()); + let response = match response.await { + Ok(result) => Response { + jsonrpc: JSON_RPC_VERSION, + id, + value: LspResult::Ok(Some(result)), + }, + Err(error) => Response { + jsonrpc: JSON_RPC_VERSION, + id, + value: LspResult::Error(Some(Error { + message: error.to_string(), + })), + }, + }; + if let Some(response) = + serde_json::to_string(&response).log_err() + { + outbound_tx.try_send(response).ok(); } - }) - .detach(); - } - - Err(error) => { - log::error!("error deserializing {} request: {:?}", method, error); - let response = AnyResponse { - jsonrpc: JSON_RPC_VERSION, - id, - result: None, - error: Some(Error { - message: error.to_string(), - }), - }; - if let Some(response) = serde_json::to_string(&response).log_err() { - outbound_tx.try_send(response).ok(); + } + Err(error) => { + log::error!( + "error deserializing {} request: {:?}", + method, + error + ); + let response = AnyResponse { + jsonrpc: JSON_RPC_VERSION, + id, + result: None, + error: Some(Error { + message: error.to_string(), + }), + }; + if let Some(response) = + serde_json::to_string(&response).log_err() + { + outbound_tx.try_send(response).ok(); + } + } } } - } + }) + .detach(); } }), ); @@ -1425,12 +1445,12 @@ impl FakeLanguageServer { /// Registers a handler for a specific kind of request. Removes any existing handler for specified request type. pub fn handle_request( &self, - mut handler: F, + handler: F, ) -> futures::channel::mpsc::UnboundedReceiver<()> where T: 'static + request::Request, T::Params: 'static + Send, - F: 'static + Send + FnMut(T::Params, gpui::AsyncAppContext) -> Fut, + F: 'static + Send + Sync + Fn(T::Params, gpui::AsyncAppContext) -> Fut, Fut: 'static + Send + Future>, { let (responded_tx, responded_rx) = futures::channel::mpsc::unbounded(); @@ -1454,12 +1474,12 @@ impl FakeLanguageServer { /// Registers a handler for a specific kind of notification. Removes any existing handler for specified notification type. pub fn handle_notification( &self, - mut handler: F, + handler: F, ) -> futures::channel::mpsc::UnboundedReceiver<()> where T: 'static + notification::Notification, T::Params: 'static + Send, - F: 'static + Send + FnMut(T::Params, gpui::AsyncAppContext), + F: 'static + Send + Sync + Fn(T::Params, gpui::AsyncAppContext), { let (handled_tx, handled_rx) = futures::channel::mpsc::unbounded(); self.server.remove_notification_handler::();