language: Spawn language servers on background threads (#44631)

Lukas Wirth created

Closes https://github.com/zed-industries/zed/issues/39056

Leverages a new `await_on_background` API that spawns the future on the
background but blocks the current task, allowing to borrow from the
surrounding scope.

Release Notes:

- N/A *or* Added/Fixed/Improved ...

Change summary

Cargo.toml                            |  2 
crates/copilot/src/copilot.rs         |  3 
crates/diagnostics/src/diagnostics.rs | 81 +++++++++++++---------------
crates/gpui/src/executor.rs           | 52 ++++++++++++++++++
crates/language/src/language.rs       | 23 ++++++--
crates/lsp/src/lsp.rs                 | 36 +++++++-----
crates/prettier/src/prettier.rs       |  1 
crates/project/src/lsp_store.rs       |  1 
8 files changed, 131 insertions(+), 68 deletions(-)

Detailed changes

Cargo.toml 🔗

@@ -631,7 +631,7 @@ shellexpand = "2.1.0"
 shlex = "1.3.0"
 simplelog = "0.12.2"
 slotmap = "1.0.6"
-smallvec = { version = "1.6", features = ["union"] }
+smallvec = { version = "1.6", features = ["union", "const_new"] }
 smol = "2.0"
 sqlformat = "0.2"
 stacksafe = "0.1"

crates/copilot/src/copilot.rs 🔗

@@ -516,7 +516,8 @@ impl Copilot {
                 None,
                 Default::default(),
                 cx,
-            )?;
+            )
+            .await?;
 
             server
                 .on_notification::<StatusNotification, _>(|_, _| { /* Silence the notification */ })

crates/diagnostics/src/diagnostics.rs 🔗

