Move Wasi to async, validate timeslicing, using async in traits still WIP

Isaac Clayton created

Change summary

crates/plugin_runtime/src/wasi.rs           |  26 ++-
crates/zed/src/languages.rs                 |  15 +
crates/zed/src/languages/language_plugin.rs | 127 ++++++++++++----------
crates/zed/src/main.rs                      |   6 
plugins/json_language/src/lib.rs            |  97 +++++++++--------
5 files changed, 150 insertions(+), 121 deletions(-)

Detailed changes

crates/plugin_runtime/src/wasi.rs 🔗

@@ -4,7 +4,7 @@ use anyhow::{anyhow, Error};
 use serde::{de::DeserializeOwned, Serialize};
 
 use wasi_common::{dir, file};
-use wasmtime::{Engine, Instance, Linker, Module, Store, TypedFunc};
+use wasmtime::{Config, Engine, Instance, Linker, Module, Store, TypedFunc};
 use wasmtime_wasi::{Dir, WasiCtx, WasiCtxBuilder};
 
 pub struct WasiResource(u32);
@@ -50,8 +50,10 @@ impl Wasi {
             .build()
     }
 
-    pub fn init(plugin: WasiPlugin) -> Result<Self, Error> {
-        let engine = Engine::default();
+    pub async fn init(plugin: WasiPlugin) -> Result<Self, Error> {
+        let mut config = Config::default();
+        config.async_support(true);
+        let engine = Engine::new(&config)?;
         let mut linker = Linker::new(&engine);
 
         linker.func_wrap("env", "__hello", |x: u32| x * 2).unwrap();
@@ -62,8 +64,8 @@ impl Wasi {
         let mut store: Store<_> = Store::new(&engine, plugin.wasi_ctx);
         let module = Module::new(&engine, plugin.module)?;
 
-        linker.module(&mut store, "", &module)?;
-        let instance = linker.instantiate(&mut store, &module)?;
+        linker.module_async(&mut store, "", &module).await?;
+        let instance = linker.instantiate_async(&mut store, &module).await?;
 
         let alloc_buffer = instance.get_typed_func(&mut store, "__alloc_buffer")?;
         // let free_buffer = instance.get_typed_func(&mut store, "__free_buffer")?;
@@ -166,13 +168,16 @@ impl Wasi {
 
     /// Takes an item, allocates a buffer, serializes the argument to that buffer,
     /// and returns a (ptr, len) pair to that buffer.
-    fn serialize_to_buffer<T: Serialize>(&mut self, item: T) -> Result<(u32, u32), Error> {
+    async fn serialize_to_buffer<T: Serialize>(&mut self, item: T) -> Result<(u32, u32), Error> {
         // serialize the argument using bincode
         let item = bincode::serialize(&item)?;
         let buffer_len = item.len() as u32;
 
         // allocate a buffer and write the argument to that buffer
-        let buffer_ptr = self.alloc_buffer.call(&mut self.store, buffer_len)?;
+        let buffer_ptr = self
+            .alloc_buffer
+            .call_async(&mut self.store, buffer_len)
+            .await?;
         let plugin_memory = self
             .instance
             .get_memory(&mut self.store, "memory")
@@ -212,18 +217,17 @@ impl Wasi {
     }
 
     // TODO: dont' use as for conversions
-    pub fn call<A: Serialize, R: DeserializeOwned>(
+    pub async fn call<A: Serialize, R: DeserializeOwned>(
         &mut self,
         handle: &str,
         arg: A,
     ) -> Result<R, Error> {
-        let start = std::time::Instant::now();
         dbg!(&handle);
         // dbg!(serde_json::to_string(&arg)).unwrap();
 
         // write the argument to linear memory
         // this returns a (ptr, lentgh) pair
-        let arg_buffer = self.serialize_to_buffer(arg)?;
+        let arg_buffer = self.serialize_to_buffer(arg).await?;
 
         // get the webassembly function we want to actually call
         // TODO: precompute handle
@@ -234,7 +238,7 @@ impl Wasi {
 
         // call the function, passing in the buffer and its length
         // this returns a ptr to a (ptr, lentgh) pair
-        let result_buffer = fun.call(&mut self.store, arg_buffer)?;
+        let result_buffer = fun.call_async(&mut self.store, arg_buffer).await?;
 
         self.deserialize_from_buffer(result_buffer)
     }

crates/zed/src/languages.rs 🔗

@@ -1,7 +1,11 @@
-use gpui::Task;
+use gpui::{
+    executor::{self, Background},
+    Task,
+};
 pub use language::*;
 use rust_embed::RustEmbed;
 use std::{borrow::Cow, str, sync::Arc};
+use util::ResultExt;
 
 mod c;
 mod go;
@@ -17,8 +21,7 @@ mod typescript;
 #[exclude = "*.rs"]
 struct LanguageDir;
 
-pub fn build_language_registry(login_shell_env_loaded: Task<()>) -> LanguageRegistry {
-    let languages = LanguageRegistry::new(login_shell_env_loaded);
+pub async fn init(languages: Arc<LanguageRegistry>, executor: Arc<Background>) {
     for (name, grammar, lsp_adapter) in [
         (
             "c",
@@ -38,7 +41,10 @@ pub fn build_language_registry(login_shell_env_loaded: Task<()>) -> LanguageRegi
         (
             "json",
             tree_sitter_json::language(),
-            Some(Arc::new(language_plugin::new_json())),
+            language_plugin::new_json(executor)
+                .await
+                .log_err()
+                .map(|lang| Arc::new(lang) as Arc<_>),
         ),
         (
             "markdown",
@@ -78,7 +84,6 @@ pub fn build_language_registry(login_shell_env_loaded: Task<()>) -> LanguageRegi
     ] {
         languages.add(Arc::new(language(name, grammar, lsp_adapter)));
     }
-    languages
 }
 
 pub(crate) fn language(

crates/zed/src/languages/language_plugin.rs 🔗

@@ -2,6 +2,7 @@ use super::installation::{npm_install_packages, npm_package_latest_version};
 use anyhow::{anyhow, Context, Result};
 use client::http::HttpClient;
 use futures::{future::BoxFuture, FutureExt, StreamExt};
+use gpui::executor::{self, Background};
 use isahc::http::version;
 use language::{LanguageServerName, LspAdapter};
 use parking_lot::{Mutex, RwLock};
@@ -11,23 +12,25 @@ use std::fs;
 use std::{any::Any, path::PathBuf, sync::Arc};
 use util::{ResultExt, TryFutureExt};
 
-pub fn new_json() -> LanguagePluginLspAdapter {
+pub async fn new_json(executor: Arc<Background>) -> Result<PluginLspAdapter> {
     let plugin = WasiPlugin {
         module: include_bytes!("../../../../plugins/bin/json_language.wasm").to_vec(),
         wasi_ctx: Wasi::default_ctx(),
     };
-    LanguagePluginLspAdapter::new(plugin)
+    PluginLspAdapter::new(plugin, executor).await
 }
 
-pub struct LanguagePluginLspAdapter {
-    runtime: Mutex<Wasi>,
+pub struct PluginLspAdapter {
+    runtime: Arc<Mutex<Wasi>>,
+    executor: Arc<Background>,
 }
 
-impl LanguagePluginLspAdapter {
-    pub fn new(plugin: WasiPlugin) -> Self {
-        Self {
-            runtime: Mutex::new(Wasi::init(plugin).unwrap()),
-        }
+impl PluginLspAdapter {
+    pub async fn new(plugin: WasiPlugin, executor: Arc<Background>) -> Result<Self> {
+        Ok(Self {
+            runtime: Arc::new(Mutex::new(Wasi::init(plugin).await?)),
+            executor,
+        })
     }
 }
 
@@ -36,32 +39,43 @@ struct Versions {
     server_version: String,
 }
 
-impl LspAdapter for LanguagePluginLspAdapter {
+macro_rules! call_block {
+    ($self:ident, $name:expr, $arg:expr) => {
+        $self
+            .executor
+            .block(async { $self.runtime.lock().call($name, $arg).await })
+    };
+}
+
+impl LspAdapter for PluginLspAdapter {
     fn name(&self) -> LanguageServerName {
-        let name: String = self.runtime.lock().call("name", ()).unwrap();
+        let name: String = call_block!(self, "name", ()).unwrap();
         LanguageServerName(name.into())
     }
 
     fn server_args<'a>(&'a self) -> Vec<String> {
-        self.runtime.lock().call("server_args", ()).unwrap()
+        call_block!(self, "server_args", ()).unwrap()
     }
 
     fn fetch_latest_server_version(
         &self,
         _: Arc<dyn HttpClient>,
     ) -> BoxFuture<'static, Result<Box<dyn 'static + Send + Any>>> {
-        let versions: Result<(String, String)> =
-            self.runtime.lock().call("fetch_latest_server_version", ());
-
-        async move {
-            versions.map(|(language_version, server_version)| {
-                Box::new(Versions {
-                    language_version,
-                    server_version,
-                }) as Box<_>
-            })
-        }
-        .boxed()
+        todo!()
+        // async move {
+        //     let versions: Result<String, String> = self
+        //         .runtime
+        //         .lock()
+        //         .call::<_, Option<String>>("fetch_latest_server_version", ())
+        //         .await?;
+        //     versions.map(|(language_version, server_version)| {
+        //         Box::new(Versions {
+        //             language_version,
+        //             server_version,
+        //         }) as Box<_>
+        //     })
+        // }
+        // .boxed()
     }
 
     fn fetch_server_binary(
@@ -70,34 +84,37 @@ impl LspAdapter for LanguagePluginLspAdapter {
         _: Arc<dyn HttpClient>,
         container_dir: PathBuf,
     ) -> BoxFuture<'static, Result<PathBuf>> {
-        let version = version.downcast::<String>().unwrap();
-        let mut runtime = self.runtime.lock();
-
-        let result: Result<PathBuf, _> = (|| {
-            let handle = runtime.attach_path(&container_dir)?;
-            let result = runtime
-                .call::<_, Option<PathBuf>>("fetch_server_binary", container_dir)?
-                .ok_or_else(|| anyhow!("Could not load cached server binary"));
-            runtime.remove_resource(handle)?;
-            result
-        })();
-
-        async move { result }.boxed()
+        todo!()
+        // let version = version.downcast::<String>().unwrap();
+
+        // async move {
+        //     let runtime = self.runtime.clone();
+        //     let handle = runtime.lock().attach_path(&container_dir).unwrap();
+        //     let result = runtime
+        //         .lock()
+        //         .call::<_, Option<PathBuf>>("fetch_server_binary", container_dir)
+        //         .await
+        //         .unwrap()
+        //         .ok_or_else(|| anyhow!("Could not load cached server binary"));
+        //     // runtime.remove_resource(handle).ok();
+        //     result
+        // }
+        // .boxed()
     }
 
     fn cached_server_binary(&self, container_dir: PathBuf) -> BoxFuture<'static, Option<PathBuf>> {
-        let mut runtime = self.runtime.lock();
-
-        let result: Option<PathBuf> = (|| {
-            let handle = runtime.attach_path(&container_dir).ok()?;
-            let result = runtime
-                .call::<_, Option<PathBuf>>("cached_server_binary", container_dir)
-                .ok()?;
-            runtime.remove_resource(handle).ok()?;
-            result
-        })();
-
-        async move { result }.boxed()
+        todo!()
+        // let runtime = self.runtime.clone();
+        // async move {
+        //     let handle = runtime.lock().attach_path(&container_dir).ok()?;
+        //     let result = runtime
+        //         .lock()
+        //         .call::<_, Option<PathBuf>>("cached_server_binary", container_dir);
+        //     let result = result.await;
+        //     runtime.lock().remove_resource(handle).ok()?;
+        //     result.ok()?
+        // }
+        // .boxed()
     }
 
     fn process_diagnostics(&self, _: &mut lsp::PublishDiagnosticsParams) {}
@@ -111,11 +128,7 @@ impl LspAdapter for LanguagePluginLspAdapter {
         let len = item.label.len();
         let grammar = language.grammar()?;
         let kind = format!("{:?}", item.kind?);
-        let name: String = self
-            .runtime
-            .lock()
-            .call("label_for_completion", kind)
-            .ok()?;
+        let name: String = call_block!(self, "label_for_completion", kind).log_err()?;
         let highlight_id = grammar.highlight_id_for_name(&name)?;
         Some(language::CodeLabel {
             text: item.label.clone(),
@@ -125,11 +138,7 @@ impl LspAdapter for LanguagePluginLspAdapter {
     }
 
     fn initialization_options(&self) -> Option<serde_json::Value> {
-        let string = self
-            .runtime
-            .lock()
-            .call::<_, Option<String>>("initialization_options", ())
-            .unwrap()?;
+        let string: String = call_block!(self, "initialization_options", ()).log_err()?;
 
         serde_json::from_str(&string).ok()
     }

crates/zed/src/main.rs 🔗

@@ -21,6 +21,7 @@ use futures::{
 };
 use gpui::{executor::Background, App, AssetSource, AsyncAppContext, Task};
 use isahc::{config::Configurable, AsyncBody, Request};
+use language::LanguageRegistry;
 use log::LevelFilter;
 use parking_lot::Mutex;
 use project::{Fs, ProjectStore};
@@ -163,7 +164,7 @@ fn main() {
 
     app.run(move |cx| {
         let client = client::Client::new(http.clone());
-        let mut languages = languages::build_language_registry(login_shell_env_loaded);
+        let mut languages = LanguageRegistry::new(login_shell_env_loaded);
         let user_store = cx.add_model(|cx| UserStore::new(client.clone(), http.clone(), cx));
 
         context_menu::init(cx);
@@ -210,6 +211,9 @@ fn main() {
 
         languages.set_language_server_download_dir(zed::ROOT_PATH.clone());
         let languages = Arc::new(languages);
+        cx.background()
+            .spawn(languages::init(languages.clone(), cx.background().clone()))
+            .detach();
 
         cx.observe_global::<Settings, _>({
             let languages = languages.clone();

plugins/json_language/src/lib.rs 🔗

@@ -17,6 +17,13 @@ extern "C" {
 
 // }
 
+// #[no_mangle]
+// pub extern "C" fn very_unique_name_of_course() -> impl std::future::Future<Output = u32> {
+//     async move {
+//         std::fs::read_to_string("heck.txt").unwrap().len() as u32
+//     }
+// }
+
 const BIN_PATH: &'static str =
     "node_modules/vscode-json-languageserver/bin/vscode-json-languageserver";
 
@@ -34,52 +41,52 @@ pub fn server_args() -> Vec<String> {
     vec!["--stdio".into()]
 }
 
-#[bind]
-pub fn fetch_latest_server_version() -> Option<String> {
-    #[derive(Deserialize)]
-    struct NpmInfo {
-        versions: Vec<String>,
-    }
-
-    let output = command("npm info vscode-json-languageserver --json")?;
-    if !output.status.success() {
-        return None;
-    }
-
-    let mut info: NpmInfo = serde_json::from_slice(&output.stdout)?;
-    info.versions.pop()
-}
-
-#[bind]
-pub fn fetch_server_binary(container_dir: PathBuf, version: String) -> Result<PathBuf, String> {
-    let version_dir = container_dir.join(version.as_str());
-    fs::create_dir_all(&version_dir)
-        .or_or_else(|| "failed to create version directory".to_string())?;
-    let binary_path = version_dir.join(Self::BIN_PATH);
-
-    if fs::metadata(&binary_path).await.is_err() {
-        let output = command(format!(
-            "npm install vscode-json-languageserver@{}",
-            version
-        ));
-        if !output.status.success() {
-            Err(anyhow!("failed to install vscode-json-languageserver"))?;
-        }
-
-        if let Some(mut entries) = fs::read_dir(&container_dir).await.log_err() {
-            while let Some(entry) = entries.next().await {
-                if let Some(entry) = entry.log_err() {
-                    let entry_path = entry.path();
-                    if entry_path.as_path() != version_dir {
-                        fs::remove_dir_all(&entry_path).await.log_err();
-                    }
-                }
-            }
-        }
-    }
+// #[bind]
+// pub fn fetch_latest_server_version() -> Option<String> {
+//     #[derive(Deserialize)]
+//     struct NpmInfo {
+//         versions: Vec<String>,
+//     }
+
+//     let output = command("npm info vscode-json-languageserver --json")?;
+//     if !output.status.success() {
+//         return None;
+//     }
+
+//     let mut info: NpmInfo = serde_json::from_slice(&output.stdout)?;
+//     info.versions.pop()
+// }
 
-    Ok(binary_path)
-}
+// #[bind]
+// pub fn fetch_server_binary(container_dir: PathBuf, version: String) -> Result<PathBuf, String> {
+//     let version_dir = container_dir.join(version.as_str());
+//     fs::create_dir_all(&version_dir)
+//         .or_or_else(|| "failed to create version directory".to_string())?;
+//     let binary_path = version_dir.join(Self::BIN_PATH);
+
+//     if fs::metadata(&binary_path).await.is_err() {
+//         let output = command(format!(
+//             "npm install vscode-json-languageserver@{}",
+//             version
+//         ));
+//         if !output.status.success() {
+//             Err(anyhow!("failed to install vscode-json-languageserver"))?;
+//         }
+
+//         if let Some(mut entries) = fs::read_dir(&container_dir).await.log_err() {
+//             while let Some(entry) = entries.next().await {
+//                 if let Some(entry) = entry.log_err() {
+//                     let entry_path = entry.path();
+//                     if entry_path.as_path() != version_dir {
+//                         fs::remove_dir_all(&entry_path).await.log_err();
+//                     }
+//                 }
+//             }
+//         }
+//     }
+
+//     Ok(binary_path)
+// }
 
 #[bind]
 pub fn cached_server_binary(container_dir: PathBuf) -> Option<PathBuf> {