wasm_host.rs

  1pub mod llm_provider;
  2pub mod wit;
  3
  4use crate::capability_granter::CapabilityGranter;
  5use crate::{ExtensionManifest, ExtensionSettings};
  6use anyhow::{Context as _, Result, anyhow, bail};
  7use async_trait::async_trait;
  8use collections::HashSet;
  9use dap::{DebugRequest, StartDebuggingRequestArgumentsRequest};
 10use extension::{
 11    CodeLabel, Command, Completion, ContextServerConfiguration, DebugAdapterBinary,
 12    DebugTaskDefinition, ExtensionCapability, ExtensionHostProxy, KeyValueStoreDelegate,
 13    ProjectDelegate, SlashCommand, SlashCommandArgumentCompletion, SlashCommandOutput, Symbol,
 14    WorktreeDelegate,
 15};
 16use fs::{Fs, normalize_path};
 17use futures::future::LocalBoxFuture;
 18use futures::{
 19    Future, FutureExt, StreamExt as _,
 20    channel::{
 21        mpsc::{self, UnboundedSender},
 22        oneshot,
 23    },
 24    future::BoxFuture,
 25};
 26use gpui::{App, AsyncApp, BackgroundExecutor, Task, Timer};
 27use http_client::HttpClient;
 28use language::LanguageName;
 29use lsp::LanguageServerName;
 30use moka::sync::Cache;
 31use node_runtime::NodeRuntime;
 32use release_channel::ReleaseChannel;
 33use semver::Version;
 34use settings::Settings;
 35use std::{
 36    borrow::Cow,
 37    path::{Path, PathBuf},
 38    sync::{
 39        Arc, LazyLock, OnceLock,
 40        atomic::{AtomicBool, Ordering},
 41    },
 42    time::Duration,
 43};
 44use task::{DebugScenario, SpawnInTerminal, TaskTemplate, ZedDebugConfig};
 45use util::paths::SanitizedPath;
 46use wasmtime::{
 47    CacheStore, Engine, Store,
 48    component::{Component, ResourceTable},
 49};
 50use wasmtime_wasi::{self as wasi, WasiView};
 51use wit::Extension;
 52
/// Host-side state shared by the WebAssembly extensions loaded through it.
///
/// Created with [`WasmHost::new`]; extensions are loaded with
/// [`WasmHost::load_extension`].
pub struct WasmHost {
    /// Engine used to compile extension components and to create their stores.
    engine: Engine,
    /// Release channel passed to each extension when it is instantiated.
    release_channel: ReleaseChannel,
    /// HTTP client supplied at construction. Not used in this file — presumably
    /// consumed by the host bindings in `wit`; verify there.
    http_client: Arc<dyn HttpClient>,
    /// Node runtime supplied at construction. Not used in this file — presumably
    /// consumed by the host bindings in `wit`; verify there.
    node_runtime: NodeRuntime,
    /// Extension host proxy supplied at construction (not used in this file).
    pub(crate) proxy: Arc<ExtensionHostProxy>,
    /// Filesystem used to create extension work directories and to open
    /// extension Wasm files.
    fs: Arc<dyn Fs>,
    /// Root directory under which each extension gets a sub-directory named
    /// after its id.
    pub work_dir: PathBuf,
    /// The capabilities granted to extensions running on the host.
    pub(crate) granted_capabilities: Vec<ExtensionCapability>,
    /// Extension LLM providers allowed to read API keys from environment variables.
    pub(crate) allowed_env_var_providers: HashSet<Arc<str>>,
    /// Task that receives `MainThreadCall`s and runs them one at a time.
    _main_thread_message_task: Task<()>,
    /// Sender feeding `_main_thread_message_task`; used by
    /// `WasmState::on_main_thread`.
    main_thread_message_tx: mpsc::UnboundedSender<MainThreadCall>,
}
 68
