dispatcher.rs

  1use gpui::{
  2    PlatformDispatcher, Priority, PriorityQueueReceiver, PriorityQueueSender, RunnableVariant,
  3    ThreadTaskTimings,
  4};
  5use std::sync::Arc;
  6use std::sync::atomic::AtomicI32;
  7use std::time::Duration;
  8use wasm_bindgen::prelude::*;
  9use web_time::Instant;
 10
/// Lower bound applied to `navigator.hardwareConcurrency` when choosing how many background
/// worker threads to spawn.
#[cfg(feature = "multithreaded")]
const MIN_BACKGROUND_THREADS: usize = 2;
 13
 14#[cfg(feature = "multithreaded")]
 15fn shared_memory_supported() -> bool {
 16    let global = js_sys::global();
 17    let has_shared_array_buffer =
 18        js_sys::Reflect::has(&global, &JsValue::from_str("SharedArrayBuffer")).unwrap_or(false);
 19    let has_atomics = js_sys::Reflect::has(&global, &JsValue::from_str("Atomics")).unwrap_or(false);
 20    let memory = js_sys::WebAssembly::Memory::from(wasm_bindgen::memory());
 21    let buffer = memory.buffer();
 22    let is_shared_buffer = buffer.is_instance_of::<js_sys::SharedArrayBuffer>();
 23    has_shared_array_buffer && has_atomics && is_shared_buffer
 24}
 25
/// A unit of work handed to the main thread through [`MainThreadMailbox`].
enum MainThreadItem {
    /// A runnable that is run as soon as the item is taken out of the mailbox.
    Runnable(RunnableVariant),
    /// A runnable that is handed to `setTimeout` once the item is taken out of the mailbox.
    Delayed {
        /// The work to run when the timer fires.
        runnable: RunnableVariant,
        /// The `setTimeout` delay, in milliseconds.
        millis: i32,
    },
    // TODO-Wasm: Shouldn't these run on their own dedicated thread?
    /// A closure that is called directly when the item is taken out of the mailbox.
    RealtimeFunction(Box<dyn FnOnce() + Send>),
}
 35
