dispatcher.rs

  1use gpui::{
  2    PlatformDispatcher, Priority, PriorityQueueReceiver, PriorityQueueSender, RunnableVariant,
  3    ThreadTaskTimings,
  4};
  5use std::sync::Arc;
  6use std::sync::atomic::AtomicI32;
  7use std::time::Duration;
  8use wasm_bindgen::prelude::*;
  9use web_time::Instant;
 10
/// Lower bound on the number of background worker threads.
///
/// `WebDispatcher::new` spawns `navigator.hardwareConcurrency` workers, but
/// never fewer than this many.
#[cfg(feature = "multithreaded")]
const MIN_BACKGROUND_THREADS: usize = 2;
 13
 14#[cfg(feature = "multithreaded")]
 15fn shared_memory_supported() -> bool {
 16    let global = js_sys::global();
 17    let has_shared_array_buffer =
 18        js_sys::Reflect::has(&global, &JsValue::from_str("SharedArrayBuffer")).unwrap_or(false);
 19    let has_atomics = js_sys::Reflect::has(&global, &JsValue::from_str("Atomics")).unwrap_or(false);
 20    let memory = js_sys::WebAssembly::Memory::from(wasm_bindgen::memory());
 21    let buffer = memory.buffer();
 22    let is_shared_buffer = buffer.is_instance_of::<js_sys::SharedArrayBuffer>();
 23    has_shared_array_buffer && has_atomics && is_shared_buffer
 24}
 25
/// A unit of work posted to the main thread through `MainThreadMailbox`.
///
/// Items are consumed by `execute_on_main_thread` when the mailbox is drained.
enum MainThreadItem {
    /// A runnable that is run as soon as it is drained, unless it is already
    /// closed.
    Runnable(RunnableVariant),
    /// A runnable that is handed to `setTimeout` with a delay of `millis`
    /// milliseconds once it is drained; it is skipped if closed by the time
    /// the timer fires.
    Delayed {
        runnable: RunnableVariant,
        millis: i32,
    },
    /// A boxed function that is called directly when it is drained.
    // TODO-Wasm: Shouldn't these run on their own dedicated thread?
    RealtimeFunction(Box<dyn FnOnce() + Send>),
}
 35
