dispatcher.rs

  1use gpui::{
  2    PlatformDispatcher, Priority, PriorityQueueReceiver, PriorityQueueSender, RunnableVariant,
  3    ThreadTaskTimings,
  4};
  5use std::sync::Arc;
  6use std::sync::atomic::AtomicI32;
  7use std::time::Duration;
  8use wasm_bindgen::prelude::*;
  9use web_time::Instant;
 10
 11const MIN_BACKGROUND_THREADS: usize = 2;
 12
 13fn shared_memory_supported() -> bool {
 14    let global = js_sys::global();
 15    let has_shared_array_buffer =
 16        js_sys::Reflect::has(&global, &JsValue::from_str("SharedArrayBuffer")).unwrap_or(false);
 17    let has_atomics = js_sys::Reflect::has(&global, &JsValue::from_str("Atomics")).unwrap_or(false);
 18    let memory = js_sys::WebAssembly::Memory::from(wasm_bindgen::memory());
 19    let buffer = memory.buffer();
 20    let is_shared_buffer = buffer.is_instance_of::<js_sys::SharedArrayBuffer>();
 21    has_shared_array_buffer && has_atomics && is_shared_buffer
 22}
 23
 24enum MainThreadItem {
 25    Runnable(RunnableVariant),
 26    Delayed {
 27        runnable: RunnableVariant,
 28        millis: i32,
 29    },
 30    // TODO-Wasm: Shouldn't these run on their own dedicated thread?
 31    RealtimeFunction(Box<dyn FnOnce() + Send>),
 32}
 33
 34struct MainThreadMailbox {
 35    sender: PriorityQueueSender<MainThreadItem>,
 36    receiver: parking_lot::Mutex<PriorityQueueReceiver<MainThreadItem>>,
 37    signal: AtomicI32,
 38}
 39
 40impl MainThreadMailbox {
 41    fn new() -> Self {
 42        let (sender, receiver) = PriorityQueueReceiver::new();
 43        Self {
 44            sender,
 45            receiver: parking_lot::Mutex::new(receiver),
 46            signal: AtomicI32::new(0),
 47        }
 48    }
 49
 50    fn post(&self, priority: Priority, item: MainThreadItem) {
 51        if self.sender.spin_send(priority, item).is_err() {
 52            log::error!("MainThreadMailbox::send failed: receiver disconnected");
 53        }
 54
 55        // TODO-Wasm: Verify this lock-free protocol
 56        let view = self.signal_view();
 57        js_sys::Atomics::store(&view, 0, 1).ok();
 58        js_sys::Atomics::notify(&view, 0).ok();
 59    }
 60
 61    fn drain(&self, window: &web_sys::Window) {
 62        let mut receiver = self.receiver.lock();
 63        loop {
 64            // We need these `spin` variants because we can't acquire a lock on the main thread.
 65            // TODO-WASM: Should we do something different?
 66            match receiver.spin_try_pop() {
 67                Ok(Some(item)) => execute_on_main_thread(window, item),
 68                Ok(None) => break,
 69                Err(_) => break,
 70            }
 71        }
 72    }
 73
 74    fn signal_view(&self) -> js_sys::Int32Array {
 75        let byte_offset = self.signal.as_ptr() as u32;
 76        let memory = js_sys::WebAssembly::Memory::from(wasm_bindgen::memory());
 77        js_sys::Int32Array::new_with_byte_offset_and_length(&memory.buffer(), byte_offset, 1)
 78    }
 79
 80    fn run_waker_loop(self: &Arc<Self>, window: web_sys::Window) {
 81        if !shared_memory_supported() {
 82            log::warn!("SharedArrayBuffer not available; main thread mailbox waker loop disabled");
 83            return;
 84        }
 85
 86        let mailbox = Arc::clone(self);
 87        wasm_bindgen_futures::spawn_local(async move {
 88            let view = mailbox.signal_view();
 89            loop {
 90                js_sys::Atomics::store(&view, 0, 0).expect("Atomics.store failed");
 91
 92                let result = match js_sys::Atomics::wait_async(&view, 0, 0) {
 93                    Ok(result) => result,
 94                    Err(error) => {
 95                        log::error!("Atomics.waitAsync failed: {error:?}");
 96                        break;
 97                    }
 98                };
 99
100                let is_async = js_sys::Reflect::get(&result, &JsValue::from_str("async"))
101                    .ok()
102                    .and_then(|v| v.as_bool())
103                    .unwrap_or(false);
104
105                if !is_async {
106                    log::error!("Atomics.waitAsync returned synchronously; waker loop exiting");
107                    break;
108                }
109
110                let promise: js_sys::Promise =
111                    js_sys::Reflect::get(&result, &JsValue::from_str("value"))
112                        .expect("waitAsync result missing 'value'")
113                        .unchecked_into();
114
115                let _ = wasm_bindgen_futures::JsFuture::from(promise).await;
116
117                mailbox.drain(&window);
118            }
119        });
120    }
121}
122
123pub struct WebDispatcher {
124    main_thread_id: std::thread::ThreadId,
125    browser_window: web_sys::Window,
126    background_sender: PriorityQueueSender<RunnableVariant>,
127    main_thread_mailbox: Arc<MainThreadMailbox>,
128    supports_threads: bool,
129    _background_threads: Vec<wasm_thread::JoinHandle<()>>,
130}
131
132// Safety: `web_sys::Window` is only accessed from the main thread
133// All other fields are `Send + Sync` by construction.
134unsafe impl Send for WebDispatcher {}
135unsafe impl Sync for WebDispatcher {}
136
137impl WebDispatcher {
138    pub fn new(browser_window: web_sys::Window) -> Self {
139        let (background_sender, background_receiver) = PriorityQueueReceiver::new();
140
141        let main_thread_mailbox = Arc::new(MainThreadMailbox::new());
142        let supports_threads = shared_memory_supported();
143
144        if supports_threads {
145            main_thread_mailbox.run_waker_loop(browser_window.clone());
146        } else {
147            log::warn!(
148                "SharedArrayBuffer not available; falling back to single-threaded dispatcher"
149            );
150        }
151
152        let background_threads = if supports_threads {
153            let thread_count = browser_window
154                .navigator()
155                .hardware_concurrency()
156                .max(MIN_BACKGROUND_THREADS as f64) as usize;
157
158            // TODO-Wasm: Is it bad to have web workers blocking for a long time like this?
159            (0..thread_count)
160                .map(|i| {
161                    let mut receiver = background_receiver.clone();
162                    wasm_thread::Builder::new()
163                        .name(format!("background-worker-{i}"))
164                        .spawn(move || {
165                            loop {
166                                let runnable: RunnableVariant = match receiver.pop() {
167                                    Ok(runnable) => runnable,
168                                    Err(_) => {
169                                        log::info!(
170                                            "background-worker-{i}: channel disconnected, exiting"
171                                        );
172                                        break;
173                                    }
174                                };
175
176                                if runnable.metadata().is_closed() {
177                                    continue;
178                                }
179
180                                runnable.run();
181                            }
182                        })
183                        .expect("failed to spawn background worker thread")
184                })
185                .collect::<Vec<_>>()
186        } else {
187            Vec::new()
188        };
189
190        Self {
191            main_thread_id: std::thread::current().id(),
192            browser_window,
193            background_sender,
194            main_thread_mailbox,
195            supports_threads,
196            _background_threads: background_threads,
197        }
198    }
199
200    fn on_main_thread(&self) -> bool {
201        std::thread::current().id() == self.main_thread_id
202    }
203}
204
205impl PlatformDispatcher for WebDispatcher {
206    fn get_all_timings(&self) -> Vec<ThreadTaskTimings> {
207        // TODO-Wasm: should we panic here?
208        Vec::new()
209    }
210
211    fn get_current_thread_timings(&self) -> ThreadTaskTimings {
212        ThreadTaskTimings {
213            thread_name: None,
214            thread_id: std::thread::current().id(),
215            timings: Vec::new(),
216            total_pushed: 0,
217        }
218    }
219
220    fn is_main_thread(&self) -> bool {
221        self.on_main_thread()
222    }
223
224    fn dispatch(&self, runnable: RunnableVariant, priority: Priority) {
225        if !self.supports_threads {
226            self.dispatch_on_main_thread(runnable, priority);
227            return;
228        }
229
230        let result = if self.on_main_thread() {
231            self.background_sender.spin_send(priority, runnable)
232        } else {
233            self.background_sender.send(priority, runnable)
234        };
235
236        if let Err(error) = result {
237            log::error!("dispatch: failed to send to background queue: {error:?}");
238        }
239    }
240
241    fn dispatch_on_main_thread(&self, runnable: RunnableVariant, priority: Priority) {
242        if self.on_main_thread() {
243            schedule_runnable(&self.browser_window, runnable, priority);
244        } else {
245            self.main_thread_mailbox
246                .post(priority, MainThreadItem::Runnable(runnable));
247        }
248    }
249
250    fn dispatch_after(&self, duration: Duration, runnable: RunnableVariant) {
251        let millis = duration.as_millis().min(i32::MAX as u128) as i32;
252        if self.on_main_thread() {
253            let callback = Closure::once_into_js(move || {
254                if !runnable.metadata().is_closed() {
255                    runnable.run();
256                }
257            });
258            self.browser_window
259                .set_timeout_with_callback_and_timeout_and_arguments_0(
260                    callback.unchecked_ref(),
261                    millis,
262                )
263                .ok();
264        } else {
265            self.main_thread_mailbox
266                .post(Priority::High, MainThreadItem::Delayed { runnable, millis });
267        }
268    }
269
270    fn spawn_realtime(&self, function: Box<dyn FnOnce() + Send>) {
271        if self.on_main_thread() {
272            let callback = Closure::once_into_js(move || {
273                function();
274            });
275            self.browser_window
276                .queue_microtask(callback.unchecked_ref());
277        } else {
278            self.main_thread_mailbox
279                .post(Priority::High, MainThreadItem::RealtimeFunction(function));
280        }
281    }
282
283    fn now(&self) -> Instant {
284        Instant::now()
285    }
286}
287
288fn execute_on_main_thread(window: &web_sys::Window, item: MainThreadItem) {
289    match item {
290        MainThreadItem::Runnable(runnable) => {
291            if !runnable.metadata().is_closed() {
292                runnable.run();
293            }
294        }
295        MainThreadItem::Delayed { runnable, millis } => {
296            let callback = Closure::once_into_js(move || {
297                if !runnable.metadata().is_closed() {
298                    runnable.run();
299                }
300            });
301            window
302                .set_timeout_with_callback_and_timeout_and_arguments_0(
303                    callback.unchecked_ref(),
304                    millis,
305                )
306                .ok();
307        }
308        MainThreadItem::RealtimeFunction(function) => {
309            function();
310        }
311    }
312}
313
314fn schedule_runnable(window: &web_sys::Window, runnable: RunnableVariant, priority: Priority) {
315    let callback = Closure::once_into_js(move || {
316        if !runnable.metadata().is_closed() {
317            runnable.run();
318        }
319    });
320    let callback: &js_sys::Function = callback.unchecked_ref();
321
322    match priority {
323        Priority::RealtimeAudio => {
324            window.queue_microtask(callback);
325        }
326        _ => {
327            // TODO-Wasm: this ought to enqueue so we can dequeue with proper priority
328            window
329                .set_timeout_with_callback_and_timeout_and_arguments_0(callback, 0)
330                .ok();
331        }
332    }
333}