/// Priority queue of [`MainThreadItem`]s plus a wake-up flag, used to hand work to the main
/// thread.
struct MainThreadMailbox {
    /// Producer half of the queue; `post` sends through it.
    sender: PriorityQueueSender<MainThreadItem>,
    /// Consumer half of the queue; `drain` locks it and pops items.
    receiver: parking_lot::Mutex<PriorityQueueReceiver<MainThreadItem>>,
    /// Wake-up flag accessed through JS `Atomics` via an `Int32Array` view over wasm memory:
    /// `post` stores `1` and notifies, the waker loop stores `0` and waits on it.
    signal: AtomicI32,
}
 41
 42impl MainThreadMailbox {
 43    fn new() -> Self {
 44        let (sender, receiver) = PriorityQueueReceiver::new();
 45        Self {
 46            sender,
 47            receiver: parking_lot::Mutex::new(receiver),
 48            signal: AtomicI32::new(0),
 49        }
 50    }
 51
 52    fn post(&self, priority: Priority, item: MainThreadItem) {
 53        if self.sender.spin_send(priority, item).is_err() {
 54            log::error!("MainThreadMailbox::send failed: receiver disconnected");
 55        }
 56
 57        // TODO-Wasm: Verify this lock-free protocol
 58        let view = self.signal_view();
 59        js_sys::Atomics::store(&view, 0, 1).ok();
 60        js_sys::Atomics::notify(&view, 0).ok();
 61    }
 62
 63    fn drain(&self, window: &web_sys::Window) {
 64        let mut receiver = self.receiver.lock();
 65        loop {
 66            // We need these `spin` variants because we can't acquire a lock on the main thread.
 67            // TODO-WASM: Should we do something different?
 68            match receiver.spin_try_pop() {
 69                Ok(Some(item)) => execute_on_main_thread(window, item),
 70                Ok(None) => break,
 71                Err(_) => break,
 72            }
 73        }
 74    }
 75
 76    fn signal_view(&self) -> js_sys::Int32Array {
 77        let byte_offset = self.signal.as_ptr() as u32;
 78        let memory = js_sys::WebAssembly::Memory::from(wasm_bindgen::memory());
 79        js_sys::Int32Array::new_with_byte_offset_and_length(&memory.buffer(), byte_offset, 1)
 80    }
 81
 82    fn run_waker_loop(self: &Arc<Self>, window: web_sys::Window) {
 83        if !shared_memory_supported() {
 84            log::warn!("SharedArrayBuffer not available; main thread mailbox waker loop disabled");
 85            return;
 86        }
 87
 88        let mailbox = Arc::clone(self);
 89        wasm_bindgen_futures::spawn_local(async move {
 90            let view = mailbox.signal_view();
 91            loop {
 92                js_sys::Atomics::store(&view, 0, 0).expect("Atomics.store failed");
 93
 94                let result = match js_sys::Atomics::wait_async(&view, 0, 0) {
 95                    Ok(result) => result,
 96                    Err(error) => {
 97                        log::error!("Atomics.waitAsync failed: {error:?}");
 98                        break;
 99                    }
100                };
101
102                let is_async = js_sys::Reflect::get(&result, &JsValue::from_str("async"))
103                    .ok()
104                    .and_then(|v| v.as_bool())
105                    .unwrap_or(false);
106
107                if !is_async {
108                    log::error!("Atomics.waitAsync returned synchronously; waker loop exiting");
109                    break;
110                }
111
112                let promise: js_sys::Promise =
113                    js_sys::Reflect::get(&result, &JsValue::from_str("value"))
114                        .expect("waitAsync result missing 'value'")
115                        .unchecked_into();
116
117                let _ = wasm_bindgen_futures::JsFuture::from(promise).await;
118
119                mailbox.drain(&window);
120            }
121        });
122    }
123}
124
/// [`PlatformDispatcher`] implementation for the browser.
///
/// Work is either scheduled on the main thread through browser APIs (`setTimeout`,
/// `queueMicrotask`) or, when threads are supported, sent to background worker threads.
pub struct WebDispatcher {
    /// Id of the thread that constructed the dispatcher; compared against the current thread to
    /// decide whether a call is on the main thread.
    main_thread_id: std::thread::ThreadId,
    /// Browser window used for `setTimeout` / `queueMicrotask` scheduling.
    browser_window: web_sys::Window,
    /// Producer half of the background work queue.
    background_sender: PriorityQueueSender<RunnableVariant>,
    /// Mailbox used by other threads to hand work to the main thread.
    main_thread_mailbox: Arc<MainThreadMailbox>,
    /// Whether background threads are in use; when `false`, `dispatch` runs everything on the
    /// main thread.
    supports_threads: bool,
    /// Join handles of the spawned background workers; stored but not read in this file.
    #[cfg(feature = "multithreaded")]
    _background_threads: Vec<wasm_thread::JoinHandle<()>>,
}
134
// SAFETY: `web_sys::Window` is only accessed from the main thread: the dispatch methods visible
// here touch `browser_window` only after `on_main_thread()` has returned true, or during
// construction. All other fields are `Send + Sync` by construction.
// NOTE(review): this relies on `WebDispatcher::new` being called on the main thread — confirm at
// the call sites.
unsafe impl Send for WebDispatcher {}
unsafe impl Sync for WebDispatcher {}
139
140impl WebDispatcher {
141    pub fn new(browser_window: web_sys::Window, allow_threads: bool) -> Self {
142        #[cfg(feature = "multithreaded")]
143        let (background_sender, background_receiver) = PriorityQueueReceiver::new();
144        #[cfg(not(feature = "multithreaded"))]
145        let (background_sender, _) = PriorityQueueReceiver::new();
146
147        let main_thread_mailbox = Arc::new(MainThreadMailbox::new());
148
149        #[cfg(feature = "multithreaded")]
150        let supports_threads = allow_threads && shared_memory_supported();
151        #[cfg(not(feature = "multithreaded"))]
152        let supports_threads = false;
153
154        if supports_threads {
155            main_thread_mailbox.run_waker_loop(browser_window.clone());
156        } else {
157            log::warn!(
158                "SharedArrayBuffer not available; falling back to single-threaded dispatcher"
159            );
160        }
161
162        #[cfg(feature = "multithreaded")]
163        let background_threads = if supports_threads {
164            let thread_count = browser_window
165                .navigator()
166                .hardware_concurrency()
167                .max(MIN_BACKGROUND_THREADS as f64) as usize;
168
169            // TODO-Wasm: Is it bad to have web workers blocking for a long time like this?
170            (0..thread_count)
171                .map(|i| {
172                    let mut receiver = background_receiver.clone();
173                    wasm_thread::Builder::new()
174                        .name(format!("background-worker-{i}"))
175                        .spawn(move || {
176                            loop {
177                                let runnable: RunnableVariant = match receiver.pop() {
178                                    Ok(runnable) => runnable,
179                                    Err(_) => {
180                                        log::info!(
181                                            "background-worker-{i}: channel disconnected, exiting"
182                                        );
183                                        break;
184                                    }
185                                };
186
187                                runnable.run();
188                            }
189                        })
190                        .expect("failed to spawn background worker thread")
191                })
192                .collect::<Vec<_>>()
193        } else {
194            Vec::new()
195        };
196
197        Self {
198            main_thread_id: std::thread::current().id(),
199            browser_window,
200            background_sender,
201            main_thread_mailbox,
202            supports_threads,
203            #[cfg(feature = "multithreaded")]
204            _background_threads: background_threads,
205        }
206    }
207
208    fn on_main_thread(&self) -> bool {
209        std::thread::current().id() == self.main_thread_id
210    }
211}
212
213impl PlatformDispatcher for WebDispatcher {
214    fn get_all_timings(&self) -> Vec<ThreadTaskTimings> {
215        // TODO-Wasm: should we panic here?
216        Vec::new()
217    }
218
219    fn get_current_thread_timings(&self) -> ThreadTaskTimings {
220        ThreadTaskTimings {
221            thread_name: None,
222            thread_id: std::thread::current().id(),
223            timings: Vec::new(),
224            total_pushed: 0,
225        }
226    }
227
228    fn is_main_thread(&self) -> bool {
229        self.on_main_thread()
230    }
231
232    fn dispatch(&self, runnable: RunnableVariant, priority: Priority) {
233        if !self.supports_threads {
234            self.dispatch_on_main_thread(runnable, priority);
235            return;
236        }
237
238        let result = if self.on_main_thread() {
239            self.background_sender.spin_send(priority, runnable)
240        } else {
241            self.background_sender.send(priority, runnable)
242        };
243
244        if let Err(error) = result {
245            log::error!("dispatch: failed to send to background queue: {error:?}");
246        }
247    }
248
249    fn dispatch_on_main_thread(&self, runnable: RunnableVariant, priority: Priority) {
250        if self.on_main_thread() {
251            schedule_runnable(&self.browser_window, runnable, priority);
252        } else {
253            self.main_thread_mailbox
254                .post(priority, MainThreadItem::Runnable(runnable));
255        }
256    }
257
258    fn dispatch_after(&self, duration: Duration, runnable: RunnableVariant) {
259        let millis = duration.as_millis().min(i32::MAX as u128) as i32;
260        if self.on_main_thread() {
261            let callback = Closure::once_into_js(move || {
262                runnable.run();
263            });
264            self.browser_window
265                .set_timeout_with_callback_and_timeout_and_arguments_0(
266                    callback.unchecked_ref(),
267                    millis,
268                )
269                .ok();
270        } else {
271            self.main_thread_mailbox
272                .post(Priority::High, MainThreadItem::Delayed { runnable, millis });
273        }
274    }
275
276    fn spawn_realtime(&self, function: Box<dyn FnOnce() + Send>) {
277        if self.on_main_thread() {
278            let callback = Closure::once_into_js(move || {
279                function();
280            });
281            self.browser_window
282                .queue_microtask(callback.unchecked_ref());
283        } else {
284            self.main_thread_mailbox
285                .post(Priority::High, MainThreadItem::RealtimeFunction(function));
286        }
287    }
288
289    fn now(&self) -> Instant {
290        Instant::now()
291    }
292}
293
294fn execute_on_main_thread(window: &web_sys::Window, item: MainThreadItem) {
295    match item {
296        MainThreadItem::Runnable(runnable) => {
297            runnable.run();
298        }
299        MainThreadItem::Delayed { runnable, millis } => {
300            let callback = Closure::once_into_js(move || {
301                runnable.run();
302            });
303            window
304                .set_timeout_with_callback_and_timeout_and_arguments_0(
305                    callback.unchecked_ref(),
306                    millis,
307                )
308                .ok();
309        }
310        MainThreadItem::RealtimeFunction(function) => {
311            function();
312        }
313    }
314}
315
316fn schedule_runnable(window: &web_sys::Window, runnable: RunnableVariant, priority: Priority) {
317    let callback = Closure::once_into_js(move || {
318        runnable.run();
319    });
320    let callback: &js_sys::Function = callback.unchecked_ref();
321
322    match priority {
323        Priority::RealtimeAudio => {
324            window.queue_microtask(callback);
325        }
326        _ => {
327            // TODO-Wasm: this ought to enqueue so we can dequeue with proper priority
328            window
329                .set_timeout_with_callback_and_timeout_and_arguments_0(callback, 0)
330                .ok();
331        }
332    }
333}