Add epoch interruption to WASM engine for cooperative yielding (#32806)

Ben Brandt created

Prevent extensions from blocking async threads by enabling epoch
interruption with 100ms intervals. Extensions will yield control back to
the executor regularly during Future::poll operations.

Addresses the
[discussion](https://github.com/zed-industries/zed/discussions/24515)
that goes into depth on why this is important when enabling async
support with Wasmtime.

Release Notes:

- N/A

Change summary

crates/extension_host/src/wasm_host.rs                  | 69 ++++++++--
crates/extension_host/src/wasm_host/wit.rs              | 23 ++-
crates/extension_host/src/wasm_host/wit/since_v0_0_1.rs |  5 
crates/extension_host/src/wasm_host/wit/since_v0_0_4.rs |  5 
crates/extension_host/src/wasm_host/wit/since_v0_0_6.rs |  5 
crates/extension_host/src/wasm_host/wit/since_v0_1_0.rs |  5 
crates/extension_host/src/wasm_host/wit/since_v0_2_0.rs |  5 
crates/extension_host/src/wasm_host/wit/since_v0_3_0.rs |  5 
crates/extension_host/src/wasm_host/wit/since_v0_4_0.rs |  5 
crates/extension_host/src/wasm_host/wit/since_v0_5_0.rs |  5 
crates/extension_host/src/wasm_host/wit/since_v0_6_0.rs |  6 
11 files changed, 94 insertions(+), 44 deletions(-)

Detailed changes

crates/extension_host/src/wasm_host.rs 🔗

@@ -19,7 +19,7 @@ use futures::{
     },
     future::BoxFuture,
 };
-use gpui::{App, AsyncApp, BackgroundExecutor, Task};
+use gpui::{App, AsyncApp, BackgroundExecutor, Task, Timer};
 use http_client::HttpClient;
 use language::LanguageName;
 use lsp::LanguageServerName;
@@ -28,7 +28,8 @@ use node_runtime::NodeRuntime;
 use release_channel::ReleaseChannel;
 use semantic_version::SemanticVersion;
 use std::borrow::Cow;
-use std::sync::LazyLock;
+use std::sync::{LazyLock, OnceLock};
+use std::time::Duration;
 use std::{
     path::{Path, PathBuf},
     sync::Arc,
@@ -487,18 +488,52 @@ type ExtensionCall = Box<
     dyn Send + for<'a> FnOnce(&'a mut Extension, &'a mut Store<WasmState>) -> BoxFuture<'a, ()>,
 >;
 
-fn wasm_engine() -> wasmtime::Engine {
-    static WASM_ENGINE: LazyLock<wasmtime::Engine> = LazyLock::new(|| {
-        let mut config = wasmtime::Config::new();
-        config.wasm_component_model(true);
-        config.async_support(true);
-        config
-            .enable_incremental_compilation(cache_store())
-            .unwrap();
-        wasmtime::Engine::new(&config).unwrap()
-    });
-
-    WASM_ENGINE.clone()
+fn wasm_engine(executor: &BackgroundExecutor) -> wasmtime::Engine {
+    static WASM_ENGINE: OnceLock<wasmtime::Engine> = OnceLock::new();
+    WASM_ENGINE
+        .get_or_init(|| {
+            let mut config = wasmtime::Config::new();
+            config.wasm_component_model(true);
+            config.async_support(true);
+            config
+                .enable_incremental_compilation(cache_store())
+                .unwrap();
+            // Async support introduces the issue that extension execution happens during `Future::poll`,
+            // which could block an async thread.
+            // https://docs.rs/wasmtime/latest/wasmtime/struct.Config.html#execution-in-poll
+            //
+            // Epoch interruption is a lightweight mechanism to allow the extensions to yield control
+            // back to the executor at regular intervals.
+            config.epoch_interruption(true);
+
+            let engine = wasmtime::Engine::new(&config).unwrap();
+
+            // It might be safer to do this on a non-async thread to make sure it makes progress
+            // regardless of if extensions are blocking.
+            // However, due to our current setup, this isn't a likely occurrence and we'd rather
+            // not have a dedicated thread just for this. If it becomes an issue, we can consider
+            // creating a separate thread for epoch interruption.
+            let engine_ref = engine.weak();
+            executor
+                .spawn(async move {
+                    // Somewhat arbitrary interval, as it isn't a guaranteed interval.
+                    // But this is a rough upper bound for how long the extension execution can block on
+                    // `Future::poll`.
+                    const EPOCH_INTERVAL: Duration = Duration::from_millis(100);
+                    let mut timer = Timer::interval(EPOCH_INTERVAL);
+                    while let Some(_) = timer.next().await {
+                        // Exit the loop and thread once the engine is dropped.
+                        let Some(engine) = engine_ref.upgrade() else {
+                            break;
+                        };
+                        engine.increment_epoch();
+                    }
+                })
+                .detach();
+
+            engine
+        })
+        .clone()
 }
 
 fn cache_store() -> Arc<IncrementalCompilationCache> {
@@ -523,7 +558,7 @@ impl WasmHost {
             }
         });
         Arc::new(Self {
-            engine: wasm_engine(),
+            engine: wasm_engine(cx.background_executor()),
             fs,
             work_dir,
             http_client,
@@ -558,8 +593,12 @@ impl WasmHost {
                     host: this.clone(),
                 },
             );
+            // Store will yield after 1 tick, and get a new deadline of 1 tick after each yield.
+            store.set_epoch_deadline(1);
+            store.epoch_deadline_async_yield_and_update(1);
 
             let mut extension = Extension::instantiate_async(
+                &executor,
                 &mut store,
                 this.release_channel,
                 zed_api_version,

crates/extension_host/src/wasm_host/wit.rs 🔗

@@ -9,6 +9,7 @@ mod since_v0_5_0;
 mod since_v0_6_0;
 use dap::DebugRequest;
 use extension::{DebugTaskDefinition, KeyValueStoreDelegate, WorktreeDelegate};
+use gpui::BackgroundExecutor;
 use language::LanguageName;
 use lsp::LanguageServerName;
 use release_channel::ReleaseChannel;
@@ -39,9 +40,10 @@ pub use latest::{
 pub use since_v0_0_4::LanguageServerConfig;
 
 pub fn new_linker(
+    executor: &BackgroundExecutor,
     f: impl Fn(&mut Linker<WasmState>, fn(&mut WasmState) -> &mut WasmState) -> Result<()>,
 ) -> Linker<WasmState> {
-    let mut linker = Linker::new(&wasm_engine());
+    let mut linker = Linker::new(&wasm_engine(executor));
     wasmtime_wasi::add_to_linker_async(&mut linker).unwrap();
     f(&mut linker, wasi_view).unwrap();
     linker
@@ -109,6 +111,7 @@ pub enum Extension {
 
 impl Extension {
     pub async fn instantiate_async(
+        executor: &BackgroundExecutor,
         store: &mut Store<WasmState>,
         release_channel: ReleaseChannel,
         version: SemanticVersion,
@@ -121,7 +124,7 @@ impl Extension {
             authorize_access_to_unreleased_wasm_api_version(release_channel)?;
 
             let extension =
-                latest::Extension::instantiate_async(store, component, latest::linker())
+                latest::Extension::instantiate_async(store, component, latest::linker(executor))
                     .await
                     .context("failed to instantiate wasm extension")?;
             Ok(Self::V0_6_0(extension))
@@ -129,7 +132,7 @@ impl Extension {
             let extension = since_v0_5_0::Extension::instantiate_async(
                 store,
                 component,
-                since_v0_5_0::linker(),
+                since_v0_5_0::linker(executor),
             )
             .await
             .context("failed to instantiate wasm extension")?;
@@ -138,7 +141,7 @@ impl Extension {
             let extension = since_v0_4_0::Extension::instantiate_async(
                 store,
                 component,
-                since_v0_4_0::linker(),
+                since_v0_4_0::linker(executor),
             )
             .await
             .context("failed to instantiate wasm extension")?;
@@ -147,7 +150,7 @@ impl Extension {
             let extension = since_v0_3_0::Extension::instantiate_async(
                 store,
                 component,
-                since_v0_3_0::linker(),
+                since_v0_3_0::linker(executor),
             )
             .await
             .context("failed to instantiate wasm extension")?;
@@ -156,7 +159,7 @@ impl Extension {
             let extension = since_v0_2_0::Extension::instantiate_async(
                 store,
                 component,
-                since_v0_2_0::linker(),
+                since_v0_2_0::linker(executor),
             )
             .await
             .context("failed to instantiate wasm extension")?;
@@ -165,7 +168,7 @@ impl Extension {
             let extension = since_v0_1_0::Extension::instantiate_async(
                 store,
                 component,
-                since_v0_1_0::linker(),
+                since_v0_1_0::linker(executor),
             )
             .await
             .context("failed to instantiate wasm extension")?;
@@ -174,7 +177,7 @@ impl Extension {
             let extension = since_v0_0_6::Extension::instantiate_async(
                 store,
                 component,
-                since_v0_0_6::linker(),
+                since_v0_0_6::linker(executor),
             )
             .await
             .context("failed to instantiate wasm extension")?;
@@ -183,7 +186,7 @@ impl Extension {
             let extension = since_v0_0_4::Extension::instantiate_async(
                 store,
                 component,
-                since_v0_0_4::linker(),
+                since_v0_0_4::linker(executor),
             )
             .await
             .context("failed to instantiate wasm extension")?;
@@ -192,7 +195,7 @@ impl Extension {
             let extension = since_v0_0_1::Extension::instantiate_async(
                 store,
                 component,
-                since_v0_0_1::linker(),
+                since_v0_0_1::linker(executor),
             )
             .await
             .context("failed to instantiate wasm extension")?;

crates/extension_host/src/wasm_host/wit/since_v0_0_1.rs 🔗

@@ -3,6 +3,7 @@ use crate::wasm_host::WasmState;
 use crate::wasm_host::wit::since_v0_0_4;
 use anyhow::Result;
 use extension::{ExtensionLanguageServerProxy, WorktreeDelegate};
+use gpui::BackgroundExecutor;
 use language::BinaryStatus;
 use semantic_version::SemanticVersion;
 use std::sync::{Arc, OnceLock};
@@ -23,9 +24,9 @@ wasmtime::component::bindgen!({
 
 pub type ExtensionWorktree = Arc<dyn WorktreeDelegate>;
 
-pub fn linker() -> &'static Linker<WasmState> {
+pub fn linker(executor: &BackgroundExecutor) -> &'static Linker<WasmState> {
     static LINKER: OnceLock<Linker<WasmState>> = OnceLock::new();
-    LINKER.get_or_init(|| super::new_linker(Extension::add_to_linker))
+    LINKER.get_or_init(|| super::new_linker(executor, Extension::add_to_linker))
 }
 
 impl From<DownloadedFileType> for latest::DownloadedFileType {

crates/extension_host/src/wasm_host/wit/since_v0_0_4.rs 🔗

@@ -2,6 +2,7 @@ use super::latest;
 use crate::wasm_host::WasmState;
 use anyhow::Result;
 use extension::WorktreeDelegate;
+use gpui::BackgroundExecutor;
 use semantic_version::SemanticVersion;
 use std::sync::{Arc, OnceLock};
 use wasmtime::component::{Linker, Resource};
@@ -21,9 +22,9 @@ wasmtime::component::bindgen!({
 
 pub type ExtensionWorktree = Arc<dyn WorktreeDelegate>;
 
-pub fn linker() -> &'static Linker<WasmState> {
+pub fn linker(executor: &BackgroundExecutor) -> &'static Linker<WasmState> {
     static LINKER: OnceLock<Linker<WasmState>> = OnceLock::new();
-    LINKER.get_or_init(|| super::new_linker(Extension::add_to_linker))
+    LINKER.get_or_init(|| super::new_linker(executor, Extension::add_to_linker))
 }
 
 impl From<DownloadedFileType> for latest::DownloadedFileType {

crates/extension_host/src/wasm_host/wit/since_v0_0_6.rs 🔗

@@ -2,6 +2,7 @@ use super::{latest, since_v0_1_0};
 use crate::wasm_host::WasmState;
 use anyhow::Result;
 use extension::WorktreeDelegate;
+use gpui::BackgroundExecutor;
 use semantic_version::SemanticVersion;
 use std::sync::{Arc, OnceLock};
 use wasmtime::component::{Linker, Resource};
@@ -27,9 +28,9 @@ mod settings {
 
 pub type ExtensionWorktree = Arc<dyn WorktreeDelegate>;
 
-pub fn linker() -> &'static Linker<WasmState> {
+pub fn linker(executor: &BackgroundExecutor) -> &'static Linker<WasmState> {
     static LINKER: OnceLock<Linker<WasmState>> = OnceLock::new();
-    LINKER.get_or_init(|| super::new_linker(Extension::add_to_linker))
+    LINKER.get_or_init(|| super::new_linker(executor, Extension::add_to_linker))
 }
 
 impl From<Command> for latest::Command {

crates/extension_host/src/wasm_host/wit/since_v0_1_0.rs 🔗

@@ -7,6 +7,7 @@ use async_tar::Archive;
 use extension::{ExtensionLanguageServerProxy, KeyValueStoreDelegate, WorktreeDelegate};
 use futures::{AsyncReadExt, lock::Mutex};
 use futures::{FutureExt as _, io::BufReader};
+use gpui::BackgroundExecutor;
 use language::LanguageName;
 use language::{BinaryStatus, language_settings::AllLanguageSettings};
 use project::project_settings::ProjectSettings;
@@ -48,9 +49,9 @@ pub type ExtensionWorktree = Arc<dyn WorktreeDelegate>;
 pub type ExtensionKeyValueStore = Arc<dyn KeyValueStoreDelegate>;
 pub type ExtensionHttpResponseStream = Arc<Mutex<::http_client::Response<AsyncBody>>>;
 
-pub fn linker() -> &'static Linker<WasmState> {
+pub fn linker(executor: &BackgroundExecutor) -> &'static Linker<WasmState> {
     static LINKER: OnceLock<Linker<WasmState>> = OnceLock::new();
-    LINKER.get_or_init(|| super::new_linker(Extension::add_to_linker))
+    LINKER.get_or_init(|| super::new_linker(executor, Extension::add_to_linker))
 }
 
 impl From<Command> for latest::Command {

crates/extension_host/src/wasm_host/wit/since_v0_2_0.rs 🔗

@@ -1,6 +1,7 @@
 use crate::wasm_host::WasmState;
 use anyhow::Result;
 use extension::{KeyValueStoreDelegate, ProjectDelegate, WorktreeDelegate};
+use gpui::BackgroundExecutor;
 use semantic_version::SemanticVersion;
 use std::sync::{Arc, OnceLock};
 use wasmtime::component::{Linker, Resource};
@@ -36,9 +37,9 @@ pub type ExtensionWorktree = Arc<dyn WorktreeDelegate>;
 pub type ExtensionProject = Arc<dyn ProjectDelegate>;
 pub type ExtensionKeyValueStore = Arc<dyn KeyValueStoreDelegate>;
 
-pub fn linker() -> &'static Linker<WasmState> {
+pub fn linker(executor: &BackgroundExecutor) -> &'static Linker<WasmState> {
     static LINKER: OnceLock<Linker<WasmState>> = OnceLock::new();
-    LINKER.get_or_init(|| super::new_linker(Extension::add_to_linker))
+    LINKER.get_or_init(|| super::new_linker(executor, Extension::add_to_linker))
 }
 
 impl From<Command> for latest::Command {

crates/extension_host/src/wasm_host/wit/since_v0_3_0.rs 🔗

@@ -1,6 +1,7 @@
 use crate::wasm_host::WasmState;
 use anyhow::Result;
 use extension::{KeyValueStoreDelegate, ProjectDelegate, WorktreeDelegate};
+use gpui::BackgroundExecutor;
 use semantic_version::SemanticVersion;
 use std::sync::{Arc, OnceLock};
 use wasmtime::component::{Linker, Resource};
@@ -36,9 +37,9 @@ pub type ExtensionWorktree = Arc<dyn WorktreeDelegate>;
 pub type ExtensionProject = Arc<dyn ProjectDelegate>;
 pub type ExtensionKeyValueStore = Arc<dyn KeyValueStoreDelegate>;
 
-pub fn linker() -> &'static Linker<WasmState> {
+pub fn linker(executor: &BackgroundExecutor) -> &'static Linker<WasmState> {
     static LINKER: OnceLock<Linker<WasmState>> = OnceLock::new();
-    LINKER.get_or_init(|| super::new_linker(Extension::add_to_linker))
+    LINKER.get_or_init(|| super::new_linker(executor, Extension::add_to_linker))
 }
 
 impl From<CodeLabel> for latest::CodeLabel {

crates/extension_host/src/wasm_host/wit/since_v0_4_0.rs 🔗

@@ -1,6 +1,7 @@
 use crate::wasm_host::WasmState;
 use anyhow::Result;
 use extension::{KeyValueStoreDelegate, ProjectDelegate, WorktreeDelegate};
+use gpui::BackgroundExecutor;
 use semantic_version::SemanticVersion;
 use std::sync::{Arc, OnceLock};
 use wasmtime::component::{Linker, Resource};
@@ -36,9 +37,9 @@ pub type ExtensionWorktree = Arc<dyn WorktreeDelegate>;
 pub type ExtensionProject = Arc<dyn ProjectDelegate>;
 pub type ExtensionKeyValueStore = Arc<dyn KeyValueStoreDelegate>;
 
-pub fn linker() -> &'static Linker<WasmState> {
+pub fn linker(executor: &BackgroundExecutor) -> &'static Linker<WasmState> {
     static LINKER: OnceLock<Linker<WasmState>> = OnceLock::new();
-    LINKER.get_or_init(|| super::new_linker(Extension::add_to_linker))
+    LINKER.get_or_init(|| super::new_linker(executor, Extension::add_to_linker))
 }
 
 impl From<CodeLabel> for latest::CodeLabel {

crates/extension_host/src/wasm_host/wit/since_v0_5_0.rs 🔗

@@ -1,6 +1,7 @@
 use crate::wasm_host::WasmState;
 use anyhow::Result;
 use extension::{KeyValueStoreDelegate, ProjectDelegate, WorktreeDelegate};
+use gpui::BackgroundExecutor;
 use semantic_version::SemanticVersion;
 use std::sync::{Arc, OnceLock};
 use wasmtime::component::{Linker, Resource};
@@ -38,9 +39,9 @@ pub type ExtensionWorktree = Arc<dyn WorktreeDelegate>;
 pub type ExtensionProject = Arc<dyn ProjectDelegate>;
 pub type ExtensionKeyValueStore = Arc<dyn KeyValueStoreDelegate>;
 
-pub fn linker() -> &'static Linker<WasmState> {
+pub fn linker(executor: &BackgroundExecutor) -> &'static Linker<WasmState> {
     static LINKER: OnceLock<Linker<WasmState>> = OnceLock::new();
-    LINKER.get_or_init(|| super::new_linker(Extension::add_to_linker))
+    LINKER.get_or_init(|| super::new_linker(executor, Extension::add_to_linker))
 }
 
 impl From<CodeLabel> for latest::CodeLabel {

crates/extension_host/src/wasm_host/wit/since_v0_6_0.rs 🔗

@@ -18,7 +18,7 @@ use extension::{
 };
 use futures::{AsyncReadExt, lock::Mutex};
 use futures::{FutureExt as _, io::BufReader};
-use gpui::SharedString;
+use gpui::{BackgroundExecutor, SharedString};
 use language::{BinaryStatus, LanguageName, language_settings::AllLanguageSettings};
 use project::project_settings::ProjectSettings;
 use semantic_version::SemanticVersion;
@@ -59,9 +59,9 @@ pub type ExtensionProject = Arc<dyn ProjectDelegate>;
 pub type ExtensionKeyValueStore = Arc<dyn KeyValueStoreDelegate>;
 pub type ExtensionHttpResponseStream = Arc<Mutex<::http_client::Response<AsyncBody>>>;
 
-pub fn linker() -> &'static Linker<WasmState> {
+pub fn linker(executor: &BackgroundExecutor) -> &'static Linker<WasmState> {
     static LINKER: OnceLock<Linker<WasmState>> = OnceLock::new();
-    LINKER.get_or_init(|| super::new_linker(Extension::add_to_linker))
+    LINKER.get_or_init(|| super::new_linker(executor, Extension::add_to_linker))
 }
 
 impl From<Range> for std::ops::Range<usize> {