/// Handle to a loaded WebAssembly extension.
///
/// Calls are not executed on the caller's task: they are sent over `tx` to the
/// task that owns the extension instance and its store (see
/// [`WasmExtension::call`]).
#[derive(Clone, Debug)]
pub struct WasmExtension {
    /// Sender for calls to run against the extension instance.
    tx: UnboundedSender<ExtensionCall>,
    /// Manifest the extension was loaded with.
    pub manifest: Arc<ExtensionManifest>,
    /// The extension's work directory: the host work dir joined with the
    /// extension id.
    pub work_dir: Arc<Path>,
    /// API version read from the `zed:api-version` custom section of the Wasm
    /// binary.
    #[allow(unused)]
    pub zed_api_version: Version,
    /// The spawned task that drains `tx`. Held here presumably so it is not
    /// cancelled while a handle exists — confirm against `gpui::Task` semantics.
    _task: Arc<Task<Result<(), gpui_tokio::JoinError>>>,
}
 78
 79#[async_trait]
 80impl extension::Extension for WasmExtension {
 81    fn manifest(&self) -> Arc<ExtensionManifest> {
 82        self.manifest.clone()
 83    }
 84
 85    fn work_dir(&self) -> Arc<Path> {
 86        self.work_dir.clone()
 87    }
 88
 89    async fn language_server_command(
 90        &self,
 91        language_server_id: LanguageServerName,
 92        language_name: LanguageName,
 93        worktree: Arc<dyn WorktreeDelegate>,
 94    ) -> Result<Command> {
 95        self.call(|extension, store| {
 96            async move {
 97                let resource = store.data_mut().table().push(worktree)?;
 98                let command = extension
 99                    .call_language_server_command(
100                        store,
101                        &language_server_id,
102                        &language_name,
103                        resource,
104                    )
105                    .await?
106                    .map_err(|err| store.data().extension_error(err))?;
107
108                Ok(command.into())
109            }
110            .boxed()
111        })
112        .await?
113    }
114
115    async fn language_server_initialization_options(
116        &self,
117        language_server_id: LanguageServerName,
118        language_name: LanguageName,
119        worktree: Arc<dyn WorktreeDelegate>,
120    ) -> Result<Option<String>> {
121        self.call(|extension, store| {
122            async move {
123                let resource = store.data_mut().table().push(worktree)?;
124                let options = extension
125                    .call_language_server_initialization_options(
126                        store,
127                        &language_server_id,
128                        &language_name,
129                        resource,
130                    )
131                    .await?
132                    .map_err(|err| store.data().extension_error(err))?;
133                anyhow::Ok(options)
134            }
135            .boxed()
136        })
137        .await?
138    }
139
140    async fn language_server_workspace_configuration(
141        &self,
142        language_server_id: LanguageServerName,
143        worktree: Arc<dyn WorktreeDelegate>,
144    ) -> Result<Option<String>> {
145        self.call(|extension, store| {
146            async move {
147                let resource = store.data_mut().table().push(worktree)?;
148                let options = extension
149                    .call_language_server_workspace_configuration(
150                        store,
151                        &language_server_id,
152                        resource,
153                    )
154                    .await?
155                    .map_err(|err| store.data().extension_error(err))?;
156                anyhow::Ok(options)
157            }
158            .boxed()
159        })
160        .await?
161    }
162
163    async fn language_server_additional_initialization_options(
164        &self,
165        language_server_id: LanguageServerName,
166        target_language_server_id: LanguageServerName,
167        worktree: Arc<dyn WorktreeDelegate>,
168    ) -> Result<Option<String>> {
169        self.call(|extension, store| {
170            async move {
171                let resource = store.data_mut().table().push(worktree)?;
172                let options = extension
173                    .call_language_server_additional_initialization_options(
174                        store,
175                        &language_server_id,
176                        &target_language_server_id,
177                        resource,
178                    )
179                    .await?
180                    .map_err(|err| store.data().extension_error(err))?;
181                anyhow::Ok(options)
182            }
183            .boxed()
184        })
185        .await?
186    }
187
188    async fn language_server_additional_workspace_configuration(
189        &self,
190        language_server_id: LanguageServerName,
191        target_language_server_id: LanguageServerName,
192        worktree: Arc<dyn WorktreeDelegate>,
193    ) -> Result<Option<String>> {
194        self.call(|extension, store| {
195            async move {
196                let resource = store.data_mut().table().push(worktree)?;
197                let options = extension
198                    .call_language_server_additional_workspace_configuration(
199                        store,
200                        &language_server_id,
201                        &target_language_server_id,
202                        resource,
203                    )
204                    .await?
205                    .map_err(|err| store.data().extension_error(err))?;
206                anyhow::Ok(options)
207            }
208            .boxed()
209        })
210        .await?
211    }
212
213    async fn labels_for_completions(
214        &self,
215        language_server_id: LanguageServerName,
216        completions: Vec<Completion>,
217    ) -> Result<Vec<Option<CodeLabel>>> {
218        self.call(|extension, store| {
219            async move {
220                let labels = extension
221                    .call_labels_for_completions(
222                        store,
223                        &language_server_id,
224                        completions.into_iter().map(Into::into).collect(),
225                    )
226                    .await?
227                    .map_err(|err| store.data().extension_error(err))?;
228
229                Ok(labels
230                    .into_iter()
231                    .map(|label| label.map(Into::into))
232                    .collect())
233            }
234            .boxed()
235        })
236        .await?
237    }
238
239    async fn labels_for_symbols(
240        &self,
241        language_server_id: LanguageServerName,
242        symbols: Vec<Symbol>,
243    ) -> Result<Vec<Option<CodeLabel>>> {
244        self.call(|extension, store| {
245            async move {
246                let labels = extension
247                    .call_labels_for_symbols(
248                        store,
249                        &language_server_id,
250                        symbols.into_iter().map(Into::into).collect(),
251                    )
252                    .await?
253                    .map_err(|err| store.data().extension_error(err))?;
254
255                Ok(labels
256                    .into_iter()
257                    .map(|label| label.map(Into::into))
258                    .collect())
259            }
260            .boxed()
261        })
262        .await?
263    }
264
265    async fn complete_slash_command_argument(
266        &self,
267        command: SlashCommand,
268        arguments: Vec<String>,
269    ) -> Result<Vec<SlashCommandArgumentCompletion>> {
270        self.call(|extension, store| {
271            async move {
272                let completions = extension
273                    .call_complete_slash_command_argument(store, &command.into(), &arguments)
274                    .await?
275                    .map_err(|err| store.data().extension_error(err))?;
276
277                Ok(completions.into_iter().map(Into::into).collect())
278            }
279            .boxed()
280        })
281        .await?
282    }
283
284    async fn run_slash_command(
285        &self,
286        command: SlashCommand,
287        arguments: Vec<String>,
288        delegate: Option<Arc<dyn WorktreeDelegate>>,
289    ) -> Result<SlashCommandOutput> {
290        self.call(|extension, store| {
291            async move {
292                let resource = if let Some(delegate) = delegate {
293                    Some(store.data_mut().table().push(delegate)?)
294                } else {
295                    None
296                };
297
298                let output = extension
299                    .call_run_slash_command(store, &command.into(), &arguments, resource)
300                    .await?
301                    .map_err(|err| store.data().extension_error(err))?;
302
303                Ok(output.into())
304            }
305            .boxed()
306        })
307        .await?
308    }
309
310    async fn context_server_command(
311        &self,
312        context_server_id: Arc<str>,
313        project: Arc<dyn ProjectDelegate>,
314    ) -> Result<Command> {
315        self.call(|extension, store| {
316            async move {
317                let project_resource = store.data_mut().table().push(project)?;
318                let command = extension
319                    .call_context_server_command(store, context_server_id.clone(), project_resource)
320                    .await?
321                    .map_err(|err| store.data().extension_error(err))?;
322                anyhow::Ok(command.into())
323            }
324            .boxed()
325        })
326        .await?
327    }
328
329    async fn context_server_configuration(
330        &self,
331        context_server_id: Arc<str>,
332        project: Arc<dyn ProjectDelegate>,
333    ) -> Result<Option<ContextServerConfiguration>> {
334        self.call(|extension, store| {
335            async move {
336                let project_resource = store.data_mut().table().push(project)?;
337                let Some(configuration) = extension
338                    .call_context_server_configuration(
339                        store,
340                        context_server_id.clone(),
341                        project_resource,
342                    )
343                    .await?
344                    .map_err(|err| store.data().extension_error(err))?
345                else {
346                    return Ok(None);
347                };
348
349                Ok(Some(configuration.try_into()?))
350            }
351            .boxed()
352        })
353        .await?
354    }
355
356    async fn suggest_docs_packages(&self, provider: Arc<str>) -> Result<Vec<String>> {
357        self.call(|extension, store| {
358            async move {
359                let packages = extension
360                    .call_suggest_docs_packages(store, provider.as_ref())
361                    .await?
362                    .map_err(|err| store.data().extension_error(err))?;
363
364                Ok(packages)
365            }
366            .boxed()
367        })
368        .await?
369    }
370
371    async fn index_docs(
372        &self,
373        provider: Arc<str>,
374        package_name: Arc<str>,
375        kv_store: Arc<dyn KeyValueStoreDelegate>,
376    ) -> Result<()> {
377        self.call(|extension, store| {
378            async move {
379                let kv_store_resource = store.data_mut().table().push(kv_store)?;
380                extension
381                    .call_index_docs(
382                        store,
383                        provider.as_ref(),
384                        package_name.as_ref(),
385                        kv_store_resource,
386                    )
387                    .await?
388                    .map_err(|err| store.data().extension_error(err))?;
389
390                anyhow::Ok(())
391            }
392            .boxed()
393        })
394        .await?
395    }
396
397    async fn get_dap_binary(
398        &self,
399        dap_name: Arc<str>,
400        config: DebugTaskDefinition,
401        user_installed_path: Option<PathBuf>,
402        worktree: Arc<dyn WorktreeDelegate>,
403    ) -> Result<DebugAdapterBinary> {
404        self.call(|extension, store| {
405            async move {
406                let resource = store.data_mut().table().push(worktree)?;
407                let dap_binary = extension
408                    .call_get_dap_binary(store, dap_name, config, user_installed_path, resource)
409                    .await?
410                    .map_err(|err| store.data().extension_error(err))?;
411                let dap_binary = dap_binary.try_into()?;
412                Ok(dap_binary)
413            }
414            .boxed()
415        })
416        .await?
417    }
418    async fn dap_request_kind(
419        &self,
420        dap_name: Arc<str>,
421        config: serde_json::Value,
422    ) -> Result<StartDebuggingRequestArgumentsRequest> {
423        self.call(|extension, store| {
424            async move {
425                let kind = extension
426                    .call_dap_request_kind(store, dap_name, config)
427                    .await?
428                    .map_err(|err| store.data().extension_error(err))?;
429                Ok(kind.into())
430            }
431            .boxed()
432        })
433        .await?
434    }
435
436    async fn dap_config_to_scenario(&self, config: ZedDebugConfig) -> Result<DebugScenario> {
437        self.call(|extension, store| {
438            async move {
439                let kind = extension
440                    .call_dap_config_to_scenario(store, config)
441                    .await?
442                    .map_err(|err| store.data().extension_error(err))?;
443                Ok(kind)
444            }
445            .boxed()
446        })
447        .await?
448    }
449
450    async fn dap_locator_create_scenario(
451        &self,
452        locator_name: String,
453        build_config_template: TaskTemplate,
454        resolved_label: String,
455        debug_adapter_name: String,
456    ) -> Result<Option<DebugScenario>> {
457        self.call(|extension, store| {
458            async move {
459                extension
460                    .call_dap_locator_create_scenario(
461                        store,
462                        locator_name,
463                        build_config_template,
464                        resolved_label,
465                        debug_adapter_name,
466                    )
467                    .await
468            }
469            .boxed()
470        })
471        .await?
472    }
473    async fn run_dap_locator(
474        &self,
475        locator_name: String,
476        config: SpawnInTerminal,
477    ) -> Result<DebugRequest> {
478        self.call(|extension, store| {
479            async move {
480                extension
481                    .call_run_dap_locator(store, locator_name, config)
482                    .await?
483                    .map_err(|err| store.data().extension_error(err))
484            }
485            .boxed()
486        })
487        .await?
488    }
489}
490
/// Per-extension state stored as the data of the extension's `wasmtime`
/// [`Store`].
pub struct WasmState {
    /// Manifest of the extension this state belongs to.
    manifest: Arc<ExtensionManifest>,
    /// Resource table returned by `WasiView::table`; delegates passed to
    /// extension calls are pushed into it.
    pub table: ResourceTable,
    /// WASI context produced by `WasmHost::build_wasi_ctx`.
    ctx: wasi::WasiCtx,
    /// The host that loaded this extension.
    pub host: Arc<WasmHost>,
    /// Built from the host's granted capabilities and this extension's manifest.
    pub(crate) capability_granter: CapabilityGranter,
}
498
std::thread_local! {
    /// Used by the crash handler to ignore panics in extension-related threads.
    ///
    /// Starts out `false` and is set to `true` by the extension task created in
    /// `WasmHost::load_extension`, on whichever thread first polls that task.
    pub static IS_WASM_THREAD: AtomicBool = const { AtomicBool::new(false) };
}
503
/// A boxed one-shot closure sent to the host's main-thread task, which calls it
/// with the `AsyncApp` and awaits the (non-`Send`) future it returns.
type MainThreadCall = Box<dyn Send + for<'a> FnOnce(&'a mut AsyncApp) -> LocalBoxFuture<'a, ()>>;

