1pub mod llm_provider;
2pub mod wit;
3
4use crate::capability_granter::CapabilityGranter;
5use crate::{ExtensionManifest, ExtensionSettings};
6use anyhow::{Context as _, Result, anyhow, bail};
7use async_trait::async_trait;
8use collections::HashSet;
9use dap::{DebugRequest, StartDebuggingRequestArgumentsRequest};
10use extension::{
11 CodeLabel, Command, Completion, ContextServerConfiguration, DebugAdapterBinary,
12 DebugTaskDefinition, ExtensionCapability, ExtensionHostProxy, KeyValueStoreDelegate,
13 ProjectDelegate, SlashCommand, SlashCommandArgumentCompletion, SlashCommandOutput, Symbol,
14 WorktreeDelegate,
15};
16use fs::{Fs, normalize_path};
17use futures::future::LocalBoxFuture;
18use futures::{
19 Future, FutureExt, StreamExt as _,
20 channel::{
21 mpsc::{self, UnboundedSender},
22 oneshot,
23 },
24 future::BoxFuture,
25};
26use gpui::{App, AsyncApp, BackgroundExecutor, Task, Timer};
27use http_client::HttpClient;
28use language::LanguageName;
29use lsp::LanguageServerName;
30use moka::sync::Cache;
31use node_runtime::NodeRuntime;
32use release_channel::ReleaseChannel;
33use semver::Version;
34use settings::Settings;
35use std::{
36 borrow::Cow,
37 path::{Path, PathBuf},
38 sync::{
39 Arc, LazyLock, OnceLock,
40 atomic::{AtomicBool, Ordering},
41 },
42 time::Duration,
43};
44use task::{DebugScenario, SpawnInTerminal, TaskTemplate, ZedDebugConfig};
45use util::paths::SanitizedPath;
46use wasmtime::{
47 CacheStore, Engine, Store,
48 component::{Component, ResourceTable},
49};
50use wasmtime_wasi::{self as wasi, WasiView};
51use wit::Extension;
52
53pub struct WasmHost {
54 engine: Engine,
55 release_channel: ReleaseChannel,
56 http_client: Arc<dyn HttpClient>,
57 node_runtime: NodeRuntime,
58 pub(crate) proxy: Arc<ExtensionHostProxy>,
59 fs: Arc<dyn Fs>,
60 pub work_dir: PathBuf,
61 /// The capabilities granted to extensions running on the host.
62 pub(crate) granted_capabilities: Vec<ExtensionCapability>,
63 /// Extension LLM providers allowed to read API keys from environment variables.
64 pub(crate) allowed_env_var_providers: HashSet<Arc<str>>,
65 _main_thread_message_task: Task<()>,
66 main_thread_message_tx: mpsc::UnboundedSender<MainThreadCall>,
67}
68
69#[derive(Clone, Debug)]
70pub struct WasmExtension {
71 tx: UnboundedSender<ExtensionCall>,
72 pub manifest: Arc<ExtensionManifest>,
73 pub work_dir: Arc<Path>,
74 #[allow(unused)]
75 pub zed_api_version: Version,
76 _task: Arc<Task<Result<(), gpui_tokio::JoinError>>>,
77}
78
79#[async_trait]
80impl extension::Extension for WasmExtension {
81 fn manifest(&self) -> Arc<ExtensionManifest> {
82 self.manifest.clone()
83 }
84
85 fn work_dir(&self) -> Arc<Path> {
86 self.work_dir.clone()
87 }
88
89 async fn language_server_command(
90 &self,
91 language_server_id: LanguageServerName,
92 language_name: LanguageName,
93 worktree: Arc<dyn WorktreeDelegate>,
94 ) -> Result<Command> {
95 self.call(|extension, store| {
96 async move {
97 let resource = store.data_mut().table().push(worktree)?;
98 let command = extension
99 .call_language_server_command(
100 store,
101 &language_server_id,
102 &language_name,
103 resource,
104 )
105 .await?
106 .map_err(|err| store.data().extension_error(err))?;
107
108 Ok(command.into())
109 }
110 .boxed()
111 })
112 .await?
113 }
114
115 async fn language_server_initialization_options(
116 &self,
117 language_server_id: LanguageServerName,
118 language_name: LanguageName,
119 worktree: Arc<dyn WorktreeDelegate>,
120 ) -> Result<Option<String>> {
121 self.call(|extension, store| {
122 async move {
123 let resource = store.data_mut().table().push(worktree)?;
124 let options = extension
125 .call_language_server_initialization_options(
126 store,
127 &language_server_id,
128 &language_name,
129 resource,
130 )
131 .await?
132 .map_err(|err| store.data().extension_error(err))?;
133 anyhow::Ok(options)
134 }
135 .boxed()
136 })
137 .await?
138 }
139
140 async fn language_server_workspace_configuration(
141 &self,
142 language_server_id: LanguageServerName,
143 worktree: Arc<dyn WorktreeDelegate>,
144 ) -> Result<Option<String>> {
145 self.call(|extension, store| {
146 async move {
147 let resource = store.data_mut().table().push(worktree)?;
148 let options = extension
149 .call_language_server_workspace_configuration(
150 store,
151 &language_server_id,
152 resource,
153 )
154 .await?
155 .map_err(|err| store.data().extension_error(err))?;
156 anyhow::Ok(options)
157 }
158 .boxed()
159 })
160 .await?
161 }
162
163 async fn language_server_additional_initialization_options(
164 &self,
165 language_server_id: LanguageServerName,
166 target_language_server_id: LanguageServerName,
167 worktree: Arc<dyn WorktreeDelegate>,
168 ) -> Result<Option<String>> {
169 self.call(|extension, store| {
170 async move {
171 let resource = store.data_mut().table().push(worktree)?;
172 let options = extension
173 .call_language_server_additional_initialization_options(
174 store,
175 &language_server_id,
176 &target_language_server_id,
177 resource,
178 )
179 .await?
180 .map_err(|err| store.data().extension_error(err))?;
181 anyhow::Ok(options)
182 }
183 .boxed()
184 })
185 .await?
186 }
187
188 async fn language_server_additional_workspace_configuration(
189 &self,
190 language_server_id: LanguageServerName,
191 target_language_server_id: LanguageServerName,
192 worktree: Arc<dyn WorktreeDelegate>,
193 ) -> Result<Option<String>> {
194 self.call(|extension, store| {
195 async move {
196 let resource = store.data_mut().table().push(worktree)?;
197 let options = extension
198 .call_language_server_additional_workspace_configuration(
199 store,
200 &language_server_id,
201 &target_language_server_id,
202 resource,
203 )
204 .await?
205 .map_err(|err| store.data().extension_error(err))?;
206 anyhow::Ok(options)
207 }
208 .boxed()
209 })
210 .await?
211 }
212
213 async fn labels_for_completions(
214 &self,
215 language_server_id: LanguageServerName,
216 completions: Vec<Completion>,
217 ) -> Result<Vec<Option<CodeLabel>>> {
218 self.call(|extension, store| {
219 async move {
220 let labels = extension
221 .call_labels_for_completions(
222 store,
223 &language_server_id,
224 completions.into_iter().map(Into::into).collect(),
225 )
226 .await?
227 .map_err(|err| store.data().extension_error(err))?;
228
229 Ok(labels
230 .into_iter()
231 .map(|label| label.map(Into::into))
232 .collect())
233 }
234 .boxed()
235 })
236 .await?
237 }
238
239 async fn labels_for_symbols(
240 &self,
241 language_server_id: LanguageServerName,
242 symbols: Vec<Symbol>,
243 ) -> Result<Vec<Option<CodeLabel>>> {
244 self.call(|extension, store| {
245 async move {
246 let labels = extension
247 .call_labels_for_symbols(
248 store,
249 &language_server_id,
250 symbols.into_iter().map(Into::into).collect(),
251 )
252 .await?
253 .map_err(|err| store.data().extension_error(err))?;
254
255 Ok(labels
256 .into_iter()
257 .map(|label| label.map(Into::into))
258 .collect())
259 }
260 .boxed()
261 })
262 .await?
263 }
264
265 async fn complete_slash_command_argument(
266 &self,
267 command: SlashCommand,
268 arguments: Vec<String>,
269 ) -> Result<Vec<SlashCommandArgumentCompletion>> {
270 self.call(|extension, store| {
271 async move {
272 let completions = extension
273 .call_complete_slash_command_argument(store, &command.into(), &arguments)
274 .await?
275 .map_err(|err| store.data().extension_error(err))?;
276
277 Ok(completions.into_iter().map(Into::into).collect())
278 }
279 .boxed()
280 })
281 .await?
282 }
283
284 async fn run_slash_command(
285 &self,
286 command: SlashCommand,
287 arguments: Vec<String>,
288 delegate: Option<Arc<dyn WorktreeDelegate>>,
289 ) -> Result<SlashCommandOutput> {
290 self.call(|extension, store| {
291 async move {
292 let resource = if let Some(delegate) = delegate {
293 Some(store.data_mut().table().push(delegate)?)
294 } else {
295 None
296 };
297
298 let output = extension
299 .call_run_slash_command(store, &command.into(), &arguments, resource)
300 .await?
301 .map_err(|err| store.data().extension_error(err))?;
302
303 Ok(output.into())
304 }
305 .boxed()
306 })
307 .await?
308 }
309
310 async fn context_server_command(
311 &self,
312 context_server_id: Arc<str>,
313 project: Arc<dyn ProjectDelegate>,
314 ) -> Result<Command> {
315 self.call(|extension, store| {
316 async move {
317 let project_resource = store.data_mut().table().push(project)?;
318 let command = extension
319 .call_context_server_command(store, context_server_id.clone(), project_resource)
320 .await?
321 .map_err(|err| store.data().extension_error(err))?;
322 anyhow::Ok(command.into())
323 }
324 .boxed()
325 })
326 .await?
327 }
328
329 async fn context_server_configuration(
330 &self,
331 context_server_id: Arc<str>,
332 project: Arc<dyn ProjectDelegate>,
333 ) -> Result<Option<ContextServerConfiguration>> {
334 self.call(|extension, store| {
335 async move {
336 let project_resource = store.data_mut().table().push(project)?;
337 let Some(configuration) = extension
338 .call_context_server_configuration(
339 store,
340 context_server_id.clone(),
341 project_resource,
342 )
343 .await?
344 .map_err(|err| store.data().extension_error(err))?
345 else {
346 return Ok(None);
347 };
348
349 Ok(Some(configuration.try_into()?))
350 }
351 .boxed()
352 })
353 .await?
354 }
355
356 async fn suggest_docs_packages(&self, provider: Arc<str>) -> Result<Vec<String>> {
357 self.call(|extension, store| {
358 async move {
359 let packages = extension
360 .call_suggest_docs_packages(store, provider.as_ref())
361 .await?
362 .map_err(|err| store.data().extension_error(err))?;
363
364 Ok(packages)
365 }
366 .boxed()
367 })
368 .await?
369 }
370
371 async fn index_docs(
372 &self,
373 provider: Arc<str>,
374 package_name: Arc<str>,
375 kv_store: Arc<dyn KeyValueStoreDelegate>,
376 ) -> Result<()> {
377 self.call(|extension, store| {
378 async move {
379 let kv_store_resource = store.data_mut().table().push(kv_store)?;
380 extension
381 .call_index_docs(
382 store,
383 provider.as_ref(),
384 package_name.as_ref(),
385 kv_store_resource,
386 )
387 .await?
388 .map_err(|err| store.data().extension_error(err))?;
389
390 anyhow::Ok(())
391 }
392 .boxed()
393 })
394 .await?
395 }
396
397 async fn get_dap_binary(
398 &self,
399 dap_name: Arc<str>,
400 config: DebugTaskDefinition,
401 user_installed_path: Option<PathBuf>,
402 worktree: Arc<dyn WorktreeDelegate>,
403 ) -> Result<DebugAdapterBinary> {
404 self.call(|extension, store| {
405 async move {
406 let resource = store.data_mut().table().push(worktree)?;
407 let dap_binary = extension
408 .call_get_dap_binary(store, dap_name, config, user_installed_path, resource)
409 .await?
410 .map_err(|err| store.data().extension_error(err))?;
411 let dap_binary = dap_binary.try_into()?;
412 Ok(dap_binary)
413 }
414 .boxed()
415 })
416 .await?
417 }
418 async fn dap_request_kind(
419 &self,
420 dap_name: Arc<str>,
421 config: serde_json::Value,
422 ) -> Result<StartDebuggingRequestArgumentsRequest> {
423 self.call(|extension, store| {
424 async move {
425 let kind = extension
426 .call_dap_request_kind(store, dap_name, config)
427 .await?
428 .map_err(|err| store.data().extension_error(err))?;
429 Ok(kind.into())
430 }
431 .boxed()
432 })
433 .await?
434 }
435
436 async fn dap_config_to_scenario(&self, config: ZedDebugConfig) -> Result<DebugScenario> {
437 self.call(|extension, store| {
438 async move {
439 let kind = extension
440 .call_dap_config_to_scenario(store, config)
441 .await?
442 .map_err(|err| store.data().extension_error(err))?;
443 Ok(kind)
444 }
445 .boxed()
446 })
447 .await?
448 }
449
450 async fn dap_locator_create_scenario(
451 &self,
452 locator_name: String,
453 build_config_template: TaskTemplate,
454 resolved_label: String,
455 debug_adapter_name: String,
456 ) -> Result<Option<DebugScenario>> {
457 self.call(|extension, store| {
458 async move {
459 extension
460 .call_dap_locator_create_scenario(
461 store,
462 locator_name,
463 build_config_template,
464 resolved_label,
465 debug_adapter_name,
466 )
467 .await
468 }
469 .boxed()
470 })
471 .await?
472 }
473 async fn run_dap_locator(
474 &self,
475 locator_name: String,
476 config: SpawnInTerminal,
477 ) -> Result<DebugRequest> {
478 self.call(|extension, store| {
479 async move {
480 extension
481 .call_run_dap_locator(store, locator_name, config)
482 .await?
483 .map_err(|err| store.data().extension_error(err))
484 }
485 .boxed()
486 })
487 .await?
488 }
489}
490
491pub struct WasmState {
492 manifest: Arc<ExtensionManifest>,
493 pub table: ResourceTable,
494 ctx: wasi::WasiCtx,
495 pub host: Arc<WasmHost>,
496 pub(crate) capability_granter: CapabilityGranter,
497}
498
499std::thread_local! {
500 /// Used by the crash handler to ignore panics in extension-related threads.
501 pub static IS_WASM_THREAD: AtomicBool = const { AtomicBool::new(false) };
502}
503
504type MainThreadCall = Box<dyn Send + for<'a> FnOnce(&'a mut AsyncApp) -> LocalBoxFuture<'a, ()>>;
505
506type ExtensionCall = Box<
507 dyn Send + for<'a> FnOnce(&'a mut Extension, &'a mut Store<WasmState>) -> BoxFuture<'a, ()>,
508>;
509
510fn wasm_engine(executor: &BackgroundExecutor) -> wasmtime::Engine {
511 static WASM_ENGINE: OnceLock<wasmtime::Engine> = OnceLock::new();
512 WASM_ENGINE
513 .get_or_init(|| {
514 let mut config = wasmtime::Config::new();
515 config.wasm_component_model(true);
516 config.async_support(true);
517 config
518 .enable_incremental_compilation(cache_store())
519 .unwrap();
520 // Async support introduces the issue that extension execution happens during `Future::poll`,
521 // which could block an async thread.
522 // https://docs.rs/wasmtime/latest/wasmtime/struct.Config.html#execution-in-poll
523 //
524 // Epoch interruption is a lightweight mechanism to allow the extensions to yield control
525 // back to the executor at regular intervals.
526 config.epoch_interruption(true);
527
528 let engine = wasmtime::Engine::new(&config).unwrap();
529
530 // It might be safer to do this on a non-async thread to make sure it makes progress
531 // regardless of if extensions are blocking.
532 // However, due to our current setup, this isn't a likely occurrence and we'd rather
533 // not have a dedicated thread just for this. If it becomes an issue, we can consider
534 // creating a separate thread for epoch interruption.
535 let engine_ref = engine.weak();
536 executor
537 .spawn(async move {
538 // Somewhat arbitrary interval, as it isn't a guaranteed interval.
539 // But this is a rough upper bound for how long the extension execution can block on
540 // `Future::poll`.
541 const EPOCH_INTERVAL: Duration = Duration::from_millis(100);
542 let mut timer = Timer::interval(EPOCH_INTERVAL);
543 while (timer.next().await).is_some() {
544 // Exit the loop and thread once the engine is dropped.
545 let Some(engine) = engine_ref.upgrade() else {
546 break;
547 };
548 engine.increment_epoch();
549 }
550 })
551 .detach();
552
553 engine
554 })
555 .clone()
556}
557
558fn cache_store() -> Arc<IncrementalCompilationCache> {
559 static CACHE_STORE: LazyLock<Arc<IncrementalCompilationCache>> =
560 LazyLock::new(|| Arc::new(IncrementalCompilationCache::new()));
561 CACHE_STORE.clone()
562}
563
564impl WasmHost {
565 pub fn new(
566 fs: Arc<dyn Fs>,
567 http_client: Arc<dyn HttpClient>,
568 node_runtime: NodeRuntime,
569 proxy: Arc<ExtensionHostProxy>,
570 work_dir: PathBuf,
571 cx: &mut App,
572 ) -> Arc<Self> {
573 let (tx, mut rx) = mpsc::unbounded::<MainThreadCall>();
574 let task = cx.spawn(async move |cx| {
575 while let Some(message) = rx.next().await {
576 message(cx).await;
577 }
578 });
579
580 let extension_settings = ExtensionSettings::get_global(cx);
581
582 Arc::new(Self {
583 engine: wasm_engine(cx.background_executor()),
584 fs,
585 work_dir,
586 http_client,
587 node_runtime,
588 proxy,
589 release_channel: ReleaseChannel::global(cx),
590 granted_capabilities: extension_settings.granted_capabilities.clone(),
591 allowed_env_var_providers: extension_settings.allowed_env_var_providers.clone(),
592 _main_thread_message_task: task,
593 main_thread_message_tx: tx,
594 })
595 }
596
597 pub fn load_extension(
598 self: &Arc<Self>,
599 wasm_bytes: Vec<u8>,
600 manifest: &Arc<ExtensionManifest>,
601 cx: &AsyncApp,
602 ) -> Task<Result<WasmExtension>> {
603 let this = self.clone();
604 let manifest = manifest.clone();
605 let executor = cx.background_executor().clone();
606 let load_extension_task = async move {
607 let zed_api_version = parse_wasm_extension_version(&manifest.id, &wasm_bytes)?;
608
609 let component = Component::from_binary(&this.engine, &wasm_bytes)
610 .context("failed to compile wasm component")?;
611 let mut store = wasmtime::Store::new(
612 &this.engine,
613 WasmState {
614 ctx: this.build_wasi_ctx(&manifest).await?,
615 manifest: manifest.clone(),
616 table: ResourceTable::new(),
617 host: this.clone(),
618 capability_granter: CapabilityGranter::new(
619 this.granted_capabilities.clone(),
620 manifest.clone(),
621 ),
622 },
623 );
624 // Store will yield after 1 tick, and get a new deadline of 1 tick after each yield.
625 store.set_epoch_deadline(1);
626 store.epoch_deadline_async_yield_and_update(1);
627
628 let mut extension = Extension::instantiate_async(
629 &executor,
630 &mut store,
631 this.release_channel,
632 zed_api_version.clone(),
633 &component,
634 )
635 .await?;
636
637 extension
638 .call_init_extension(&mut store)
639 .await
640 .context("failed to initialize wasm extension")?;
641
642 let (tx, mut rx) = mpsc::unbounded::<ExtensionCall>();
643 let extension_task = async move {
644 // note: Setting the thread local here will slowly "poison" all tokio threads
645 // causing us to not record their panics any longer.
646 //
647 // This is fine though, the main zed binary only uses tokio for livekit and wasm extensions.
648 // Livekit seldom (if ever) panics 🤞 so the likelihood of us missing a panic in sentry is very low.
649 IS_WASM_THREAD.with(|v| v.store(true, Ordering::Release));
650 while let Some(call) = rx.next().await {
651 (call)(&mut extension, &mut store).await;
652 }
653 };
654
655 anyhow::Ok((
656 extension_task,
657 manifest.clone(),
658 this.work_dir.join(manifest.id.as_ref()).into(),
659 tx,
660 zed_api_version,
661 ))
662 };
663 cx.spawn(async move |cx| {
664 let (extension_task, manifest, work_dir, tx, zed_api_version) =
665 cx.background_executor().spawn(load_extension_task).await?;
666 // we need to run run the task in a tokio context as wasmtime_wasi may
667 // call into tokio, accessing its runtime handle when we trigger the `engine.increment_epoch()` above.
668 let task = Arc::new(gpui_tokio::Tokio::spawn(cx, extension_task)?);
669
670 Ok(WasmExtension {
671 manifest,
672 work_dir,
673 tx,
674 zed_api_version,
675 _task: task,
676 })
677 })
678 }
679
680 async fn build_wasi_ctx(&self, manifest: &Arc<ExtensionManifest>) -> Result<wasi::WasiCtx> {
681 let extension_work_dir = self.work_dir.join(manifest.id.as_ref());
682 self.fs
683 .create_dir(&extension_work_dir)
684 .await
685 .context("failed to create extension work dir")?;
686
687 let file_perms = wasi::FilePerms::all();
688 let dir_perms = wasi::DirPerms::all();
689 let path = SanitizedPath::new(&extension_work_dir).to_string();
690 #[cfg(target_os = "windows")]
691 let path = path.replace('\\', "/");
692
693 let mut ctx = wasi::WasiCtxBuilder::new();
694 ctx.inherit_stdio()
695 .env("PWD", &path)
696 .env("RUST_BACKTRACE", "full");
697
698 ctx.preopened_dir(&path, ".", dir_perms, file_perms)?;
699 ctx.preopened_dir(&path, &path, dir_perms, file_perms)?;
700
701 Ok(ctx.build())
702 }
703
704 pub fn writeable_path_from_extension(&self, id: &Arc<str>, path: &Path) -> Result<PathBuf> {
705 let extension_work_dir = self.work_dir.join(id.as_ref());
706 let path = normalize_path(&extension_work_dir.join(path));
707 anyhow::ensure!(
708 path.starts_with(&extension_work_dir),
709 "cannot write to path {path:?}",
710 );
711 Ok(path)
712 }
713}
714
715pub fn parse_wasm_extension_version(extension_id: &str, wasm_bytes: &[u8]) -> Result<Version> {
716 let mut version = None;
717
718 for part in wasmparser::Parser::new(0).parse_all(wasm_bytes) {
719 if let wasmparser::Payload::CustomSection(s) =
720 part.context("error parsing wasm extension")?
721 && s.name() == "zed:api-version"
722 {
723 version = parse_wasm_extension_version_custom_section(s.data());
724 if version.is_none() {
725 bail!(
726 "extension {} has invalid zed:api-version section: {:?}",
727 extension_id,
728 s.data()
729 );
730 }
731 }
732 }
733
734 // The reason we wait until we're done parsing all of the Wasm bytes to return the version
735 // is to work around a panic that can happen inside of Wasmtime when the bytes are invalid.
736 //
737 // By parsing the entirety of the Wasm bytes before we return, we're able to detect this problem
738 // earlier as an `Err` rather than as a panic.
739 version.with_context(|| format!("extension {extension_id} has no zed:api-version section"))
740}
741
742fn parse_wasm_extension_version_custom_section(data: &[u8]) -> Option<Version> {
743 if data.len() == 6 {
744 Some(Version::new(
745 u16::from_be_bytes([data[0], data[1]]) as _,
746 u16::from_be_bytes([data[2], data[3]]) as _,
747 u16::from_be_bytes([data[4], data[5]]) as _,
748 ))
749 } else {
750 None
751 }
752}
753
754impl WasmExtension {
755 pub async fn load(
756 extension_dir: &Path,
757 manifest: &Arc<ExtensionManifest>,
758 wasm_host: Arc<WasmHost>,
759 cx: &AsyncApp,
760 ) -> Result<Self> {
761 let path = extension_dir.join("extension.wasm");
762
763 let mut wasm_file = wasm_host
764 .fs
765 .open_sync(&path)
766 .await
767 .context(format!("opening wasm file, path: {path:?}"))?;
768
769 let mut wasm_bytes = Vec::new();
770 wasm_file
771 .read_to_end(&mut wasm_bytes)
772 .context(format!("reading wasm file, path: {path:?}"))?;
773
774 wasm_host
775 .load_extension(wasm_bytes, manifest, cx)
776 .await
777 .with_context(|| format!("loading wasm extension: {}", manifest.id))
778 }
779
780 pub async fn call<T, Fn>(&self, f: Fn) -> Result<T>
781 where
782 T: 'static + Send,
783 Fn: 'static
784 + Send
785 + for<'a> FnOnce(&'a mut Extension, &'a mut Store<WasmState>) -> BoxFuture<'a, T>,
786 {
787 let (return_tx, return_rx) = oneshot::channel();
788 self.tx
789 .unbounded_send(Box::new(move |extension, store| {
790 async {
791 let result = f(extension, store).await;
792 return_tx.send(result).ok();
793 }
794 .boxed()
795 }))
796 .map_err(|_| {
797 anyhow!(
798 "wasm extension channel should not be closed yet, extension {} (id {})",
799 self.manifest.name,
800 self.manifest.id,
801 )
802 })?;
803 return_rx.await.with_context(|| {
804 format!(
805 "wasm extension channel, extension {} (id {})",
806 self.manifest.name, self.manifest.id,
807 )
808 })
809 }
810}
811
812impl WasmState {
813 fn on_main_thread<T, Fn>(&self, f: Fn) -> impl 'static + Future<Output = T>
814 where
815 T: 'static + Send,
816 Fn: 'static + Send + for<'a> FnOnce(&'a mut AsyncApp) -> LocalBoxFuture<'a, T>,
817 {
818 let (return_tx, return_rx) = oneshot::channel();
819 self.host
820 .main_thread_message_tx
821 .clone()
822 .unbounded_send(Box::new(move |cx| {
823 async {
824 let result = f(cx).await;
825 return_tx.send(result).ok();
826 }
827 .boxed_local()
828 }))
829 .unwrap_or_else(|_| {
830 panic!(
831 "main thread message channel should not be closed yet, extension {} (id {})",
832 self.manifest.name, self.manifest.id,
833 )
834 });
835 let name = self.manifest.name.clone();
836 let id = self.manifest.id.clone();
837 async move {
838 return_rx.await.unwrap_or_else(|_| {
839 panic!("main thread message channel, extension {name} (id {id})")
840 })
841 }
842 }
843
844 fn work_dir(&self) -> PathBuf {
845 self.host.work_dir.join(self.manifest.id.as_ref())
846 }
847
848 fn extension_error(&self, message: String) -> anyhow::Error {
849 anyhow!(
850 "from extension \"{}\" version {}: {}",
851 self.manifest.name,
852 self.manifest.version,
853 message
854 )
855 }
856}
857
858impl wasi::WasiView for WasmState {
859 fn table(&mut self) -> &mut ResourceTable {
860 &mut self.table
861 }
862
863 fn ctx(&mut self) -> &mut wasi::WasiCtx {
864 &mut self.ctx
865 }
866}
867
868/// Wrapper around a mini-moka bounded cache for storing incremental compilation artifacts.
869/// Since wasm modules have many similar elements, this can save us a lot of work at the
870/// cost of a small memory footprint. However, we don't want this to be unbounded, so we use
871/// a LFU/LRU cache to evict less used cache entries.
872#[derive(Debug)]
873struct IncrementalCompilationCache {
874 cache: Cache<Vec<u8>, Vec<u8>>,
875}
876
877impl IncrementalCompilationCache {
878 fn new() -> Self {
879 let cache = Cache::builder()
880 // Cap this at 32 MB for now. Our extensions turn into roughly 512kb in the cache,
881 // which means we could store 64 completely novel extensions in the cache, but in
882 // practice we will more than that, which is more than enough for our use case.
883 .max_capacity(32 * 1024 * 1024)
884 .weigher(|k: &Vec<u8>, v: &Vec<u8>| (k.len() + v.len()).try_into().unwrap_or(u32::MAX))
885 .build();
886 Self { cache }
887 }
888}
889
890impl CacheStore for IncrementalCompilationCache {
891 fn get(&self, key: &[u8]) -> Option<Cow<'_, [u8]>> {
892 self.cache.get(key).map(|v| v.into())
893 }
894
895 fn insert(&self, key: &[u8], value: Vec<u8>) -> bool {
896 self.cache.insert(key.to_vec(), value);
897 true
898 }
899}