wasm_host.rs

  1pub mod wit;
  2
  3use crate::capability_granter::CapabilityGranter;
  4use crate::{ExtensionManifest, ExtensionSettings};
  5use anyhow::{Context as _, Result, anyhow, bail};
  6use async_trait::async_trait;
  7use dap::{DebugRequest, StartDebuggingRequestArgumentsRequest};
  8use extension::{
  9    CodeLabel, Command, Completion, ContextServerConfiguration, DebugAdapterBinary,
 10    DebugTaskDefinition, ExtensionCapability, ExtensionHostProxy, KeyValueStoreDelegate,
 11    ProjectDelegate, SlashCommand, SlashCommandArgumentCompletion, SlashCommandOutput, Symbol,
 12    WorktreeDelegate,
 13};
 14use fs::{Fs, normalize_path};
 15use futures::future::LocalBoxFuture;
 16use futures::{
 17    Future, FutureExt, StreamExt as _,
 18    channel::{
 19        mpsc::{self, UnboundedSender},
 20        oneshot,
 21    },
 22    future::BoxFuture,
 23};
 24use gpui::{App, AsyncApp, BackgroundExecutor, Task, Timer};
 25use http_client::HttpClient;
 26use language::LanguageName;
 27use lsp::LanguageServerName;
 28use moka::sync::Cache;
 29use node_runtime::NodeRuntime;
 30use release_channel::ReleaseChannel;
 31use semantic_version::SemanticVersion;
 32use settings::Settings;
 33use std::{
 34    borrow::Cow,
 35    path::{Path, PathBuf},
 36    sync::{
 37        Arc, LazyLock, OnceLock,
 38        atomic::{AtomicBool, Ordering},
 39    },
 40    time::Duration,
 41};
 42use task::{DebugScenario, SpawnInTerminal, TaskTemplate, ZedDebugConfig};
 43use util::paths::SanitizedPath;
 44use wasmtime::{
 45    CacheStore, Engine, Store,
 46    component::{Component, ResourceTable},
 47};
 48use wasmtime_wasi::{self as wasi, WasiView};
 49use wit::Extension;
 50
 51pub struct WasmHost {
 52    engine: Engine,
 53    release_channel: ReleaseChannel,
 54    http_client: Arc<dyn HttpClient>,
 55    node_runtime: NodeRuntime,
 56    pub(crate) proxy: Arc<ExtensionHostProxy>,
 57    fs: Arc<dyn Fs>,
 58    pub work_dir: PathBuf,
 59    /// The capabilities granted to extensions running on the host.
 60    pub(crate) granted_capabilities: Vec<ExtensionCapability>,
 61    _main_thread_message_task: Task<()>,
 62    main_thread_message_tx: mpsc::UnboundedSender<MainThreadCall>,
 63}
 64
 65#[derive(Clone, Debug)]
 66pub struct WasmExtension {
 67    tx: UnboundedSender<ExtensionCall>,
 68    pub manifest: Arc<ExtensionManifest>,
 69    pub work_dir: Arc<Path>,
 70    #[allow(unused)]
 71    pub zed_api_version: SemanticVersion,
 72}
 73
 74impl Drop for WasmExtension {
 75    fn drop(&mut self) {
 76        self.tx.close_channel();
 77    }
 78}
 79
 80#[async_trait]
 81impl extension::Extension for WasmExtension {
 82    fn manifest(&self) -> Arc<ExtensionManifest> {
 83        self.manifest.clone()
 84    }
 85
 86    fn work_dir(&self) -> Arc<Path> {
 87        self.work_dir.clone()
 88    }
 89
 90    async fn language_server_command(
 91        &self,
 92        language_server_id: LanguageServerName,
 93        language_name: LanguageName,
 94        worktree: Arc<dyn WorktreeDelegate>,
 95    ) -> Result<Command> {
 96        self.call(|extension, store| {
 97            async move {
 98                let resource = store.data_mut().table().push(worktree)?;
 99                let command = extension
100                    .call_language_server_command(
101                        store,
102                        &language_server_id,
103                        &language_name,
104                        resource,
105                    )
106                    .await?
107                    .map_err(|err| store.data().extension_error(err))?;
108
109                Ok(command.into())
110            }
111            .boxed()
112        })
113        .await?
114    }
115
116    async fn language_server_initialization_options(
117        &self,
118        language_server_id: LanguageServerName,
119        language_name: LanguageName,
120        worktree: Arc<dyn WorktreeDelegate>,
121    ) -> Result<Option<String>> {
122        self.call(|extension, store| {
123            async move {
124                let resource = store.data_mut().table().push(worktree)?;
125                let options = extension
126                    .call_language_server_initialization_options(
127                        store,
128                        &language_server_id,
129                        &language_name,
130                        resource,
131                    )
132                    .await?
133                    .map_err(|err| store.data().extension_error(err))?;
134                anyhow::Ok(options)
135            }
136            .boxed()
137        })
138        .await?
139    }
140
141    async fn language_server_workspace_configuration(
142        &self,
143        language_server_id: LanguageServerName,
144        worktree: Arc<dyn WorktreeDelegate>,
145    ) -> Result<Option<String>> {
146        self.call(|extension, store| {
147            async move {
148                let resource = store.data_mut().table().push(worktree)?;
149                let options = extension
150                    .call_language_server_workspace_configuration(
151                        store,
152                        &language_server_id,
153                        resource,
154                    )
155                    .await?
156                    .map_err(|err| store.data().extension_error(err))?;
157                anyhow::Ok(options)
158            }
159            .boxed()
160        })
161        .await?
162    }
163
164    async fn language_server_additional_initialization_options(
165        &self,
166        language_server_id: LanguageServerName,
167        target_language_server_id: LanguageServerName,
168        worktree: Arc<dyn WorktreeDelegate>,
169    ) -> Result<Option<String>> {
170        self.call(|extension, store| {
171            async move {
172                let resource = store.data_mut().table().push(worktree)?;
173                let options = extension
174                    .call_language_server_additional_initialization_options(
175                        store,
176                        &language_server_id,
177                        &target_language_server_id,
178                        resource,
179                    )
180                    .await?
181                    .map_err(|err| store.data().extension_error(err))?;
182                anyhow::Ok(options)
183            }
184            .boxed()
185        })
186        .await?
187    }
188
189    async fn language_server_additional_workspace_configuration(
190        &self,
191        language_server_id: LanguageServerName,
192        target_language_server_id: LanguageServerName,
193        worktree: Arc<dyn WorktreeDelegate>,
194    ) -> Result<Option<String>> {
195        self.call(|extension, store| {
196            async move {
197                let resource = store.data_mut().table().push(worktree)?;
198                let options = extension
199                    .call_language_server_additional_workspace_configuration(
200                        store,
201                        &language_server_id,
202                        &target_language_server_id,
203                        resource,
204                    )
205                    .await?
206                    .map_err(|err| store.data().extension_error(err))?;
207                anyhow::Ok(options)
208            }
209            .boxed()
210        })
211        .await?
212    }
213
214    async fn labels_for_completions(
215        &self,
216        language_server_id: LanguageServerName,
217        completions: Vec<Completion>,
218    ) -> Result<Vec<Option<CodeLabel>>> {
219        self.call(|extension, store| {
220            async move {
221                let labels = extension
222                    .call_labels_for_completions(
223                        store,
224                        &language_server_id,
225                        completions.into_iter().map(Into::into).collect(),
226                    )
227                    .await?
228                    .map_err(|err| store.data().extension_error(err))?;
229
230                Ok(labels
231                    .into_iter()
232                    .map(|label| label.map(Into::into))
233                    .collect())
234            }
235            .boxed()
236        })
237        .await?
238    }
239
240    async fn labels_for_symbols(
241        &self,
242        language_server_id: LanguageServerName,
243        symbols: Vec<Symbol>,
244    ) -> Result<Vec<Option<CodeLabel>>> {
245        self.call(|extension, store| {
246            async move {
247                let labels = extension
248                    .call_labels_for_symbols(
249                        store,
250                        &language_server_id,
251                        symbols.into_iter().map(Into::into).collect(),
252                    )
253                    .await?
254                    .map_err(|err| store.data().extension_error(err))?;
255
256                Ok(labels
257                    .into_iter()
258                    .map(|label| label.map(Into::into))
259                    .collect())
260            }
261            .boxed()
262        })
263        .await?
264    }
265
266    async fn complete_slash_command_argument(
267        &self,
268        command: SlashCommand,
269        arguments: Vec<String>,
270    ) -> Result<Vec<SlashCommandArgumentCompletion>> {
271        self.call(|extension, store| {
272            async move {
273                let completions = extension
274                    .call_complete_slash_command_argument(store, &command.into(), &arguments)
275                    .await?
276                    .map_err(|err| store.data().extension_error(err))?;
277
278                Ok(completions.into_iter().map(Into::into).collect())
279            }
280            .boxed()
281        })
282        .await?
283    }
284
285    async fn run_slash_command(
286        &self,
287        command: SlashCommand,
288        arguments: Vec<String>,
289        delegate: Option<Arc<dyn WorktreeDelegate>>,
290    ) -> Result<SlashCommandOutput> {
291        self.call(|extension, store| {
292            async move {
293                let resource = if let Some(delegate) = delegate {
294                    Some(store.data_mut().table().push(delegate)?)
295                } else {
296                    None
297                };
298
299                let output = extension
300                    .call_run_slash_command(store, &command.into(), &arguments, resource)
301                    .await?
302                    .map_err(|err| store.data().extension_error(err))?;
303
304                Ok(output.into())
305            }
306            .boxed()
307        })
308        .await?
309    }
310
311    async fn context_server_command(
312        &self,
313        context_server_id: Arc<str>,
314        project: Arc<dyn ProjectDelegate>,
315    ) -> Result<Command> {
316        self.call(|extension, store| {
317            async move {
318                let project_resource = store.data_mut().table().push(project)?;
319                let command = extension
320                    .call_context_server_command(store, context_server_id.clone(), project_resource)
321                    .await?
322                    .map_err(|err| store.data().extension_error(err))?;
323                anyhow::Ok(command.into())
324            }
325            .boxed()
326        })
327        .await?
328    }
329
330    async fn context_server_configuration(
331        &self,
332        context_server_id: Arc<str>,
333        project: Arc<dyn ProjectDelegate>,
334    ) -> Result<Option<ContextServerConfiguration>> {
335        self.call(|extension, store| {
336            async move {
337                let project_resource = store.data_mut().table().push(project)?;
338                let Some(configuration) = extension
339                    .call_context_server_configuration(
340                        store,
341                        context_server_id.clone(),
342                        project_resource,
343                    )
344                    .await?
345                    .map_err(|err| store.data().extension_error(err))?
346                else {
347                    return Ok(None);
348                };
349
350                Ok(Some(configuration.try_into()?))
351            }
352            .boxed()
353        })
354        .await?
355    }
356
357    async fn suggest_docs_packages(&self, provider: Arc<str>) -> Result<Vec<String>> {
358        self.call(|extension, store| {
359            async move {
360                let packages = extension
361                    .call_suggest_docs_packages(store, provider.as_ref())
362                    .await?
363                    .map_err(|err| store.data().extension_error(err))?;
364
365                Ok(packages)
366            }
367            .boxed()
368        })
369        .await?
370    }
371
372    async fn index_docs(
373        &self,
374        provider: Arc<str>,
375        package_name: Arc<str>,
376        kv_store: Arc<dyn KeyValueStoreDelegate>,
377    ) -> Result<()> {
378        self.call(|extension, store| {
379            async move {
380                let kv_store_resource = store.data_mut().table().push(kv_store)?;
381                extension
382                    .call_index_docs(
383                        store,
384                        provider.as_ref(),
385                        package_name.as_ref(),
386                        kv_store_resource,
387                    )
388                    .await?
389                    .map_err(|err| store.data().extension_error(err))?;
390
391                anyhow::Ok(())
392            }
393            .boxed()
394        })
395        .await?
396    }
397
398    async fn get_dap_binary(
399        &self,
400        dap_name: Arc<str>,
401        config: DebugTaskDefinition,
402        user_installed_path: Option<PathBuf>,
403        worktree: Arc<dyn WorktreeDelegate>,
404    ) -> Result<DebugAdapterBinary> {
405        self.call(|extension, store| {
406            async move {
407                let resource = store.data_mut().table().push(worktree)?;
408                let dap_binary = extension
409                    .call_get_dap_binary(store, dap_name, config, user_installed_path, resource)
410                    .await?
411                    .map_err(|err| store.data().extension_error(err))?;
412                let dap_binary = dap_binary.try_into()?;
413                Ok(dap_binary)
414            }
415            .boxed()
416        })
417        .await?
418    }
419    async fn dap_request_kind(
420        &self,
421        dap_name: Arc<str>,
422        config: serde_json::Value,
423    ) -> Result<StartDebuggingRequestArgumentsRequest> {
424        self.call(|extension, store| {
425            async move {
426                let kind = extension
427                    .call_dap_request_kind(store, dap_name, config)
428                    .await?
429                    .map_err(|err| store.data().extension_error(err))?;
430                Ok(kind.into())
431            }
432            .boxed()
433        })
434        .await?
435    }
436
437    async fn dap_config_to_scenario(&self, config: ZedDebugConfig) -> Result<DebugScenario> {
438        self.call(|extension, store| {
439            async move {
440                let kind = extension
441                    .call_dap_config_to_scenario(store, config)
442                    .await?
443                    .map_err(|err| store.data().extension_error(err))?;
444                Ok(kind)
445            }
446            .boxed()
447        })
448        .await?
449    }
450
451    async fn dap_locator_create_scenario(
452        &self,
453        locator_name: String,
454        build_config_template: TaskTemplate,
455        resolved_label: String,
456        debug_adapter_name: String,
457    ) -> Result<Option<DebugScenario>> {
458        self.call(|extension, store| {
459            async move {
460                extension
461                    .call_dap_locator_create_scenario(
462                        store,
463                        locator_name,
464                        build_config_template,
465                        resolved_label,
466                        debug_adapter_name,
467                    )
468                    .await
469            }
470            .boxed()
471        })
472        .await?
473    }
474    async fn run_dap_locator(
475        &self,
476        locator_name: String,
477        config: SpawnInTerminal,
478    ) -> Result<DebugRequest> {
479        self.call(|extension, store| {
480            async move {
481                extension
482                    .call_run_dap_locator(store, locator_name, config)
483                    .await?
484                    .map_err(|err| store.data().extension_error(err))
485            }
486            .boxed()
487        })
488        .await?
489    }
490}
491
492pub struct WasmState {
493    manifest: Arc<ExtensionManifest>,
494    pub table: ResourceTable,
495    ctx: wasi::WasiCtx,
496    pub host: Arc<WasmHost>,
497    pub(crate) capability_granter: CapabilityGranter,
498}
499
500std::thread_local! {
501    /// Used by the crash handler to ignore panics in extension-related threads.
502    pub static IS_WASM_THREAD: AtomicBool = const { AtomicBool::new(false) };
503}
504
505type MainThreadCall = Box<dyn Send + for<'a> FnOnce(&'a mut AsyncApp) -> LocalBoxFuture<'a, ()>>;
506
507type ExtensionCall = Box<
508    dyn Send + for<'a> FnOnce(&'a mut Extension, &'a mut Store<WasmState>) -> BoxFuture<'a, ()>,
509>;
510
511fn wasm_engine(executor: &BackgroundExecutor) -> wasmtime::Engine {
512    static WASM_ENGINE: OnceLock<wasmtime::Engine> = OnceLock::new();
513    WASM_ENGINE
514        .get_or_init(|| {
515            let mut config = wasmtime::Config::new();
516            config.wasm_component_model(true);
517            config.async_support(true);
518            config
519                .enable_incremental_compilation(cache_store())
520                .unwrap();
521            // Async support introduces the issue that extension execution happens during `Future::poll`,
522            // which could block an async thread.
523            // https://docs.rs/wasmtime/latest/wasmtime/struct.Config.html#execution-in-poll
524            //
525            // Epoch interruption is a lightweight mechanism to allow the extensions to yield control
526            // back to the executor at regular intervals.
527            config.epoch_interruption(true);
528
529            let engine = wasmtime::Engine::new(&config).unwrap();
530
531            // It might be safer to do this on a non-async thread to make sure it makes progress
532            // regardless of if extensions are blocking.
533            // However, due to our current setup, this isn't a likely occurrence and we'd rather
534            // not have a dedicated thread just for this. If it becomes an issue, we can consider
535            // creating a separate thread for epoch interruption.
536            let engine_ref = engine.weak();
537            executor
538                .spawn(async move {
539                    IS_WASM_THREAD.with(|v| v.store(true, Ordering::Release));
540                    // Somewhat arbitrary interval, as it isn't a guaranteed interval.
541                    // But this is a rough upper bound for how long the extension execution can block on
542                    // `Future::poll`.
543                    const EPOCH_INTERVAL: Duration = Duration::from_millis(100);
544                    let mut timer = Timer::interval(EPOCH_INTERVAL);
545                    while (timer.next().await).is_some() {
546                        // Exit the loop and thread once the engine is dropped.
547                        let Some(engine) = engine_ref.upgrade() else {
548                            break;
549                        };
550                        engine.increment_epoch();
551                    }
552                })
553                .detach();
554
555            engine
556        })
557        .clone()
558}
559
560fn cache_store() -> Arc<IncrementalCompilationCache> {
561    static CACHE_STORE: LazyLock<Arc<IncrementalCompilationCache>> =
562        LazyLock::new(|| Arc::new(IncrementalCompilationCache::new()));
563    CACHE_STORE.clone()
564}
565
566impl WasmHost {
567    pub fn new(
568        fs: Arc<dyn Fs>,
569        http_client: Arc<dyn HttpClient>,
570        node_runtime: NodeRuntime,
571        proxy: Arc<ExtensionHostProxy>,
572        work_dir: PathBuf,
573        cx: &mut App,
574    ) -> Arc<Self> {
575        let (tx, mut rx) = mpsc::unbounded::<MainThreadCall>();
576        let task = cx.spawn(async move |cx| {
577            while let Some(message) = rx.next().await {
578                message(cx).await;
579            }
580        });
581
582        let extension_settings = ExtensionSettings::get_global(cx);
583
584        Arc::new(Self {
585            engine: wasm_engine(cx.background_executor()),
586            fs,
587            work_dir,
588            http_client,
589            node_runtime,
590            proxy,
591            release_channel: ReleaseChannel::global(cx),
592            granted_capabilities: extension_settings.granted_capabilities.clone(),
593            _main_thread_message_task: task,
594            main_thread_message_tx: tx,
595        })
596    }
597
598    pub fn load_extension(
599        self: &Arc<Self>,
600        wasm_bytes: Vec<u8>,
601        manifest: &Arc<ExtensionManifest>,
602        cx: &AsyncApp,
603    ) -> Task<Result<WasmExtension>> {
604        let this = self.clone();
605        let manifest = manifest.clone();
606        let executor = cx.background_executor().clone();
607        let load_extension_task = async move {
608            let zed_api_version = parse_wasm_extension_version(&manifest.id, &wasm_bytes)?;
609
610            let component = Component::from_binary(&this.engine, &wasm_bytes)
611                .context("failed to compile wasm component")?;
612            let mut store = wasmtime::Store::new(
613                &this.engine,
614                WasmState {
615                    ctx: this.build_wasi_ctx(&manifest).await?,
616                    manifest: manifest.clone(),
617                    table: ResourceTable::new(),
618                    host: this.clone(),
619                    capability_granter: CapabilityGranter::new(
620                        this.granted_capabilities.clone(),
621                        manifest.clone(),
622                    ),
623                },
624            );
625            // Store will yield after 1 tick, and get a new deadline of 1 tick after each yield.
626            store.set_epoch_deadline(1);
627            store.epoch_deadline_async_yield_and_update(1);
628
629            let mut extension = Extension::instantiate_async(
630                &executor,
631                &mut store,
632                this.release_channel,
633                zed_api_version,
634                &component,
635            )
636            .await?;
637
638            extension
639                .call_init_extension(&mut store)
640                .await
641                .context("failed to initialize wasm extension")?;
642
643            let (tx, mut rx) = mpsc::unbounded::<ExtensionCall>();
644            let extension_task = async move {
645                while let Some(call) = rx.next().await {
646                    (call)(&mut extension, &mut store).await;
647                }
648            };
649
650            anyhow::Ok((
651                extension_task,
652                WasmExtension {
653                    manifest: manifest.clone(),
654                    work_dir: this.work_dir.join(manifest.id.as_ref()).into(),
655                    tx,
656                    zed_api_version,
657                },
658            ))
659        };
660        cx.spawn(async move |cx| {
661            let (extension_task, extension) = load_extension_task.await?;
662            // we need to run run the task in an extension context as wasmtime_wasi may
663            // call into tokio, accessing its runtime handle
664            gpui_tokio::Tokio::spawn(cx, extension_task)?.detach();
665
666            Ok(extension)
667        })
668    }
669
670    async fn build_wasi_ctx(&self, manifest: &Arc<ExtensionManifest>) -> Result<wasi::WasiCtx> {
671        let extension_work_dir = self.work_dir.join(manifest.id.as_ref());
672        self.fs
673            .create_dir(&extension_work_dir)
674            .await
675            .context("failed to create extension work dir")?;
676
677        let file_perms = wasi::FilePerms::all();
678        let dir_perms = wasi::DirPerms::all();
679        let path = SanitizedPath::new(&extension_work_dir).to_string();
680        #[cfg(target_os = "windows")]
681        let path = path.replace('\\', "/");
682
683        let mut ctx = wasi::WasiCtxBuilder::new();
684        ctx.inherit_stdio()
685            .env("PWD", &path)
686            .env("RUST_BACKTRACE", "full");
687
688        ctx.preopened_dir(&path, ".", dir_perms, file_perms)?;
689        ctx.preopened_dir(&path, &path, dir_perms, file_perms)?;
690
691        Ok(ctx.build())
692    }
693
694    pub fn writeable_path_from_extension(&self, id: &Arc<str>, path: &Path) -> Result<PathBuf> {
695        let extension_work_dir = self.work_dir.join(id.as_ref());
696        let path = normalize_path(&extension_work_dir.join(path));
697        anyhow::ensure!(
698            path.starts_with(&extension_work_dir),
699            "cannot write to path {path:?}",
700        );
701        Ok(path)
702    }
703}
704
705pub fn parse_wasm_extension_version(
706    extension_id: &str,
707    wasm_bytes: &[u8],
708) -> Result<SemanticVersion> {
709    let mut version = None;
710
711    for part in wasmparser::Parser::new(0).parse_all(wasm_bytes) {
712        if let wasmparser::Payload::CustomSection(s) =
713            part.context("error parsing wasm extension")?
714            && s.name() == "zed:api-version"
715        {
716            version = parse_wasm_extension_version_custom_section(s.data());
717            if version.is_none() {
718                bail!(
719                    "extension {} has invalid zed:api-version section: {:?}",
720                    extension_id,
721                    s.data()
722                );
723            }
724        }
725    }
726
727    // The reason we wait until we're done parsing all of the Wasm bytes to return the version
728    // is to work around a panic that can happen inside of Wasmtime when the bytes are invalid.
729    //
730    // By parsing the entirety of the Wasm bytes before we return, we're able to detect this problem
731    // earlier as an `Err` rather than as a panic.
732    version.with_context(|| format!("extension {extension_id} has no zed:api-version section"))
733}
734
735fn parse_wasm_extension_version_custom_section(data: &[u8]) -> Option<SemanticVersion> {
736    if data.len() == 6 {
737        Some(SemanticVersion::new(
738            u16::from_be_bytes([data[0], data[1]]) as _,
739            u16::from_be_bytes([data[2], data[3]]) as _,
740            u16::from_be_bytes([data[4], data[5]]) as _,
741        ))
742    } else {
743        None
744    }
745}
746
747impl WasmExtension {
748    pub async fn load(
749        extension_dir: &Path,
750        manifest: &Arc<ExtensionManifest>,
751        wasm_host: Arc<WasmHost>,
752        cx: &AsyncApp,
753    ) -> Result<Self> {
754        let path = extension_dir.join("extension.wasm");
755
756        let mut wasm_file = wasm_host
757            .fs
758            .open_sync(&path)
759            .await
760            .context("failed to open wasm file")?;
761
762        let mut wasm_bytes = Vec::new();
763        wasm_file
764            .read_to_end(&mut wasm_bytes)
765            .context("failed to read wasm")?;
766
767        wasm_host
768            .load_extension(wasm_bytes, manifest, cx)
769            .await
770            .with_context(|| format!("failed to load wasm extension {}", manifest.id))
771    }
772
773    pub async fn call<T, Fn>(&self, f: Fn) -> Result<T>
774    where
775        T: 'static + Send,
776        Fn: 'static
777            + Send
778            + for<'a> FnOnce(&'a mut Extension, &'a mut Store<WasmState>) -> BoxFuture<'a, T>,
779    {
780        let (return_tx, return_rx) = oneshot::channel();
781        self.tx
782            .unbounded_send(Box::new(move |extension, store| {
783                async {
784                    let result = f(extension, store).await;
785                    return_tx.send(result).ok();
786                }
787                .boxed()
788            }))
789            .map_err(|_| {
790                anyhow!(
791                    "wasm extension channel should not be closed yet, extension {} (id {})",
792                    self.manifest.name,
793                    self.manifest.id,
794                )
795            })?;
796        return_rx.await.with_context(|| {
797            format!(
798                "wasm extension channel, extension {} (id {})",
799                self.manifest.name, self.manifest.id,
800            )
801        })
802    }
803}
804
805impl WasmState {
806    fn on_main_thread<T, Fn>(&self, f: Fn) -> impl 'static + Future<Output = T>
807    where
808        T: 'static + Send,
809        Fn: 'static + Send + for<'a> FnOnce(&'a mut AsyncApp) -> LocalBoxFuture<'a, T>,
810    {
811        let (return_tx, return_rx) = oneshot::channel();
812        self.host
813            .main_thread_message_tx
814            .clone()
815            .unbounded_send(Box::new(move |cx| {
816                async {
817                    let result = f(cx).await;
818                    return_tx.send(result).ok();
819                }
820                .boxed_local()
821            }))
822            .unwrap_or_else(|_| {
823                panic!(
824                    "main thread message channel should not be closed yet, extension {} (id {})",
825                    self.manifest.name, self.manifest.id,
826                )
827            });
828        let name = self.manifest.name.clone();
829        let id = self.manifest.id.clone();
830        async move {
831            return_rx.await.unwrap_or_else(|_| {
832                panic!("main thread message channel, extension {name} (id {id})")
833            })
834        }
835    }
836
837    fn work_dir(&self) -> PathBuf {
838        self.host.work_dir.join(self.manifest.id.as_ref())
839    }
840
841    fn extension_error(&self, message: String) -> anyhow::Error {
842        anyhow!(
843            "from extension \"{}\" version {}: {}",
844            self.manifest.name,
845            self.manifest.version,
846            message
847        )
848    }
849}
850
851impl wasi::WasiView for WasmState {
852    fn table(&mut self) -> &mut ResourceTable {
853        &mut self.table
854    }
855
856    fn ctx(&mut self) -> &mut wasi::WasiCtx {
857        &mut self.ctx
858    }
859}
860
861/// Wrapper around a mini-moka bounded cache for storing incremental compilation artifacts.
862/// Since wasm modules have many similar elements, this can save us a lot of work at the
863/// cost of a small memory footprint. However, we don't want this to be unbounded, so we use
864/// a LFU/LRU cache to evict less used cache entries.
865#[derive(Debug)]
866struct IncrementalCompilationCache {
867    cache: Cache<Vec<u8>, Vec<u8>>,
868}
869
870impl IncrementalCompilationCache {
871    fn new() -> Self {
872        let cache = Cache::builder()
873            // Cap this at 32 MB for now. Our extensions turn into roughly 512kb in the cache,
874            // which means we could store 64 completely novel extensions in the cache, but in
875            // practice we will more than that, which is more than enough for our use case.
876            .max_capacity(32 * 1024 * 1024)
877            .weigher(|k: &Vec<u8>, v: &Vec<u8>| (k.len() + v.len()).try_into().unwrap_or(u32::MAX))
878            .build();
879        Self { cache }
880    }
881}
882
883impl CacheStore for IncrementalCompilationCache {
884    fn get(&self, key: &[u8]) -> Option<Cow<'_, [u8]>> {
885        self.cache.get(key).map(|v| v.into())
886    }
887
888    fn insert(&self, key: &[u8], value: Vec<u8>) -> bool {
889        self.cache.insert(key.to_vec(), value);
890        true
891    }
892}