@@ -1045,54 +1045,47 @@ async fn heuristic_syntactic_expand(
         let node_range = node_start..node_end;
         let row_count = node_end.row - node_start.row + 1;
         let mut ancestor_range = None;
-        let reached_outline_node = cx.background_executor().scoped({
-            let node_range = node_range.clone();
-            let outline_range = outline_range.clone();
-            let ancestor_range = &mut ancestor_range;
-            |scope| {
-                scope.spawn(async move {
-                    // Stop if we've exceeded the row count or reached an outline node. Then, find the interval
-                    // of node children which contains the query range. For example, this allows just returning
-                    // the header of a declaration rather than the entire declaration.
-                    if row_count > max_row_count || outline_range == Some(node_range.clone()) {
-                        let mut cursor = node.walk();
-                        let mut included_child_start = None;
-                        let mut included_child_end = None;
-                        let mut previous_end = node_start;
-                        if cursor.goto_first_child() {
-                            loop {
-                                let child_node = cursor.node();
-                                let child_range =
-                                    previous_end..Point::from_ts_point(child_node.end_position());
-                                if included_child_start.is_none()
-                                    && child_range.contains(&input_range.start)
-                                {
-                                    included_child_start = Some(child_range.start);
-                                }
-                                if child_range.contains(&input_range.end) {
-                                    included_child_end = Some(child_range.end);
-                                }
-                                previous_end = child_range.end;
-                                if !cursor.goto_next_sibling() {
-                                    break;
-                                }
+        cx.background_executor()
+            .await_on_background(async {
+                // Stop if we've exceeded the row count or reached an outline node. Then, find the interval
+                // of node children which contains the query range. For example, this allows just returning
+                // the header of a declaration rather than the entire declaration.
+                if row_count > max_row_count || outline_range == Some(node_range.clone()) {
+                    let mut cursor = node.walk();
+                    let mut included_child_start = None;
+                    let mut included_child_end = None;
+                    let mut previous_end = node_start;
+                    if cursor.goto_first_child() {
+                        loop {
+                            let child_node = cursor.node();
+                            let child_range =
+                                previous_end..Point::from_ts_point(child_node.end_position());
+                            if included_child_start.is_none()
+                                && child_range.contains(&input_range.start)
+                            {
+                                included_child_start = Some(child_range.start);
                             }
-                        }
-                        let end = included_child_end.unwrap_or(node_range.end);
-                        if let Some(start) = included_child_start {
-                            let row_count = end.row - start.row;
-                            if row_count < max_row_count {
-                                *ancestor_range =
-                                    Some(Some(RangeInclusive::new(start.row, end.row)));
-                                return;
+                            if child_range.contains(&input_range.end) {
+                                included_child_end = Some(child_range.end);
+                            }
+                            previous_end = child_range.end;
+                            if !cursor.goto_next_sibling() {
+                                break;
                             }
                         }
-                        *ancestor_range = Some(None);
                     }
-                })
-            }
-        });
-        reached_outline_node.await;
+                    let end = included_child_end.unwrap_or(node_range.end);
+                    if let Some(start) = included_child_start {
+                        let row_count = end.row - start.row;
+                        if row_count < max_row_count {
+                            ancestor_range = Some(Some(RangeInclusive::new(start.row, end.row)));
+                            return;
+                        }
+                    }
+                    ancestor_range = Some(None);
+                }
+            })
+            .await;
         if let Some(node) = ancestor_range {
             return node;
         }

crates/gpui/src/executor.rs 🔗

@@ -1,6 +1,7 @@
 use crate::{App, PlatformDispatcher, RunnableMeta, RunnableVariant};
 use async_task::Runnable;
 use futures::channel::mpsc;
+use parking_lot::{Condvar, Mutex};
 use smol::prelude::*;
 use std::{
     fmt::Debug,
@@ -154,6 +155,57 @@ impl BackgroundExecutor {
         self.spawn_internal::<R>(Box::pin(future), None)
     }
 
+    /// Enqueues the given future to be run to completion on a background thread and blocking the current task on it.
+    ///
+    /// This allows to spawn background work that borrows from its scope. Note that the supplied future will run to
+    /// completion before the current task is resumed, even if the current task is slated for cancellation.
+    pub async fn await_on_background<R>(&self, future: impl Future<Output = R> + Send) -> R
+    where
+        R: Send,
+    {
+        // We need to ensure that cancellation of the parent task does not drop the environment
+        // before the our own task has completed or got cancelled.
+        struct NotifyOnDrop<'a>(&'a (Condvar, Mutex<bool>));
+
+        impl Drop for NotifyOnDrop<'_> {
+            fn drop(&mut self) {
+                *self.0.1.lock() = true;
+                self.0.0.notify_all();
+            }
+        }
+
+        struct WaitOnDrop<'a>(&'a (Condvar, Mutex<bool>));
+
+        impl Drop for WaitOnDrop<'_> {
+            fn drop(&mut self) {
+                let mut done = self.0.1.lock();
+                if !*done {
+                    self.0.0.wait(&mut done);
+                }
+            }
+        }
+
+        let dispatcher = self.dispatcher.clone();
+        let location = core::panic::Location::caller();
+
+        let pair = &(Condvar::new(), Mutex::new(false));
+        let _wait_guard = WaitOnDrop(pair);
+
+        let (runnable, task) = unsafe {
+            async_task::Builder::new()
+                .metadata(RunnableMeta { location })
+                .spawn_unchecked(
+                    move |_| async {
+                        let _notify_guard = NotifyOnDrop(pair);
+                        future.await
+                    },
+                    move |runnable| dispatcher.dispatch(RunnableVariant::Meta(runnable), None),
+                )
+        };
+        runnable.schedule();
+        task.await
+    }
+
     /// Enqueues the given future to be run to completion on a background thread.
     /// The given label can be used to control the priority of the task in tests.
     #[track_caller]

crates/language/src/language.rs 🔗

@@ -535,7 +535,7 @@ pub trait LspInstaller {
         _version: &Self::BinaryVersion,
         _container_dir: &PathBuf,
         _delegate: &dyn LspAdapterDelegate,
-    ) -> impl Future<Output = Option<LanguageServerBinary>> {
+    ) -> impl Send + Future<Output = Option<LanguageServerBinary>> {
         async { None }
     }
 
@@ -544,7 +544,7 @@ pub trait LspInstaller {
         latest_version: Self::BinaryVersion,
         container_dir: PathBuf,
         delegate: &dyn LspAdapterDelegate,
-    ) -> impl Future<Output = Result<LanguageServerBinary>>;
+    ) -> impl Send + Future<Output = Result<LanguageServerBinary>>;
 
     fn cached_server_binary(
         &self,
@@ -575,6 +575,7 @@ pub trait DynLspInstaller {
 #[async_trait(?Send)]
 impl<LI, BinaryVersion> DynLspInstaller for LI
 where
+    BinaryVersion: Send + Sync,
     LI: LspInstaller<BinaryVersion = BinaryVersion> + LspAdapter,
 {
     async fn try_fetch_server_binary(
@@ -593,8 +594,13 @@ where
             .fetch_latest_server_version(delegate.as_ref(), pre_release, cx)
             .await?;
 
-        if let Some(binary) = self
-            .check_if_version_installed(&latest_version, &container_dir, delegate.as_ref())
+        if let Some(binary) = cx
+            .background_executor()
+            .await_on_background(self.check_if_version_installed(
+                &latest_version,
+                &container_dir,
+                delegate.as_ref(),
+            ))
             .await
         {
             log::debug!("language server {:?} is already installed", name.0);
@@ -603,8 +609,13 @@ where
         } else {
             log::debug!("downloading language server {:?}", name.0);
             delegate.update_status(name.clone(), BinaryStatus::Downloading);
-            let binary = self
-                .fetch_server_binary(latest_version, container_dir, delegate.as_ref())
+            let binary = cx
+                .background_executor()
+                .await_on_background(self.fetch_server_binary(
+                    latest_version,
+                    container_dir,
+                    delegate.as_ref(),
+                ))
                 .await;
 
             delegate.update_status(name.clone(), BinaryStatus::None);

crates/lsp/src/lsp.rs 🔗

@@ -314,7 +314,7 @@ pub struct AdapterServerCapabilities {
 
 impl LanguageServer {
     /// Starts a language server process.
-    pub fn new(
+    pub async fn new(
         stderr_capture: Arc<Mutex<Option<String>>>,
         server_id: LanguageServerId,
         server_name: LanguageServerName,
@@ -331,26 +331,30 @@ impl LanguageServer {
         };
         let root_uri = Uri::from_file_path(&working_dir)
             .map_err(|()| anyhow!("{working_dir:?} is not a valid URI"))?;
-
         log::info!(
-            "starting language server process. binary path: {:?}, working directory: {:?}, args: {:?}",
+            "starting language server process. binary path: \
+            {:?}, working directory: {:?}, args: {:?}",
             binary.path,
             working_dir,
             &binary.arguments
         );
-
-        let mut command = util::command::new_smol_command(&binary.path);
-        command
-            .current_dir(working_dir)
-            .args(&binary.arguments)
-            .envs(binary.env.clone().unwrap_or_default())
-            .stdin(Stdio::piped())
-            .stdout(Stdio::piped())
-            .stderr(Stdio::piped())
-            .kill_on_drop(true);
-        let mut server = command
-            .spawn()
-            .with_context(|| format!("failed to spawn command {command:?}",))?;
+        let mut server = cx
+            .background_executor()
+            .await_on_background(async {
+                let mut command = util::command::new_smol_command(&binary.path);
+                command
+                    .current_dir(working_dir)
+                    .args(&binary.arguments)
+                    .envs(binary.env.clone().unwrap_or_default())
+                    .stdin(Stdio::piped())
+                    .stdout(Stdio::piped())
+                    .stderr(Stdio::piped())
+                    .kill_on_drop(true);
+                command
+                    .spawn()
+                    .with_context(|| format!("failed to spawn command {command:?}",))
+            })
+            .await?;
 
         let stdin = server.stdin.take().unwrap();
         let stdout = server.stdout.take().unwrap();