1pub mod wit;
2
3use crate::capability_granter::CapabilityGranter;
4use crate::{ExtensionManifest, ExtensionSettings};
5use anyhow::{Context as _, Result, anyhow, bail};
6use async_trait::async_trait;
7use dap::{DebugRequest, StartDebuggingRequestArgumentsRequest};
8use extension::{
9 CodeLabel, Command, Completion, ContextServerConfiguration, DebugAdapterBinary,
10 DebugTaskDefinition, ExtensionCapability, ExtensionHostProxy, KeyValueStoreDelegate,
11 ProjectDelegate, SlashCommand, SlashCommandArgumentCompletion, SlashCommandOutput, Symbol,
12 WorktreeDelegate,
13};
14use fs::{Fs, normalize_path};
15use futures::AsyncReadExt;
16use futures::future::LocalBoxFuture;
17use futures::{
18 Future, FutureExt, StreamExt as _,
19 channel::{
20 mpsc::{self, UnboundedSender},
21 oneshot,
22 },
23 future::BoxFuture,
24};
25use gpui::{App, AsyncApp, BackgroundExecutor, Task, Timer};
26use http_client::HttpClient;
27use language::LanguageName;
28use lsp::LanguageServerName;
29use moka::sync::Cache;
30use node_runtime::NodeRuntime;
31use release_channel::ReleaseChannel;
32use semver::Version;
33use settings::Settings;
34use std::{
35 borrow::Cow,
36 path::{Path, PathBuf},
37 sync::{
38 Arc, LazyLock, OnceLock,
39 atomic::{AtomicBool, Ordering},
40 },
41 time::Duration,
42};
43use task::{DebugScenario, SpawnInTerminal, TaskTemplate, ZedDebugConfig};
44use util::paths::SanitizedPath;
45use wasmtime::{
46 CacheStore, Engine, Store,
47 component::{Component, ResourceTable},
48};
49use wasmtime_wasi::p2::{self as wasi, IoView as _};
50use wit::Extension;
51
52pub struct WasmHost {
53 engine: Engine,
54 release_channel: ReleaseChannel,
55 http_client: Arc<dyn HttpClient>,
56 node_runtime: NodeRuntime,
57 pub(crate) proxy: Arc<ExtensionHostProxy>,
58 fs: Arc<dyn Fs>,
59 pub work_dir: PathBuf,
60 /// The capabilities granted to extensions running on the host.
61 pub(crate) granted_capabilities: Vec<ExtensionCapability>,
62 _main_thread_message_task: Task<()>,
63 main_thread_message_tx: mpsc::UnboundedSender<MainThreadCall>,
64}
65
66#[derive(Clone, Debug)]
67pub struct WasmExtension {
68 tx: UnboundedSender<ExtensionCall>,
69 pub manifest: Arc<ExtensionManifest>,
70 pub work_dir: Arc<Path>,
71 #[allow(unused)]
72 pub zed_api_version: Version,
73 _task: Arc<Task<Result<(), gpui_tokio::JoinError>>>,
74}
75
76impl Drop for WasmExtension {
77 fn drop(&mut self) {
78 self.tx.close_channel();
79 }
80}
81
82#[async_trait]
83impl extension::Extension for WasmExtension {
84 fn manifest(&self) -> Arc<ExtensionManifest> {
85 self.manifest.clone()
86 }
87
88 fn work_dir(&self) -> Arc<Path> {
89 self.work_dir.clone()
90 }
91
92 async fn language_server_command(
93 &self,
94 language_server_id: LanguageServerName,
95 language_name: LanguageName,
96 worktree: Arc<dyn WorktreeDelegate>,
97 ) -> Result<Command> {
98 self.call(|extension, store| {
99 async move {
100 let resource = store.data_mut().table().push(worktree)?;
101 let command = extension
102 .call_language_server_command(
103 store,
104 &language_server_id,
105 &language_name,
106 resource,
107 )
108 .await?
109 .map_err(|err| store.data().extension_error(err))?;
110
111 Ok(command.into())
112 }
113 .boxed()
114 })
115 .await?
116 }
117
118 async fn language_server_initialization_options(
119 &self,
120 language_server_id: LanguageServerName,
121 language_name: LanguageName,
122 worktree: Arc<dyn WorktreeDelegate>,
123 ) -> Result<Option<String>> {
124 self.call(|extension, store| {
125 async move {
126 let resource = store.data_mut().table().push(worktree)?;
127 let options = extension
128 .call_language_server_initialization_options(
129 store,
130 &language_server_id,
131 &language_name,
132 resource,
133 )
134 .await?
135 .map_err(|err| store.data().extension_error(err))?;
136 anyhow::Ok(options)
137 }
138 .boxed()
139 })
140 .await?
141 }
142
143 async fn language_server_workspace_configuration(
144 &self,
145 language_server_id: LanguageServerName,
146 worktree: Arc<dyn WorktreeDelegate>,
147 ) -> Result<Option<String>> {
148 self.call(|extension, store| {
149 async move {
150 let resource = store.data_mut().table().push(worktree)?;
151 let options = extension
152 .call_language_server_workspace_configuration(
153 store,
154 &language_server_id,
155 resource,
156 )
157 .await?
158 .map_err(|err| store.data().extension_error(err))?;
159 anyhow::Ok(options)
160 }
161 .boxed()
162 })
163 .await?
164 }
165
166 async fn language_server_additional_initialization_options(
167 &self,
168 language_server_id: LanguageServerName,
169 target_language_server_id: LanguageServerName,
170 worktree: Arc<dyn WorktreeDelegate>,
171 ) -> Result<Option<String>> {
172 self.call(|extension, store| {
173 async move {
174 let resource = store.data_mut().table().push(worktree)?;
175 let options = extension
176 .call_language_server_additional_initialization_options(
177 store,
178 &language_server_id,
179 &target_language_server_id,
180 resource,
181 )
182 .await?
183 .map_err(|err| store.data().extension_error(err))?;
184 anyhow::Ok(options)
185 }
186 .boxed()
187 })
188 .await?
189 }
190
191 async fn language_server_additional_workspace_configuration(
192 &self,
193 language_server_id: LanguageServerName,
194 target_language_server_id: LanguageServerName,
195 worktree: Arc<dyn WorktreeDelegate>,
196 ) -> Result<Option<String>> {
197 self.call(|extension, store| {
198 async move {
199 let resource = store.data_mut().table().push(worktree)?;
200 let options = extension
201 .call_language_server_additional_workspace_configuration(
202 store,
203 &language_server_id,
204 &target_language_server_id,
205 resource,
206 )
207 .await?
208 .map_err(|err| store.data().extension_error(err))?;
209 anyhow::Ok(options)
210 }
211 .boxed()
212 })
213 .await?
214 }
215
216 async fn labels_for_completions(
217 &self,
218 language_server_id: LanguageServerName,
219 completions: Vec<Completion>,
220 ) -> Result<Vec<Option<CodeLabel>>> {
221 self.call(|extension, store| {
222 async move {
223 let labels = extension
224 .call_labels_for_completions(
225 store,
226 &language_server_id,
227 completions.into_iter().map(Into::into).collect(),
228 )
229 .await?
230 .map_err(|err| store.data().extension_error(err))?;
231
232 Ok(labels
233 .into_iter()
234 .map(|label| label.map(Into::into))
235 .collect())
236 }
237 .boxed()
238 })
239 .await?
240 }
241
242 async fn labels_for_symbols(
243 &self,
244 language_server_id: LanguageServerName,
245 symbols: Vec<Symbol>,
246 ) -> Result<Vec<Option<CodeLabel>>> {
247 self.call(|extension, store| {
248 async move {
249 let labels = extension
250 .call_labels_for_symbols(
251 store,
252 &language_server_id,
253 symbols.into_iter().map(Into::into).collect(),
254 )
255 .await?
256 .map_err(|err| store.data().extension_error(err))?;
257
258 Ok(labels
259 .into_iter()
260 .map(|label| label.map(Into::into))
261 .collect())
262 }
263 .boxed()
264 })
265 .await?
266 }
267
268 async fn complete_slash_command_argument(
269 &self,
270 command: SlashCommand,
271 arguments: Vec<String>,
272 ) -> Result<Vec<SlashCommandArgumentCompletion>> {
273 self.call(|extension, store| {
274 async move {
275 let completions = extension
276 .call_complete_slash_command_argument(store, &command.into(), &arguments)
277 .await?
278 .map_err(|err| store.data().extension_error(err))?;
279
280 Ok(completions.into_iter().map(Into::into).collect())
281 }
282 .boxed()
283 })
284 .await?
285 }
286
287 async fn run_slash_command(
288 &self,
289 command: SlashCommand,
290 arguments: Vec<String>,
291 delegate: Option<Arc<dyn WorktreeDelegate>>,
292 ) -> Result<SlashCommandOutput> {
293 self.call(|extension, store| {
294 async move {
295 let resource = if let Some(delegate) = delegate {
296 Some(store.data_mut().table().push(delegate)?)
297 } else {
298 None
299 };
300
301 let output = extension
302 .call_run_slash_command(store, &command.into(), &arguments, resource)
303 .await?
304 .map_err(|err| store.data().extension_error(err))?;
305
306 Ok(output.into())
307 }
308 .boxed()
309 })
310 .await?
311 }
312
313 async fn context_server_command(
314 &self,
315 context_server_id: Arc<str>,
316 project: Arc<dyn ProjectDelegate>,
317 ) -> Result<Command> {
318 self.call(|extension, store| {
319 async move {
320 let project_resource = store.data_mut().table().push(project)?;
321 let command = extension
322 .call_context_server_command(store, context_server_id.clone(), project_resource)
323 .await?
324 .map_err(|err| store.data().extension_error(err))?;
325 anyhow::Ok(command.into())
326 }
327 .boxed()
328 })
329 .await?
330 }
331
332 async fn context_server_configuration(
333 &self,
334 context_server_id: Arc<str>,
335 project: Arc<dyn ProjectDelegate>,
336 ) -> Result<Option<ContextServerConfiguration>> {
337 self.call(|extension, store| {
338 async move {
339 let project_resource = store.data_mut().table().push(project)?;
340 let Some(configuration) = extension
341 .call_context_server_configuration(
342 store,
343 context_server_id.clone(),
344 project_resource,
345 )
346 .await?
347 .map_err(|err| store.data().extension_error(err))?
348 else {
349 return Ok(None);
350 };
351
352 Ok(Some(configuration.try_into()?))
353 }
354 .boxed()
355 })
356 .await?
357 }
358
359 async fn suggest_docs_packages(&self, provider: Arc<str>) -> Result<Vec<String>> {
360 self.call(|extension, store| {
361 async move {
362 let packages = extension
363 .call_suggest_docs_packages(store, provider.as_ref())
364 .await?
365 .map_err(|err| store.data().extension_error(err))?;
366
367 Ok(packages)
368 }
369 .boxed()
370 })
371 .await?
372 }
373
374 async fn index_docs(
375 &self,
376 provider: Arc<str>,
377 package_name: Arc<str>,
378 kv_store: Arc<dyn KeyValueStoreDelegate>,
379 ) -> Result<()> {
380 self.call(|extension, store| {
381 async move {
382 let kv_store_resource = store.data_mut().table().push(kv_store)?;
383 extension
384 .call_index_docs(
385 store,
386 provider.as_ref(),
387 package_name.as_ref(),
388 kv_store_resource,
389 )
390 .await?
391 .map_err(|err| store.data().extension_error(err))?;
392
393 anyhow::Ok(())
394 }
395 .boxed()
396 })
397 .await?
398 }
399
400 async fn get_dap_binary(
401 &self,
402 dap_name: Arc<str>,
403 config: DebugTaskDefinition,
404 user_installed_path: Option<PathBuf>,
405 worktree: Arc<dyn WorktreeDelegate>,
406 ) -> Result<DebugAdapterBinary> {
407 self.call(|extension, store| {
408 async move {
409 let resource = store.data_mut().table().push(worktree)?;
410 let dap_binary = extension
411 .call_get_dap_binary(store, dap_name, config, user_installed_path, resource)
412 .await?
413 .map_err(|err| store.data().extension_error(err))?;
414 let dap_binary = dap_binary.try_into()?;
415 Ok(dap_binary)
416 }
417 .boxed()
418 })
419 .await?
420 }
421 async fn dap_request_kind(
422 &self,
423 dap_name: Arc<str>,
424 config: serde_json::Value,
425 ) -> Result<StartDebuggingRequestArgumentsRequest> {
426 self.call(|extension, store| {
427 async move {
428 let kind = extension
429 .call_dap_request_kind(store, dap_name, config)
430 .await?
431 .map_err(|err| store.data().extension_error(err))?;
432 Ok(kind.into())
433 }
434 .boxed()
435 })
436 .await?
437 }
438
439 async fn dap_config_to_scenario(&self, config: ZedDebugConfig) -> Result<DebugScenario> {
440 self.call(|extension, store| {
441 async move {
442 let kind = extension
443 .call_dap_config_to_scenario(store, config)
444 .await?
445 .map_err(|err| store.data().extension_error(err))?;
446 Ok(kind)
447 }
448 .boxed()
449 })
450 .await?
451 }
452
453 async fn dap_locator_create_scenario(
454 &self,
455 locator_name: String,
456 build_config_template: TaskTemplate,
457 resolved_label: String,
458 debug_adapter_name: String,
459 ) -> Result<Option<DebugScenario>> {
460 self.call(|extension, store| {
461 async move {
462 extension
463 .call_dap_locator_create_scenario(
464 store,
465 locator_name,
466 build_config_template,
467 resolved_label,
468 debug_adapter_name,
469 )
470 .await
471 }
472 .boxed()
473 })
474 .await?
475 }
476 async fn run_dap_locator(
477 &self,
478 locator_name: String,
479 config: SpawnInTerminal,
480 ) -> Result<DebugRequest> {
481 self.call(|extension, store| {
482 async move {
483 extension
484 .call_run_dap_locator(store, locator_name, config)
485 .await?
486 .map_err(|err| store.data().extension_error(err))
487 }
488 .boxed()
489 })
490 .await?
491 }
492}
493
494pub struct WasmState {
495 manifest: Arc<ExtensionManifest>,
496 pub table: ResourceTable,
497 ctx: wasi::WasiCtx,
498 pub host: Arc<WasmHost>,
499 pub(crate) capability_granter: CapabilityGranter,
500}
501
502std::thread_local! {
503 /// Used by the crash handler to ignore panics in extension-related threads.
504 pub static IS_WASM_THREAD: AtomicBool = const { AtomicBool::new(false) };
505}
506
507type MainThreadCall = Box<dyn Send + for<'a> FnOnce(&'a mut AsyncApp) -> LocalBoxFuture<'a, ()>>;
508
509type ExtensionCall = Box<
510 dyn Send + for<'a> FnOnce(&'a mut Extension, &'a mut Store<WasmState>) -> BoxFuture<'a, ()>,
511>;
512
513fn wasm_engine(executor: &BackgroundExecutor) -> wasmtime::Engine {
514 static WASM_ENGINE: OnceLock<wasmtime::Engine> = OnceLock::new();
515 WASM_ENGINE
516 .get_or_init(|| {
517 let mut config = wasmtime::Config::new();
518 config.wasm_component_model(true);
519 config.async_support(true);
520 config
521 .enable_incremental_compilation(cache_store())
522 .unwrap();
523 // Async support introduces the issue that extension execution happens during `Future::poll`,
524 // which could block an async thread.
525 // https://docs.rs/wasmtime/latest/wasmtime/struct.Config.html#execution-in-poll
526 //
527 // Epoch interruption is a lightweight mechanism to allow the extensions to yield control
528 // back to the executor at regular intervals.
529 config.epoch_interruption(true);
530
531 let engine = wasmtime::Engine::new(&config).unwrap();
532
533 // It might be safer to do this on a non-async thread to make sure it makes progress
534 // regardless of if extensions are blocking.
535 // However, due to our current setup, this isn't a likely occurrence and we'd rather
536 // not have a dedicated thread just for this. If it becomes an issue, we can consider
537 // creating a separate thread for epoch interruption.
538 let engine_ref = engine.weak();
539 executor
540 .spawn(async move {
541 // Somewhat arbitrary interval, as it isn't a guaranteed interval.
542 // But this is a rough upper bound for how long the extension execution can block on
543 // `Future::poll`.
544 const EPOCH_INTERVAL: Duration = Duration::from_millis(100);
545 let mut timer = Timer::interval(EPOCH_INTERVAL);
546 while (timer.next().await).is_some() {
547 // Exit the loop and thread once the engine is dropped.
548 let Some(engine) = engine_ref.upgrade() else {
549 break;
550 };
551 engine.increment_epoch();
552 }
553 })
554 .detach();
555
556 engine
557 })
558 .clone()
559}
560
561fn cache_store() -> Arc<IncrementalCompilationCache> {
562 static CACHE_STORE: LazyLock<Arc<IncrementalCompilationCache>> =
563 LazyLock::new(|| Arc::new(IncrementalCompilationCache::new()));
564 CACHE_STORE.clone()
565}
566
567impl WasmHost {
568 pub fn new(
569 fs: Arc<dyn Fs>,
570 http_client: Arc<dyn HttpClient>,
571 node_runtime: NodeRuntime,
572 proxy: Arc<ExtensionHostProxy>,
573 work_dir: PathBuf,
574 cx: &mut App,
575 ) -> Arc<Self> {
576 let (tx, mut rx) = mpsc::unbounded::<MainThreadCall>();
577 let task = cx.spawn(async move |cx| {
578 while let Some(message) = rx.next().await {
579 message(cx).await;
580 }
581 });
582
583 let extension_settings = ExtensionSettings::get_global(cx);
584
585 Arc::new(Self {
586 engine: wasm_engine(cx.background_executor()),
587 fs,
588 work_dir,
589 http_client,
590 node_runtime,
591 proxy,
592 release_channel: ReleaseChannel::global(cx),
593 granted_capabilities: extension_settings.granted_capabilities.clone(),
594 _main_thread_message_task: task,
595 main_thread_message_tx: tx,
596 })
597 }
598
599 pub fn load_extension(
600 self: &Arc<Self>,
601 wasm_bytes: Vec<u8>,
602 manifest: &Arc<ExtensionManifest>,
603 cx: &AsyncApp,
604 ) -> Task<Result<WasmExtension>> {
605 let this = self.clone();
606 let manifest = manifest.clone();
607 let executor = cx.background_executor().clone();
608 let load_extension_task = async move {
609 let zed_api_version = parse_wasm_extension_version(&manifest.id, &wasm_bytes)?;
610
611 let component = Component::from_binary(&this.engine, &wasm_bytes)
612 .context("failed to compile wasm component")?;
613 let mut store = wasmtime::Store::new(
614 &this.engine,
615 WasmState {
616 ctx: this.build_wasi_ctx(&manifest).await?,
617 manifest: manifest.clone(),
618 table: ResourceTable::new(),
619 host: this.clone(),
620 capability_granter: CapabilityGranter::new(
621 this.granted_capabilities.clone(),
622 manifest.clone(),
623 ),
624 },
625 );
626 // Store will yield after 1 tick, and get a new deadline of 1 tick after each yield.
627 store.set_epoch_deadline(1);
628 store.epoch_deadline_async_yield_and_update(1);
629
630 let mut extension = Extension::instantiate_async(
631 &executor,
632 &mut store,
633 this.release_channel,
634 zed_api_version.clone(),
635 &component,
636 )
637 .await?;
638
639 extension
640 .call_init_extension(&mut store)
641 .await
642 .context("failed to initialize wasm extension")?;
643
644 let (tx, mut rx) = mpsc::unbounded::<ExtensionCall>();
645 let extension_task = async move {
646 // note: Setting the thread local here will slowly "poison" all tokio threads
647 // causing us to not record their panics any longer.
648 //
649 // This is fine though, the main zed binary only uses tokio for livekit and wasm extensions.
650 // Livekit seldom (if ever) panics 🤞 so the likelihood of us missing a panic in sentry is very low.
651 IS_WASM_THREAD.with(|v| v.store(true, Ordering::Release));
652 while let Some(call) = rx.next().await {
653 (call)(&mut extension, &mut store).await;
654 }
655 };
656
657 anyhow::Ok((
658 extension_task,
659 manifest.clone(),
660 this.work_dir.join(manifest.id.as_ref()).into(),
661 tx,
662 zed_api_version,
663 ))
664 };
665 cx.spawn(async move |cx| {
666 let (extension_task, manifest, work_dir, tx, zed_api_version) =
667 cx.background_executor().spawn(load_extension_task).await?;
668 // we need to run run the task in a tokio context as wasmtime_wasi may
669 // call into tokio, accessing its runtime handle when we trigger the `engine.increment_epoch()` above.
670 let task = Arc::new(gpui_tokio::Tokio::spawn(cx, extension_task)?);
671
672 Ok(WasmExtension {
673 manifest,
674 work_dir,
675 tx,
676 zed_api_version,
677 _task: task,
678 })
679 })
680 }
681
682 async fn build_wasi_ctx(&self, manifest: &Arc<ExtensionManifest>) -> Result<wasi::WasiCtx> {
683 let extension_work_dir = self.work_dir.join(manifest.id.as_ref());
684 self.fs
685 .create_dir(&extension_work_dir)
686 .await
687 .context("failed to create extension work dir")?;
688
689 let file_perms = wasmtime_wasi::FilePerms::all();
690 let dir_perms = wasmtime_wasi::DirPerms::all();
691 let path = SanitizedPath::new(&extension_work_dir).to_string();
692 #[cfg(target_os = "windows")]
693 let path = path.replace('\\', "/");
694
695 let mut ctx = wasi::WasiCtxBuilder::new();
696 ctx.inherit_stdio()
697 .env("PWD", &path)
698 .env("RUST_BACKTRACE", "full");
699
700 ctx.preopened_dir(&path, ".", dir_perms, file_perms)?;
701 ctx.preopened_dir(&path, &path, dir_perms, file_perms)?;
702
703 Ok(ctx.build())
704 }
705
706 pub fn writeable_path_from_extension(&self, id: &Arc<str>, path: &Path) -> Result<PathBuf> {
707 let extension_work_dir = self.work_dir.join(id.as_ref());
708 let path = normalize_path(&extension_work_dir.join(path));
709 anyhow::ensure!(
710 path.starts_with(&extension_work_dir),
711 "cannot write to path {path:?}",
712 );
713 Ok(path)
714 }
715}
716
717pub fn parse_wasm_extension_version(extension_id: &str, wasm_bytes: &[u8]) -> Result<Version> {
718 let mut version = None;
719
720 for part in wasmparser::Parser::new(0).parse_all(wasm_bytes) {
721 if let wasmparser::Payload::CustomSection(s) =
722 part.context("error parsing wasm extension")?
723 && s.name() == "zed:api-version"
724 {
725 version = parse_wasm_extension_version_custom_section(s.data());
726 if version.is_none() {
727 bail!(
728 "extension {} has invalid zed:api-version section: {:?}",
729 extension_id,
730 s.data()
731 );
732 }
733 }
734 }
735
736 // The reason we wait until we're done parsing all of the Wasm bytes to return the version
737 // is to work around a panic that can happen inside of Wasmtime when the bytes are invalid.
738 //
739 // By parsing the entirety of the Wasm bytes before we return, we're able to detect this problem
740 // earlier as an `Err` rather than as a panic.
741 version.with_context(|| format!("extension {extension_id} has no zed:api-version section"))
742}
743
744fn parse_wasm_extension_version_custom_section(data: &[u8]) -> Option<Version> {
745 if data.len() == 6 {
746 Some(Version::new(
747 u16::from_be_bytes([data[0], data[1]]) as _,
748 u16::from_be_bytes([data[2], data[3]]) as _,
749 u16::from_be_bytes([data[4], data[5]]) as _,
750 ))
751 } else {
752 None
753 }
754}
755
756impl WasmExtension {
757 pub async fn load(
758 extension_dir: &Path,
759 manifest: &Arc<ExtensionManifest>,
760 wasm_host: Arc<WasmHost>,
761 cx: &AsyncApp,
762 ) -> Result<Self> {
763 let path = extension_dir.join("extension.wasm");
764
765 let mut wasm_file = wasm_host
766 .fs
767 .open_read(&path)
768 .await
769 .context(format!("opening wasm file, path: {path:?}"))?;
770
771 let mut wasm_bytes = Vec::new();
772 wasm_file
773 .read_to_end(&mut wasm_bytes)
774 .await
775 .context(format!("reading wasm file, path: {path:?}"))?;
776
777 wasm_host
778 .load_extension(wasm_bytes, manifest, cx)
779 .await
780 .with_context(|| format!("loading wasm extension: {}", manifest.id))
781 }
782
783 pub async fn call<T, Fn>(&self, f: Fn) -> Result<T>
784 where
785 T: 'static + Send,
786 Fn: 'static
787 + Send
788 + for<'a> FnOnce(&'a mut Extension, &'a mut Store<WasmState>) -> BoxFuture<'a, T>,
789 {
790 let (return_tx, return_rx) = oneshot::channel();
791 self.tx
792 .unbounded_send(Box::new(move |extension, store| {
793 async {
794 let result = f(extension, store).await;
795 return_tx.send(result).ok();
796 }
797 .boxed()
798 }))
799 .map_err(|_| {
800 anyhow!(
801 "wasm extension channel should not be closed yet, extension {} (id {})",
802 self.manifest.name,
803 self.manifest.id,
804 )
805 })?;
806 return_rx.await.with_context(|| {
807 format!(
808 "wasm extension channel, extension {} (id {})",
809 self.manifest.name, self.manifest.id,
810 )
811 })
812 }
813}
814
815impl WasmState {
816 fn on_main_thread<T, Fn>(&self, f: Fn) -> impl 'static + Future<Output = T>
817 where
818 T: 'static + Send,
819 Fn: 'static + Send + for<'a> FnOnce(&'a mut AsyncApp) -> LocalBoxFuture<'a, T>,
820 {
821 let (return_tx, return_rx) = oneshot::channel();
822 self.host
823 .main_thread_message_tx
824 .clone()
825 .unbounded_send(Box::new(move |cx| {
826 async {
827 let result = f(cx).await;
828 return_tx.send(result).ok();
829 }
830 .boxed_local()
831 }))
832 .unwrap_or_else(|_| {
833 panic!(
834 "main thread message channel should not be closed yet, extension {} (id {})",
835 self.manifest.name, self.manifest.id,
836 )
837 });
838 let name = self.manifest.name.clone();
839 let id = self.manifest.id.clone();
840 async move {
841 return_rx.await.unwrap_or_else(|_| {
842 panic!("main thread message channel, extension {name} (id {id})")
843 })
844 }
845 }
846
847 fn work_dir(&self) -> PathBuf {
848 self.host.work_dir.join(self.manifest.id.as_ref())
849 }
850
851 fn extension_error(&self, message: String) -> anyhow::Error {
852 anyhow!(
853 "from extension \"{}\" version {}: {}",
854 self.manifest.name,
855 self.manifest.version,
856 message
857 )
858 }
859}
860
861impl wasi::IoView for WasmState {
862 fn table(&mut self) -> &mut ResourceTable {
863 &mut self.table
864 }
865}
866
867impl wasi::WasiView for WasmState {
868 fn ctx(&mut self) -> &mut wasi::WasiCtx {
869 &mut self.ctx
870 }
871}
872
873/// Wrapper around a mini-moka bounded cache for storing incremental compilation artifacts.
874/// Since wasm modules have many similar elements, this can save us a lot of work at the
875/// cost of a small memory footprint. However, we don't want this to be unbounded, so we use
876/// a LFU/LRU cache to evict less used cache entries.
877#[derive(Debug)]
878struct IncrementalCompilationCache {
879 cache: Cache<Vec<u8>, Vec<u8>>,
880}
881
882impl IncrementalCompilationCache {
883 fn new() -> Self {
884 let cache = Cache::builder()
885 // Cap this at 32 MB for now. Our extensions turn into roughly 512kb in the cache,
886 // which means we could store 64 completely novel extensions in the cache, but in
887 // practice we will more than that, which is more than enough for our use case.
888 .max_capacity(32 * 1024 * 1024)
889 .weigher(|k: &Vec<u8>, v: &Vec<u8>| (k.len() + v.len()).try_into().unwrap_or(u32::MAX))
890 .build();
891 Self { cache }
892 }
893}
894
895impl CacheStore for IncrementalCompilationCache {
896 fn get(&self, key: &[u8]) -> Option<Cow<'_, [u8]>> {
897 self.cache.get(key).map(|v| v.into())
898 }
899
900 fn insert(&self, key: &[u8], value: Vec<u8>) -> bool {
901 self.cache.insert(key.to_vec(), value);
902 true
903 }
904}