1use gpui::{
2 PlatformDispatcher, Priority, PriorityQueueReceiver, PriorityQueueSender, RunnableVariant,
3 ThreadTaskTimings,
4};
5use std::sync::Arc;
6use std::sync::atomic::AtomicI32;
7use std::time::Duration;
8use wasm_bindgen::prelude::*;
9use web_time::Instant;
10
11#[cfg(feature = "multithreaded")]
12const MIN_BACKGROUND_THREADS: usize = 2;
13
14#[cfg(feature = "multithreaded")]
15fn shared_memory_supported() -> bool {
16 let global = js_sys::global();
17 let has_shared_array_buffer =
18 js_sys::Reflect::has(&global, &JsValue::from_str("SharedArrayBuffer")).unwrap_or(false);
19 let has_atomics = js_sys::Reflect::has(&global, &JsValue::from_str("Atomics")).unwrap_or(false);
20 let memory = js_sys::WebAssembly::Memory::from(wasm_bindgen::memory());
21 let buffer = memory.buffer();
22 let is_shared_buffer = buffer.is_instance_of::<js_sys::SharedArrayBuffer>();
23 has_shared_array_buffer && has_atomics && is_shared_buffer
24}
25
26enum MainThreadItem {
27 Runnable(RunnableVariant),
28 Delayed {
29 runnable: RunnableVariant,
30 millis: i32,
31 },
32 // TODO-Wasm: Shouldn't these run on their own dedicated thread?
33 RealtimeFunction(Box<dyn FnOnce() + Send>),
34}
35
36struct MainThreadMailbox {
37 sender: PriorityQueueSender<MainThreadItem>,
38 receiver: parking_lot::Mutex<PriorityQueueReceiver<MainThreadItem>>,
39 signal: AtomicI32,
40}
41
42impl MainThreadMailbox {
43 fn new() -> Self {
44 let (sender, receiver) = PriorityQueueReceiver::new();
45 Self {
46 sender,
47 receiver: parking_lot::Mutex::new(receiver),
48 signal: AtomicI32::new(0),
49 }
50 }
51
52 fn post(&self, priority: Priority, item: MainThreadItem) {
53 if self.sender.spin_send(priority, item).is_err() {
54 log::error!("MainThreadMailbox::send failed: receiver disconnected");
55 }
56
57 // TODO-Wasm: Verify this lock-free protocol
58 let view = self.signal_view();
59 js_sys::Atomics::store(&view, 0, 1).ok();
60 js_sys::Atomics::notify(&view, 0).ok();
61 }
62
63 fn drain(&self, window: &web_sys::Window) {
64 let mut receiver = self.receiver.lock();
65 loop {
66 // We need these `spin` variants because we can't acquire a lock on the main thread.
67 // TODO-WASM: Should we do something different?
68 match receiver.spin_try_pop() {
69 Ok(Some(item)) => execute_on_main_thread(window, item),
70 Ok(None) => break,
71 Err(_) => break,
72 }
73 }
74 }
75
76 fn signal_view(&self) -> js_sys::Int32Array {
77 let byte_offset = self.signal.as_ptr() as u32;
78 let memory = js_sys::WebAssembly::Memory::from(wasm_bindgen::memory());
79 js_sys::Int32Array::new_with_byte_offset_and_length(&memory.buffer(), byte_offset, 1)
80 }
81
82 fn run_waker_loop(self: &Arc<Self>, window: web_sys::Window) {
83 if !shared_memory_supported() {
84 log::warn!("SharedArrayBuffer not available; main thread mailbox waker loop disabled");
85 return;
86 }
87
88 let mailbox = Arc::clone(self);
89 wasm_bindgen_futures::spawn_local(async move {
90 let view = mailbox.signal_view();
91 loop {
92 js_sys::Atomics::store(&view, 0, 0).expect("Atomics.store failed");
93
94 let result = match js_sys::Atomics::wait_async(&view, 0, 0) {
95 Ok(result) => result,
96 Err(error) => {
97 log::error!("Atomics.waitAsync failed: {error:?}");
98 break;
99 }
100 };
101
102 let is_async = js_sys::Reflect::get(&result, &JsValue::from_str("async"))
103 .ok()
104 .and_then(|v| v.as_bool())
105 .unwrap_or(false);
106
107 if !is_async {
108 log::error!("Atomics.waitAsync returned synchronously; waker loop exiting");
109 break;
110 }
111
112 let promise: js_sys::Promise =
113 js_sys::Reflect::get(&result, &JsValue::from_str("value"))
114 .expect("waitAsync result missing 'value'")
115 .unchecked_into();
116
117 let _ = wasm_bindgen_futures::JsFuture::from(promise).await;
118
119 mailbox.drain(&window);
120 }
121 });
122 }
123}
124
125pub struct WebDispatcher {
126 main_thread_id: std::thread::ThreadId,
127 browser_window: web_sys::Window,
128 background_sender: PriorityQueueSender<RunnableVariant>,
129 main_thread_mailbox: Arc<MainThreadMailbox>,
130 supports_threads: bool,
131 #[cfg(feature = "multithreaded")]
132 _background_threads: Vec<wasm_thread::JoinHandle<()>>,
133}
134
135// Safety: `web_sys::Window` is only accessed from the main thread
136// All other fields are `Send + Sync` by construction.
137unsafe impl Send for WebDispatcher {}
138unsafe impl Sync for WebDispatcher {}
139
140impl WebDispatcher {
141 pub fn new(browser_window: web_sys::Window, allow_threads: bool) -> Self {
142 #[cfg(feature = "multithreaded")]
143 let (background_sender, background_receiver) = PriorityQueueReceiver::new();
144 #[cfg(not(feature = "multithreaded"))]
145 let (background_sender, _) = PriorityQueueReceiver::new();
146
147 let main_thread_mailbox = Arc::new(MainThreadMailbox::new());
148
149 #[cfg(feature = "multithreaded")]
150 let supports_threads = allow_threads && shared_memory_supported();
151 #[cfg(not(feature = "multithreaded"))]
152 let supports_threads = false;
153
154 if supports_threads {
155 main_thread_mailbox.run_waker_loop(browser_window.clone());
156 } else {
157 log::warn!(
158 "SharedArrayBuffer not available; falling back to single-threaded dispatcher"
159 );
160 }
161
162 #[cfg(feature = "multithreaded")]
163 let background_threads = if supports_threads {
164 let thread_count = browser_window
165 .navigator()
166 .hardware_concurrency()
167 .max(MIN_BACKGROUND_THREADS as f64) as usize;
168
169 // TODO-Wasm: Is it bad to have web workers blocking for a long time like this?
170 (0..thread_count)
171 .map(|i| {
172 let mut receiver = background_receiver.clone();
173 wasm_thread::Builder::new()
174 .name(format!("background-worker-{i}"))
175 .spawn(move || {
176 loop {
177 let runnable: RunnableVariant = match receiver.pop() {
178 Ok(runnable) => runnable,
179 Err(_) => {
180 log::info!(
181 "background-worker-{i}: channel disconnected, exiting"
182 );
183 break;
184 }
185 };
186
187 runnable.run();
188 }
189 })
190 .expect("failed to spawn background worker thread")
191 })
192 .collect::<Vec<_>>()
193 } else {
194 Vec::new()
195 };
196
197 Self {
198 main_thread_id: std::thread::current().id(),
199 browser_window,
200 background_sender,
201 main_thread_mailbox,
202 supports_threads,
203 #[cfg(feature = "multithreaded")]
204 _background_threads: background_threads,
205 }
206 }
207
208 fn on_main_thread(&self) -> bool {
209 std::thread::current().id() == self.main_thread_id
210 }
211}
212
213impl PlatformDispatcher for WebDispatcher {
214 fn get_all_timings(&self) -> Vec<ThreadTaskTimings> {
215 // TODO-Wasm: should we panic here?
216 Vec::new()
217 }
218
219 fn get_current_thread_timings(&self) -> ThreadTaskTimings {
220 ThreadTaskTimings {
221 thread_name: None,
222 thread_id: std::thread::current().id(),
223 timings: Vec::new(),
224 total_pushed: 0,
225 }
226 }
227
228 fn is_main_thread(&self) -> bool {
229 self.on_main_thread()
230 }
231
232 fn dispatch(&self, runnable: RunnableVariant, priority: Priority) {
233 if !self.supports_threads {
234 self.dispatch_on_main_thread(runnable, priority);
235 return;
236 }
237
238 let result = if self.on_main_thread() {
239 self.background_sender.spin_send(priority, runnable)
240 } else {
241 self.background_sender.send(priority, runnable)
242 };
243
244 if let Err(error) = result {
245 log::error!("dispatch: failed to send to background queue: {error:?}");
246 }
247 }
248
249 fn dispatch_on_main_thread(&self, runnable: RunnableVariant, priority: Priority) {
250 if self.on_main_thread() {
251 schedule_runnable(&self.browser_window, runnable, priority);
252 } else {
253 self.main_thread_mailbox
254 .post(priority, MainThreadItem::Runnable(runnable));
255 }
256 }
257
258 fn dispatch_after(&self, duration: Duration, runnable: RunnableVariant) {
259 let millis = duration.as_millis().min(i32::MAX as u128) as i32;
260 if self.on_main_thread() {
261 let callback = Closure::once_into_js(move || {
262 runnable.run();
263 });
264 self.browser_window
265 .set_timeout_with_callback_and_timeout_and_arguments_0(
266 callback.unchecked_ref(),
267 millis,
268 )
269 .ok();
270 } else {
271 self.main_thread_mailbox
272 .post(Priority::High, MainThreadItem::Delayed { runnable, millis });
273 }
274 }
275
276 fn spawn_realtime(&self, function: Box<dyn FnOnce() + Send>) {
277 if self.on_main_thread() {
278 let callback = Closure::once_into_js(move || {
279 function();
280 });
281 self.browser_window
282 .queue_microtask(callback.unchecked_ref());
283 } else {
284 self.main_thread_mailbox
285 .post(Priority::High, MainThreadItem::RealtimeFunction(function));
286 }
287 }
288
289 fn now(&self) -> Instant {
290 Instant::now()
291 }
292}
293
294fn execute_on_main_thread(window: &web_sys::Window, item: MainThreadItem) {
295 match item {
296 MainThreadItem::Runnable(runnable) => {
297 runnable.run();
298 }
299 MainThreadItem::Delayed { runnable, millis } => {
300 let callback = Closure::once_into_js(move || {
301 runnable.run();
302 });
303 window
304 .set_timeout_with_callback_and_timeout_and_arguments_0(
305 callback.unchecked_ref(),
306 millis,
307 )
308 .ok();
309 }
310 MainThreadItem::RealtimeFunction(function) => {
311 function();
312 }
313 }
314}
315
316fn schedule_runnable(window: &web_sys::Window, runnable: RunnableVariant, priority: Priority) {
317 let callback = Closure::once_into_js(move || {
318 runnable.run();
319 });
320 let callback: &js_sys::Function = callback.unchecked_ref();
321
322 match priority {
323 Priority::RealtimeAudio => {
324 window.queue_microtask(callback);
325 }
326 _ => {
327 // TODO-Wasm: this ought to enqueue so we can dequeue with proper priority
328 window
329 .set_timeout_with_callback_and_timeout_and_arguments_0(callback, 0)
330 .ok();
331 }
332 }
333}