/// A boxed one-shot closure sent to an extension's task, which calls it with
/// the extension instance and its store and awaits the future it returns.
type ExtensionCall = Box<
    dyn Send + for<'a> FnOnce(&'a mut Extension, &'a mut Store<WasmState>) -> BoxFuture<'a, ()>,
>;
509
510fn wasm_engine(executor: &BackgroundExecutor) -> wasmtime::Engine {
511    static WASM_ENGINE: OnceLock<wasmtime::Engine> = OnceLock::new();
512    WASM_ENGINE
513        .get_or_init(|| {
514            let mut config = wasmtime::Config::new();
515            config.wasm_component_model(true);
516            config.async_support(true);
517            config
518                .enable_incremental_compilation(cache_store())
519                .unwrap();
520            // Async support introduces the issue that extension execution happens during `Future::poll`,
521            // which could block an async thread.
522            // https://docs.rs/wasmtime/latest/wasmtime/struct.Config.html#execution-in-poll
523            //
524            // Epoch interruption is a lightweight mechanism to allow the extensions to yield control
525            // back to the executor at regular intervals.
526            config.epoch_interruption(true);
527
528            let engine = wasmtime::Engine::new(&config).unwrap();
529
530            // It might be safer to do this on a non-async thread to make sure it makes progress
531            // regardless of if extensions are blocking.
532            // However, due to our current setup, this isn't a likely occurrence and we'd rather
533            // not have a dedicated thread just for this. If it becomes an issue, we can consider
534            // creating a separate thread for epoch interruption.
535            let engine_ref = engine.weak();
536            executor
537                .spawn(async move {
538                    // Somewhat arbitrary interval, as it isn't a guaranteed interval.
539                    // But this is a rough upper bound for how long the extension execution can block on
540                    // `Future::poll`.
541                    const EPOCH_INTERVAL: Duration = Duration::from_millis(100);
542                    let mut timer = Timer::interval(EPOCH_INTERVAL);
543                    while (timer.next().await).is_some() {
544                        // Exit the loop and thread once the engine is dropped.
545                        let Some(engine) = engine_ref.upgrade() else {
546                            break;
547                        };
548                        engine.increment_epoch();
549                    }
550                })
551                .detach();
552
553            engine
554        })
555        .clone()
556}
557
558fn cache_store() -> Arc<IncrementalCompilationCache> {
559    static CACHE_STORE: LazyLock<Arc<IncrementalCompilationCache>> =
560        LazyLock::new(|| Arc::new(IncrementalCompilationCache::new()));
561    CACHE_STORE.clone()
562}
563
564impl WasmHost {
565    pub fn new(
566        fs: Arc<dyn Fs>,
567        http_client: Arc<dyn HttpClient>,
568        node_runtime: NodeRuntime,
569        proxy: Arc<ExtensionHostProxy>,
570        work_dir: PathBuf,
571        cx: &mut App,
572    ) -> Arc<Self> {
573        let (tx, mut rx) = mpsc::unbounded::<MainThreadCall>();
574        let task = cx.spawn(async move |cx| {
575            while let Some(message) = rx.next().await {
576                message(cx).await;
577            }
578        });
579
580        let extension_settings = ExtensionSettings::get_global(cx);
581
582        Arc::new(Self {
583            engine: wasm_engine(cx.background_executor()),
584            fs,
585            work_dir,
586            http_client,
587            node_runtime,
588            proxy,
589            release_channel: ReleaseChannel::global(cx),
590            granted_capabilities: extension_settings.granted_capabilities.clone(),
591            allowed_env_var_providers: extension_settings.allowed_env_var_providers.clone(),
592            _main_thread_message_task: task,
593            main_thread_message_tx: tx,
594        })
595    }
596
    /// Compiles, instantiates and initializes the extension contained in
    /// `wasm_bytes`, returning a task that resolves to a [`WasmExtension`]
    /// handle.
    ///
    /// The work happens in two stages:
    /// 1. On the background executor: the API version is parsed from the
    ///    binary, the component is compiled, a store with a fresh
    ///    [`WasmState`] is created, and the extension is instantiated and
    ///    initialized.
    /// 2. The loop that receives `ExtensionCall`s and runs them against the
    ///    extension and store is then spawned through `gpui_tokio`.
    ///
    /// # Errors
    ///
    /// The returned task resolves to an error if the version section is
    /// missing or invalid, if compilation, WASI context creation,
    /// instantiation or initialization fails, or if spawning the extension
    /// task fails.
    pub fn load_extension(
        self: &Arc<Self>,
        wasm_bytes: Vec<u8>,
        manifest: &Arc<ExtensionManifest>,
        cx: &AsyncApp,
    ) -> Task<Result<WasmExtension>> {
        let this = self.clone();
        let manifest = manifest.clone();
        let executor = cx.background_executor().clone();
        let load_extension_task = async move {
            // Parsing the version first also walks the whole binary, so
            // malformed Wasm is reported as an error before compilation.
            let zed_api_version = parse_wasm_extension_version(&manifest.id, &wasm_bytes)?;

            let component = Component::from_binary(&this.engine, &wasm_bytes)
                .context("failed to compile wasm component")?;
            let mut store = wasmtime::Store::new(
                &this.engine,
                WasmState {
                    ctx: this.build_wasi_ctx(&manifest).await?,
                    manifest: manifest.clone(),
                    table: ResourceTable::new(),
                    host: this.clone(),
                    capability_granter: CapabilityGranter::new(
                        this.granted_capabilities.clone(),
                        manifest.clone(),
                    ),
                },
            );
            // Store will yield after 1 tick, and get a new deadline of 1 tick after each yield.
            store.set_epoch_deadline(1);
            store.epoch_deadline_async_yield_and_update(1);

            let mut extension = Extension::instantiate_async(
                &executor,
                &mut store,
                this.release_channel,
                zed_api_version.clone(),
                &component,
            )
            .await?;

            extension
                .call_init_extension(&mut store)
                .await
                .context("failed to initialize wasm extension")?;

            // Calls made through `WasmExtension::call` arrive on this channel
            // and are run one at a time by `extension_task`.
            let (tx, mut rx) = mpsc::unbounded::<ExtensionCall>();
            let extension_task = async move {
                // note: Setting the thread local here will slowly "poison" all tokio threads
                // causing us to not record their panics any longer.
                //
                // This is fine though, the main zed binary only uses tokio for livekit and wasm extensions.
                // Livekit seldom (if ever) panics 🤞 so the likelihood of us missing a panic in sentry is very low.
                IS_WASM_THREAD.with(|v| v.store(true, Ordering::Release));
                while let Some(call) = rx.next().await {
                    (call)(&mut extension, &mut store).await;
                }
            };

            anyhow::Ok((
                extension_task,
                manifest.clone(),
                this.work_dir.join(manifest.id.as_ref()).into(),
                tx,
                zed_api_version,
            ))
        };
        cx.spawn(async move |cx| {
            let (extension_task, manifest, work_dir, tx, zed_api_version) =
                cx.background_executor().spawn(load_extension_task).await?;
            // We need to run the task in a tokio context, as wasmtime_wasi may call into
            // tokio, accessing its runtime handle when the epoch ticker set up in
            // `wasm_engine` triggers `engine.increment_epoch()`.
            let task = Arc::new(gpui_tokio::Tokio::spawn(cx, extension_task)?);

            Ok(WasmExtension {
                manifest,
                work_dir,
                tx,
                zed_api_version,
                _task: task,
            })
        })
    }
679
680    async fn build_wasi_ctx(&self, manifest: &Arc<ExtensionManifest>) -> Result<wasi::WasiCtx> {
681        let extension_work_dir = self.work_dir.join(manifest.id.as_ref());
682        self.fs
683            .create_dir(&extension_work_dir)
684            .await
685            .context("failed to create extension work dir")?;
686
687        let file_perms = wasi::FilePerms::all();
688        let dir_perms = wasi::DirPerms::all();
689        let path = SanitizedPath::new(&extension_work_dir).to_string();
690        #[cfg(target_os = "windows")]
691        let path = path.replace('\\', "/");
692
693        let mut ctx = wasi::WasiCtxBuilder::new();
694        ctx.inherit_stdio()
695            .env("PWD", &path)
696            .env("RUST_BACKTRACE", "full");
697
698        ctx.preopened_dir(&path, ".", dir_perms, file_perms)?;
699        ctx.preopened_dir(&path, &path, dir_perms, file_perms)?;
700
701        Ok(ctx.build())
702    }
703
704    pub fn writeable_path_from_extension(&self, id: &Arc<str>, path: &Path) -> Result<PathBuf> {
705        let extension_work_dir = self.work_dir.join(id.as_ref());
706        let path = normalize_path(&extension_work_dir.join(path));
707        anyhow::ensure!(
708            path.starts_with(&extension_work_dir),
709            "cannot write to path {path:?}",
710        );
711        Ok(path)
712    }
713}
714
715pub fn parse_wasm_extension_version(extension_id: &str, wasm_bytes: &[u8]) -> Result<Version> {
716    let mut version = None;
717
718    for part in wasmparser::Parser::new(0).parse_all(wasm_bytes) {
719        if let wasmparser::Payload::CustomSection(s) =
720            part.context("error parsing wasm extension")?
721            && s.name() == "zed:api-version"
722        {
723            version = parse_wasm_extension_version_custom_section(s.data());
724            if version.is_none() {
725                bail!(
726                    "extension {} has invalid zed:api-version section: {:?}",
727                    extension_id,
728                    s.data()
729                );
730            }
731        }
732    }
733
734    // The reason we wait until we're done parsing all of the Wasm bytes to return the version
735    // is to work around a panic that can happen inside of Wasmtime when the bytes are invalid.
736    //
737    // By parsing the entirety of the Wasm bytes before we return, we're able to detect this problem
738    // earlier as an `Err` rather than as a panic.
739    version.with_context(|| format!("extension {extension_id} has no zed:api-version section"))
740}
741
742fn parse_wasm_extension_version_custom_section(data: &[u8]) -> Option<Version> {
743    if data.len() == 6 {
744        Some(Version::new(
745            u16::from_be_bytes([data[0], data[1]]) as _,
746            u16::from_be_bytes([data[2], data[3]]) as _,
747            u16::from_be_bytes([data[4], data[5]]) as _,
748        ))
749    } else {
750        None
751    }
752}
753
754impl WasmExtension {
755    pub async fn load(
756        extension_dir: &Path,
757        manifest: &Arc<ExtensionManifest>,
758        wasm_host: Arc<WasmHost>,
759        cx: &AsyncApp,
760    ) -> Result<Self> {
761        let path = extension_dir.join("extension.wasm");
762
763        let mut wasm_file = wasm_host
764            .fs
765            .open_sync(&path)
766            .await
767            .context(format!("opening wasm file, path: {path:?}"))?;
768
769        let mut wasm_bytes = Vec::new();
770        wasm_file
771            .read_to_end(&mut wasm_bytes)
772            .context(format!("reading wasm file, path: {path:?}"))?;
773
774        wasm_host
775            .load_extension(wasm_bytes, manifest, cx)
776            .await
777            .with_context(|| format!("loading wasm extension: {}", manifest.id))
778    }
779
780    pub async fn call<T, Fn>(&self, f: Fn) -> Result<T>
781    where
782        T: 'static + Send,
783        Fn: 'static
784            + Send
785            + for<'a> FnOnce(&'a mut Extension, &'a mut Store<WasmState>) -> BoxFuture<'a, T>,
786    {
787        let (return_tx, return_rx) = oneshot::channel();
788        self.tx
789            .unbounded_send(Box::new(move |extension, store| {
790                async {
791                    let result = f(extension, store).await;
792                    return_tx.send(result).ok();
793                }
794                .boxed()
795            }))
796            .map_err(|_| {
797                anyhow!(
798                    "wasm extension channel should not be closed yet, extension {} (id {})",
799                    self.manifest.name,
800                    self.manifest.id,
801                )
802            })?;
803        return_rx.await.with_context(|| {
804            format!(
805                "wasm extension channel, extension {} (id {})",
806                self.manifest.name, self.manifest.id,
807            )
808        })
809    }
810}
811
812impl WasmState {
813    fn on_main_thread<T, Fn>(&self, f: Fn) -> impl 'static + Future<Output = T>
814    where
815        T: 'static + Send,
816        Fn: 'static + Send + for<'a> FnOnce(&'a mut AsyncApp) -> LocalBoxFuture<'a, T>,
817    {
818        let (return_tx, return_rx) = oneshot::channel();
819        self.host
820            .main_thread_message_tx
821            .clone()
822            .unbounded_send(Box::new(move |cx| {
823                async {
824                    let result = f(cx).await;
825                    return_tx.send(result).ok();
826                }
827                .boxed_local()
828            }))
829            .unwrap_or_else(|_| {
830                panic!(
831                    "main thread message channel should not be closed yet, extension {} (id {})",
832                    self.manifest.name, self.manifest.id,
833                )
834            });
835        let name = self.manifest.name.clone();
836        let id = self.manifest.id.clone();
837        async move {
838            return_rx.await.unwrap_or_else(|_| {
839                panic!("main thread message channel, extension {name} (id {id})")
840            })
841        }
842    }
843
844    fn work_dir(&self) -> PathBuf {
845        self.host.work_dir.join(self.manifest.id.as_ref())
846    }
847
848    fn extension_error(&self, message: String) -> anyhow::Error {
849        anyhow!(
850            "from extension \"{}\" version {}: {}",
851            self.manifest.name,
852            self.manifest.version,
853            message
854        )
855    }
856}
857
/// Exposes this state's resource table and WASI context through
/// [`wasi::WasiView`].
impl wasi::WasiView for WasmState {
    /// Returns the resource table stored in this state.
    fn table(&mut self) -> &mut ResourceTable {
        &mut self.table
    }

    /// Returns the WASI context stored in this state.
    fn ctx(&mut self) -> &mut wasi::WasiCtx {
        &mut self.ctx
    }
}
867
/// Wrapper around a bounded [`moka::sync::Cache`] for storing incremental compilation
/// artifacts. Since wasm modules have many similar elements, this can save us a lot of
/// work at the cost of a small memory footprint. However, we don't want this to be
/// unbounded, so we use a LFU/LRU cache to evict less used cache entries.
#[derive(Debug)]
struct IncrementalCompilationCache {
    /// Maps cache keys to cached artifacts; both are raw bytes supplied through
    /// the `CacheStore` implementation.
    cache: Cache<Vec<u8>, Vec<u8>>,
}
876
877impl IncrementalCompilationCache {
878    fn new() -> Self {
879        let cache = Cache::builder()
880            // Cap this at 32 MB for now. Our extensions turn into roughly 512kb in the cache,
881            // which means we could store 64 completely novel extensions in the cache, but in
882            // practice we will more than that, which is more than enough for our use case.
883            .max_capacity(32 * 1024 * 1024)
884            .weigher(|k: &Vec<u8>, v: &Vec<u8>| (k.len() + v.len()).try_into().unwrap_or(u32::MAX))
885            .build();
886        Self { cache }
887    }
888}
889
890impl CacheStore for IncrementalCompilationCache {
891    fn get(&self, key: &[u8]) -> Option<Cow<'_, [u8]>> {
892        self.cache.get(key).map(|v| v.into())
893    }
894
895    fn insert(&self, key: &[u8], value: Vec<u8>) -> bool {
896        self.cache.insert(key.to_vec(), value);
897        true
898    }
899}