/// Priority queue of `MainThreadItem`s plus a wake-up flag, used to hand work
/// to the main thread.
struct MainThreadMailbox {
    /// Producer side of the queue; `post` pushes items through it.
    sender: PriorityQueueSender<MainThreadItem>,
    /// Consumer side of the queue; `drain` locks it and pops items.
    receiver: parking_lot::Mutex<PriorityQueueReceiver<MainThreadItem>>,
    /// 32-bit flag living in wasm memory. It is accessed from JS through an
    /// `Int32Array` view (see `signal_view`) with `Atomics.store`,
    /// `Atomics.notify` and `Atomics.waitAsync`: `post` sets it to 1, the
    /// waker loop resets it to 0.
    signal: AtomicI32,
}
 41
 42impl MainThreadMailbox {
 43    fn new() -> Self {
 44        let (sender, receiver) = PriorityQueueReceiver::new();
 45        Self {
 46            sender,
 47            receiver: parking_lot::Mutex::new(receiver),
 48            signal: AtomicI32::new(0),
 49        }
 50    }
 51
 52    fn post(&self, priority: Priority, item: MainThreadItem) {
 53        if self.sender.spin_send(priority, item).is_err() {
 54            log::error!("MainThreadMailbox::send failed: receiver disconnected");
 55        }
 56
 57        // TODO-Wasm: Verify this lock-free protocol
 58        let view = self.signal_view();
 59        js_sys::Atomics::store(&view, 0, 1).ok();
 60        js_sys::Atomics::notify(&view, 0).ok();
 61    }
 62
 63    fn drain(&self, window: &web_sys::Window) {
 64        let mut receiver = self.receiver.lock();
 65        loop {
 66            // We need these `spin` variants because we can't acquire a lock on the main thread.
 67            // TODO-WASM: Should we do something different?
 68            match receiver.spin_try_pop() {
 69                Ok(Some(item)) => execute_on_main_thread(window, item),
 70                Ok(None) => break,
 71                Err(_) => break,
 72            }
 73        }
 74    }
 75
 76    fn signal_view(&self) -> js_sys::Int32Array {
 77        let byte_offset = self.signal.as_ptr() as u32;
 78        let memory = js_sys::WebAssembly::Memory::from(wasm_bindgen::memory());
 79        js_sys::Int32Array::new_with_byte_offset_and_length(&memory.buffer(), byte_offset, 1)
 80    }
 81
 82    fn run_waker_loop(self: &Arc<Self>, window: web_sys::Window) {
 83        if !shared_memory_supported() {
 84            log::warn!("SharedArrayBuffer not available; main thread mailbox waker loop disabled");
 85            return;
 86        }
 87
 88        let mailbox = Arc::clone(self);
 89        wasm_bindgen_futures::spawn_local(async move {
 90            let view = mailbox.signal_view();
 91            loop {
 92                js_sys::Atomics::store(&view, 0, 0).expect("Atomics.store failed");
 93
 94                let result = match js_sys::Atomics::wait_async(&view, 0, 0) {
 95                    Ok(result) => result,
 96                    Err(error) => {
 97                        log::error!("Atomics.waitAsync failed: {error:?}");
 98                        break;
 99                    }
100                };
101
102                let is_async = js_sys::Reflect::get(&result, &JsValue::from_str("async"))
103                    .ok()
104                    .and_then(|v| v.as_bool())
105                    .unwrap_or(false);
106
107                if !is_async {
108                    log::error!("Atomics.waitAsync returned synchronously; waker loop exiting");
109                    break;
110                }
111
112                let promise: js_sys::Promise =
113                    js_sys::Reflect::get(&result, &JsValue::from_str("value"))
114                        .expect("waitAsync result missing 'value'")
115                        .unchecked_into();
116
117                let _ = wasm_bindgen_futures::JsFuture::from(promise).await;
118
119                mailbox.drain(&window);
120            }
121        });
122    }
123}
124
/// `PlatformDispatcher` implementation for the browser.
///
/// Background work goes to a pool of worker threads when threads are
/// supported; otherwise everything is scheduled on the main thread.
pub struct WebDispatcher {
    /// Id of the thread that constructed the dispatcher; `on_main_thread`
    /// compares the current thread against it.
    main_thread_id: std::thread::ThreadId,
    /// Browser window used for `setTimeout` / `queueMicrotask` scheduling.
    browser_window: web_sys::Window,
    /// Producer side of the background task queue consumed by the workers.
    background_sender: PriorityQueueSender<RunnableVariant>,
    /// Mailbox used by non-main threads to hand work to the main thread.
    main_thread_mailbox: Arc<MainThreadMailbox>,
    /// Whether background worker threads are in use. When `false`,
    /// `dispatch` routes everything to the main thread.
    supports_threads: bool,
    /// Join handles of the spawned background workers, held for as long as
    /// the dispatcher exists (never joined in this file).
    #[cfg(feature = "multithreaded")]
    _background_threads: Vec<wasm_thread::JoinHandle<()>>,
}
134
// SAFETY: `web_sys::Window` is only accessed from the main thread: outside the
// constructor, every use of `browser_window` is guarded by `on_main_thread()`.
// All other fields are `Send + Sync` by construction.
unsafe impl Send for WebDispatcher {}
unsafe impl Sync for WebDispatcher {}
139
140impl WebDispatcher {
141    pub fn new(browser_window: web_sys::Window, allow_threads: bool) -> Self {
142        #[cfg(feature = "multithreaded")]
143        let (background_sender, background_receiver) = PriorityQueueReceiver::new();
144        #[cfg(not(feature = "multithreaded"))]
145        let (background_sender, _) = PriorityQueueReceiver::new();
146
147        let main_thread_mailbox = Arc::new(MainThreadMailbox::new());
148
149        #[cfg(feature = "multithreaded")]
150        let supports_threads = allow_threads && shared_memory_supported();
151        #[cfg(not(feature = "multithreaded"))]
152        let supports_threads = false;
153
154        if supports_threads {
155            main_thread_mailbox.run_waker_loop(browser_window.clone());
156        } else {
157            log::warn!(
158                "SharedArrayBuffer not available; falling back to single-threaded dispatcher"
159            );
160        }
161
162        #[cfg(feature = "multithreaded")]
163        let background_threads = if supports_threads {
164            let thread_count = browser_window
165                .navigator()
166                .hardware_concurrency()
167                .max(MIN_BACKGROUND_THREADS as f64) as usize;
168
169            // TODO-Wasm: Is it bad to have web workers blocking for a long time like this?
170            (0..thread_count)
171                .map(|i| {
172                    let mut receiver = background_receiver.clone();
173                    wasm_thread::Builder::new()
174                        .name(format!("background-worker-{i}"))
175                        .spawn(move || {
176                            loop {
177                                let runnable: RunnableVariant = match receiver.pop() {
178                                    Ok(runnable) => runnable,
179                                    Err(_) => {
180                                        log::info!(
181                                            "background-worker-{i}: channel disconnected, exiting"
182                                        );
183                                        break;
184                                    }
185                                };
186
187                                if runnable.metadata().is_closed() {
188                                    continue;
189                                }
190
191                                runnable.run();
192                            }
193                        })
194                        .expect("failed to spawn background worker thread")
195                })
196                .collect::<Vec<_>>()
197        } else {
198            Vec::new()
199        };
200
201        Self {
202            main_thread_id: std::thread::current().id(),
203            browser_window,
204            background_sender,
205            main_thread_mailbox,
206            supports_threads,
207            #[cfg(feature = "multithreaded")]
208            _background_threads: background_threads,
209        }
210    }
211
212    fn on_main_thread(&self) -> bool {
213        std::thread::current().id() == self.main_thread_id
214    }
215}
216
217impl PlatformDispatcher for WebDispatcher {
218    fn get_all_timings(&self) -> Vec<ThreadTaskTimings> {
219        // TODO-Wasm: should we panic here?
220        Vec::new()
221    }
222
223    fn get_current_thread_timings(&self) -> ThreadTaskTimings {
224        ThreadTaskTimings {
225            thread_name: None,
226            thread_id: std::thread::current().id(),
227            timings: Vec::new(),
228            total_pushed: 0,
229        }
230    }
231
232    fn is_main_thread(&self) -> bool {
233        self.on_main_thread()
234    }
235
236    fn dispatch(&self, runnable: RunnableVariant, priority: Priority) {
237        if !self.supports_threads {
238            self.dispatch_on_main_thread(runnable, priority);
239            return;
240        }
241
242        let result = if self.on_main_thread() {
243            self.background_sender.spin_send(priority, runnable)
244        } else {
245            self.background_sender.send(priority, runnable)
246        };
247
248        if let Err(error) = result {
249            log::error!("dispatch: failed to send to background queue: {error:?}");
250        }
251    }
252
253    fn dispatch_on_main_thread(&self, runnable: RunnableVariant, priority: Priority) {
254        if self.on_main_thread() {
255            schedule_runnable(&self.browser_window, runnable, priority);
256        } else {
257            self.main_thread_mailbox
258                .post(priority, MainThreadItem::Runnable(runnable));
259        }
260    }
261
262    fn dispatch_after(&self, duration: Duration, runnable: RunnableVariant) {
263        let millis = duration.as_millis().min(i32::MAX as u128) as i32;
264        if self.on_main_thread() {
265            let callback = Closure::once_into_js(move || {
266                if !runnable.metadata().is_closed() {
267                    runnable.run();
268                }
269            });
270            self.browser_window
271                .set_timeout_with_callback_and_timeout_and_arguments_0(
272                    callback.unchecked_ref(),
273                    millis,
274                )
275                .ok();
276        } else {
277            self.main_thread_mailbox
278                .post(Priority::High, MainThreadItem::Delayed { runnable, millis });
279        }
280    }
281
282    fn spawn_realtime(&self, function: Box<dyn FnOnce() + Send>) {
283        if self.on_main_thread() {
284            let callback = Closure::once_into_js(move || {
285                function();
286            });
287            self.browser_window
288                .queue_microtask(callback.unchecked_ref());
289        } else {
290            self.main_thread_mailbox
291                .post(Priority::High, MainThreadItem::RealtimeFunction(function));
292        }
293    }
294
295    fn now(&self) -> Instant {
296        Instant::now()
297    }
298}
299
300fn execute_on_main_thread(window: &web_sys::Window, item: MainThreadItem) {
301    match item {
302        MainThreadItem::Runnable(runnable) => {
303            if !runnable.metadata().is_closed() {
304                runnable.run();
305            }
306        }
307        MainThreadItem::Delayed { runnable, millis } => {
308            let callback = Closure::once_into_js(move || {
309                if !runnable.metadata().is_closed() {
310                    runnable.run();
311                }
312            });
313            window
314                .set_timeout_with_callback_and_timeout_and_arguments_0(
315                    callback.unchecked_ref(),
316                    millis,
317                )
318                .ok();
319        }
320        MainThreadItem::RealtimeFunction(function) => {
321            function();
322        }
323    }
324}
325
326fn schedule_runnable(window: &web_sys::Window, runnable: RunnableVariant, priority: Priority) {
327    let callback = Closure::once_into_js(move || {
328        if !runnable.metadata().is_closed() {
329            runnable.run();
330        }
331    });
332    let callback: &js_sys::Function = callback.unchecked_ref();
333
334    match priority {
335        Priority::RealtimeAudio => {
336            window.queue_microtask(callback);
337        }
338        _ => {
339            // TODO-Wasm: this ought to enqueue so we can dequeue with proper priority
340            window
341                .set_timeout_with_callback_and_timeout_and_arguments_0(callback, 0)
342                .ok();
343        }
344    }
345}