1use gpui::{
2 PlatformDispatcher, Priority, PriorityQueueReceiver, PriorityQueueSender, RunnableVariant,
3 ThreadTaskTimings,
4};
5use std::sync::Arc;
6use std::sync::atomic::AtomicI32;
7use std::time::Duration;
8use wasm_bindgen::prelude::*;
9use web_time::Instant;
10
/// Lower bound on the number of background worker threads spawned by
/// `WebDispatcher::new`; used when `navigator.hardwareConcurrency` reports
/// a smaller value.
#[cfg(feature = "multithreaded")]
const MIN_BACKGROUND_THREADS: usize = 2;
13
/// Reports whether the current JS environment can back a multithreaded dispatcher.
///
/// Returns `true` only when all of the following hold:
/// - the JS global object has a `SharedArrayBuffer` property,
/// - the JS global object has an `Atomics` property,
/// - the buffer behind this module's WebAssembly memory is a `SharedArrayBuffer`.
///
/// A failing reflection lookup is treated as "not present".
#[cfg(feature = "multithreaded")]
fn shared_memory_supported() -> bool {
    let global = js_sys::global();
    let global_has =
        |name: &str| js_sys::Reflect::has(&global, &JsValue::from_str(name)).unwrap_or(false);

    let has_shared_array_buffer = global_has("SharedArrayBuffer");
    let has_atomics = global_has("Atomics");

    let memory_is_shared = js_sys::WebAssembly::Memory::from(wasm_bindgen::memory())
        .buffer()
        .is_instance_of::<js_sys::SharedArrayBuffer>();

    has_shared_array_buffer && has_atomics && memory_is_shared
}
25
/// A unit of work posted to the main thread through `MainThreadMailbox`.
enum MainThreadItem {
    /// A runnable that is run when the mailbox is drained, unless its
    /// metadata reports it as closed.
    Runnable(RunnableVariant),
    /// A runnable that is scheduled with `setTimeout` when the mailbox is drained.
    Delayed {
        /// The work to run once the timer fires (skipped if closed by then).
        runnable: RunnableVariant,
        /// Timeout passed to `setTimeout`, in milliseconds.
        millis: i32,
    },
    // TODO-Wasm: Shouldn't these run on their own dedicated thread?
    /// A closure that is invoked directly when the mailbox is drained.
    RealtimeFunction(Box<dyn FnOnce() + Send>),
}
35
/// Priority queue plus wake-up flag used to hand work to the main thread.
struct MainThreadMailbox {
    /// Producer side of the queue; written to by `post`.
    sender: PriorityQueueSender<MainThreadItem>,
    /// Consumer side of the queue; locked for the duration of `drain`.
    receiver: parking_lot::Mutex<PriorityQueueReceiver<MainThreadItem>>,
    /// Wake-up flag accessed through JS `Atomics` on the wasm memory:
    /// `post` stores 1 and notifies, the waker loop stores 0 and waits on it.
    signal: AtomicI32,
}
41
42impl MainThreadMailbox {
43 fn new() -> Self {
44 let (sender, receiver) = PriorityQueueReceiver::new();
45 Self {
46 sender,
47 receiver: parking_lot::Mutex::new(receiver),
48 signal: AtomicI32::new(0),
49 }
50 }
51
52 fn post(&self, priority: Priority, item: MainThreadItem) {
53 if self.sender.spin_send(priority, item).is_err() {
54 log::error!("MainThreadMailbox::send failed: receiver disconnected");
55 }
56
57 // TODO-Wasm: Verify this lock-free protocol
58 let view = self.signal_view();
59 js_sys::Atomics::store(&view, 0, 1).ok();
60 js_sys::Atomics::notify(&view, 0).ok();
61 }
62
63 fn drain(&self, window: &web_sys::Window) {
64 let mut receiver = self.receiver.lock();
65 loop {
66 // We need these `spin` variants because we can't acquire a lock on the main thread.
67 // TODO-WASM: Should we do something different?
68 match receiver.spin_try_pop() {
69 Ok(Some(item)) => execute_on_main_thread(window, item),
70 Ok(None) => break,
71 Err(_) => break,
72 }
73 }
74 }
75
76 fn signal_view(&self) -> js_sys::Int32Array {
77 let byte_offset = self.signal.as_ptr() as u32;
78 let memory = js_sys::WebAssembly::Memory::from(wasm_bindgen::memory());
79 js_sys::Int32Array::new_with_byte_offset_and_length(&memory.buffer(), byte_offset, 1)
80 }
81
82 fn run_waker_loop(self: &Arc<Self>, window: web_sys::Window) {
83 if !shared_memory_supported() {
84 log::warn!("SharedArrayBuffer not available; main thread mailbox waker loop disabled");
85 return;
86 }
87
88 let mailbox = Arc::clone(self);
89 wasm_bindgen_futures::spawn_local(async move {
90 let view = mailbox.signal_view();
91 loop {
92 js_sys::Atomics::store(&view, 0, 0).expect("Atomics.store failed");
93
94 let result = match js_sys::Atomics::wait_async(&view, 0, 0) {
95 Ok(result) => result,
96 Err(error) => {
97 log::error!("Atomics.waitAsync failed: {error:?}");
98 break;
99 }
100 };
101
102 let is_async = js_sys::Reflect::get(&result, &JsValue::from_str("async"))
103 .ok()
104 .and_then(|v| v.as_bool())
105 .unwrap_or(false);
106
107 if !is_async {
108 log::error!("Atomics.waitAsync returned synchronously; waker loop exiting");
109 break;
110 }
111
112 let promise: js_sys::Promise =
113 js_sys::Reflect::get(&result, &JsValue::from_str("value"))
114 .expect("waitAsync result missing 'value'")
115 .unchecked_into();
116
117 let _ = wasm_bindgen_futures::JsFuture::from(promise).await;
118
119 mailbox.drain(&window);
120 }
121 });
122 }
123}
124
/// `PlatformDispatcher` implementation for the browser.
pub struct WebDispatcher {
    /// Id of the thread that called `WebDispatcher::new`; treated as the main thread.
    main_thread_id: std::thread::ThreadId,
    /// Window used for `setTimeout` / `queueMicrotask` scheduling.
    browser_window: web_sys::Window,
    /// Producer side of the queue consumed by the background worker threads.
    background_sender: PriorityQueueSender<RunnableVariant>,
    /// Mailbox through which non-main threads hand work to the main thread.
    main_thread_mailbox: Arc<MainThreadMailbox>,
    /// When `false`, `dispatch` routes work to the main thread instead of the
    /// background queue.
    supports_threads: bool,
    /// Join handles of the spawned background workers; stored but not read
    /// anywhere in this file.
    #[cfg(feature = "multithreaded")]
    _background_threads: Vec<wasm_thread::JoinHandle<()>>,
}
134
// SAFETY: `web_sys::Window` is only accessed from the main thread: every use of
// `browser_window` in this file happens either in `new` (on the constructing,
// i.e. main, thread) or behind an `on_main_thread()` check.
// All other fields are `Send + Sync` by construction.
// NOTE(review): this argument does not cover dropping a `WebDispatcher` on a
// non-main thread, which would drop the `Window` there — confirm that cannot happen.
unsafe impl Send for WebDispatcher {}
unsafe impl Sync for WebDispatcher {}
139
140impl WebDispatcher {
141 pub fn new(browser_window: web_sys::Window, allow_threads: bool) -> Self {
142 #[cfg(feature = "multithreaded")]
143 let (background_sender, background_receiver) = PriorityQueueReceiver::new();
144 #[cfg(not(feature = "multithreaded"))]
145 let (background_sender, _) = PriorityQueueReceiver::new();
146
147 let main_thread_mailbox = Arc::new(MainThreadMailbox::new());
148
149 #[cfg(feature = "multithreaded")]
150 let supports_threads = allow_threads && shared_memory_supported();
151 #[cfg(not(feature = "multithreaded"))]
152 let supports_threads = false;
153
154 if supports_threads {
155 main_thread_mailbox.run_waker_loop(browser_window.clone());
156 } else {
157 log::warn!(
158 "SharedArrayBuffer not available; falling back to single-threaded dispatcher"
159 );
160 }
161
162 #[cfg(feature = "multithreaded")]
163 let background_threads = if supports_threads {
164 let thread_count = browser_window
165 .navigator()
166 .hardware_concurrency()
167 .max(MIN_BACKGROUND_THREADS as f64) as usize;
168
169 // TODO-Wasm: Is it bad to have web workers blocking for a long time like this?
170 (0..thread_count)
171 .map(|i| {
172 let mut receiver = background_receiver.clone();
173 wasm_thread::Builder::new()
174 .name(format!("background-worker-{i}"))
175 .spawn(move || {
176 loop {
177 let runnable: RunnableVariant = match receiver.pop() {
178 Ok(runnable) => runnable,
179 Err(_) => {
180 log::info!(
181 "background-worker-{i}: channel disconnected, exiting"
182 );
183 break;
184 }
185 };
186
187 if runnable.metadata().is_closed() {
188 continue;
189 }
190
191 runnable.run();
192 }
193 })
194 .expect("failed to spawn background worker thread")
195 })
196 .collect::<Vec<_>>()
197 } else {
198 Vec::new()
199 };
200
201 Self {
202 main_thread_id: std::thread::current().id(),
203 browser_window,
204 background_sender,
205 main_thread_mailbox,
206 supports_threads,
207 #[cfg(feature = "multithreaded")]
208 _background_threads: background_threads,
209 }
210 }
211
212 fn on_main_thread(&self) -> bool {
213 std::thread::current().id() == self.main_thread_id
214 }
215}
216
217impl PlatformDispatcher for WebDispatcher {
218 fn get_all_timings(&self) -> Vec<ThreadTaskTimings> {
219 // TODO-Wasm: should we panic here?
220 Vec::new()
221 }
222
223 fn get_current_thread_timings(&self) -> ThreadTaskTimings {
224 ThreadTaskTimings {
225 thread_name: None,
226 thread_id: std::thread::current().id(),
227 timings: Vec::new(),
228 total_pushed: 0,
229 }
230 }
231
232 fn is_main_thread(&self) -> bool {
233 self.on_main_thread()
234 }
235
236 fn dispatch(&self, runnable: RunnableVariant, priority: Priority) {
237 if !self.supports_threads {
238 self.dispatch_on_main_thread(runnable, priority);
239 return;
240 }
241
242 let result = if self.on_main_thread() {
243 self.background_sender.spin_send(priority, runnable)
244 } else {
245 self.background_sender.send(priority, runnable)
246 };
247
248 if let Err(error) = result {
249 log::error!("dispatch: failed to send to background queue: {error:?}");
250 }
251 }
252
253 fn dispatch_on_main_thread(&self, runnable: RunnableVariant, priority: Priority) {
254 if self.on_main_thread() {
255 schedule_runnable(&self.browser_window, runnable, priority);
256 } else {
257 self.main_thread_mailbox
258 .post(priority, MainThreadItem::Runnable(runnable));
259 }
260 }
261
262 fn dispatch_after(&self, duration: Duration, runnable: RunnableVariant) {
263 let millis = duration.as_millis().min(i32::MAX as u128) as i32;
264 if self.on_main_thread() {
265 let callback = Closure::once_into_js(move || {
266 if !runnable.metadata().is_closed() {
267 runnable.run();
268 }
269 });
270 self.browser_window
271 .set_timeout_with_callback_and_timeout_and_arguments_0(
272 callback.unchecked_ref(),
273 millis,
274 )
275 .ok();
276 } else {
277 self.main_thread_mailbox
278 .post(Priority::High, MainThreadItem::Delayed { runnable, millis });
279 }
280 }
281
282 fn spawn_realtime(&self, function: Box<dyn FnOnce() + Send>) {
283 if self.on_main_thread() {
284 let callback = Closure::once_into_js(move || {
285 function();
286 });
287 self.browser_window
288 .queue_microtask(callback.unchecked_ref());
289 } else {
290 self.main_thread_mailbox
291 .post(Priority::High, MainThreadItem::RealtimeFunction(function));
292 }
293 }
294
295 fn now(&self) -> Instant {
296 Instant::now()
297 }
298}
299
300fn execute_on_main_thread(window: &web_sys::Window, item: MainThreadItem) {
301 match item {
302 MainThreadItem::Runnable(runnable) => {
303 if !runnable.metadata().is_closed() {
304 runnable.run();
305 }
306 }
307 MainThreadItem::Delayed { runnable, millis } => {
308 let callback = Closure::once_into_js(move || {
309 if !runnable.metadata().is_closed() {
310 runnable.run();
311 }
312 });
313 window
314 .set_timeout_with_callback_and_timeout_and_arguments_0(
315 callback.unchecked_ref(),
316 millis,
317 )
318 .ok();
319 }
320 MainThreadItem::RealtimeFunction(function) => {
321 function();
322 }
323 }
324}
325
326fn schedule_runnable(window: &web_sys::Window, runnable: RunnableVariant, priority: Priority) {
327 let callback = Closure::once_into_js(move || {
328 if !runnable.metadata().is_closed() {
329 runnable.run();
330 }
331 });
332 let callback: &js_sys::Function = callback.unchecked_ref();
333
334 match priority {
335 Priority::RealtimeAudio => {
336 window.queue_microtask(callback);
337 }
338 _ => {
339 // TODO-Wasm: this ought to enqueue so we can dequeue with proper priority
340 window
341 .set_timeout_with_callback_and_timeout_and_arguments_0(callback, 0)
342 .ok();
343 }
344 }
345}