1use gpui::{
2 PlatformDispatcher, Priority, PriorityQueueReceiver, PriorityQueueSender, RunnableVariant,
3 ThreadTaskTimings,
4};
5use std::sync::Arc;
6use std::sync::atomic::AtomicI32;
7use std::time::Duration;
8use wasm_bindgen::prelude::*;
9use web_time::Instant;
10
/// Lower bound on the number of background worker threads.
///
/// `WebDispatcher::new` spawns the larger of this value and the browser's
/// reported `navigator.hardwareConcurrency` when shared memory is available.
const MIN_BACKGROUND_THREADS: usize = 2;
12
13fn shared_memory_supported() -> bool {
14 let global = js_sys::global();
15 let has_shared_array_buffer =
16 js_sys::Reflect::has(&global, &JsValue::from_str("SharedArrayBuffer")).unwrap_or(false);
17 let has_atomics = js_sys::Reflect::has(&global, &JsValue::from_str("Atomics")).unwrap_or(false);
18 let memory = js_sys::WebAssembly::Memory::from(wasm_bindgen::memory());
19 let buffer = memory.buffer();
20 let is_shared_buffer = buffer.is_instance_of::<js_sys::SharedArrayBuffer>();
21 has_shared_array_buffer && has_atomics && is_shared_buffer
22}
23
/// A unit of work handed to the main thread through `MainThreadMailbox`.
///
/// Items are consumed by `execute_on_main_thread` when the mailbox is drained.
enum MainThreadItem {
    /// A runnable that is run directly when drained (skipped if already closed).
    Runnable(RunnableVariant),
    /// A runnable that is scheduled with `setTimeout` when drained.
    Delayed {
        /// The work to run once the timeout fires (skipped if closed by then).
        runnable: RunnableVariant,
        /// Timeout handed to `setTimeout`, in milliseconds.
        millis: i32,
    },
    // TODO-Wasm: Shouldn't these run on their own dedicated thread?
    /// A boxed closure that is invoked directly when drained.
    RealtimeFunction(Box<dyn FnOnce() + Send>),
}
33
/// Queue used to hand `MainThreadItem`s to the main thread, plus the flag used
/// to wake the main thread when something has been posted.
struct MainThreadMailbox {
    /// Producer side of the priority queue; used by `post`.
    sender: PriorityQueueSender<MainThreadItem>,
    /// Consumer side of the priority queue; locked by `drain` while it pops items.
    receiver: parking_lot::Mutex<PriorityQueueReceiver<MainThreadItem>>,
    /// Wake-up flag. It is read and written through a JS `Int32Array` view over
    /// the wasm memory (see `signal_view`): `post` stores `1`, the waker loop
    /// stores `0` and waits on it with `Atomics.waitAsync`.
    signal: AtomicI32,
}
39
40impl MainThreadMailbox {
41 fn new() -> Self {
42 let (sender, receiver) = PriorityQueueReceiver::new();
43 Self {
44 sender,
45 receiver: parking_lot::Mutex::new(receiver),
46 signal: AtomicI32::new(0),
47 }
48 }
49
50 fn post(&self, priority: Priority, item: MainThreadItem) {
51 if self.sender.spin_send(priority, item).is_err() {
52 log::error!("MainThreadMailbox::send failed: receiver disconnected");
53 }
54
55 // TODO-Wasm: Verify this lock-free protocol
56 let view = self.signal_view();
57 js_sys::Atomics::store(&view, 0, 1).ok();
58 js_sys::Atomics::notify(&view, 0).ok();
59 }
60
61 fn drain(&self, window: &web_sys::Window) {
62 let mut receiver = self.receiver.lock();
63 loop {
64 // We need these `spin` variants because we can't acquire a lock on the main thread.
65 // TODO-WASM: Should we do something different?
66 match receiver.spin_try_pop() {
67 Ok(Some(item)) => execute_on_main_thread(window, item),
68 Ok(None) => break,
69 Err(_) => break,
70 }
71 }
72 }
73
74 fn signal_view(&self) -> js_sys::Int32Array {
75 let byte_offset = self.signal.as_ptr() as u32;
76 let memory = js_sys::WebAssembly::Memory::from(wasm_bindgen::memory());
77 js_sys::Int32Array::new_with_byte_offset_and_length(&memory.buffer(), byte_offset, 1)
78 }
79
80 fn run_waker_loop(self: &Arc<Self>, window: web_sys::Window) {
81 if !shared_memory_supported() {
82 log::warn!("SharedArrayBuffer not available; main thread mailbox waker loop disabled");
83 return;
84 }
85
86 let mailbox = Arc::clone(self);
87 wasm_bindgen_futures::spawn_local(async move {
88 let view = mailbox.signal_view();
89 loop {
90 js_sys::Atomics::store(&view, 0, 0).expect("Atomics.store failed");
91
92 let result = match js_sys::Atomics::wait_async(&view, 0, 0) {
93 Ok(result) => result,
94 Err(error) => {
95 log::error!("Atomics.waitAsync failed: {error:?}");
96 break;
97 }
98 };
99
100 let is_async = js_sys::Reflect::get(&result, &JsValue::from_str("async"))
101 .ok()
102 .and_then(|v| v.as_bool())
103 .unwrap_or(false);
104
105 if !is_async {
106 log::error!("Atomics.waitAsync returned synchronously; waker loop exiting");
107 break;
108 }
109
110 let promise: js_sys::Promise =
111 js_sys::Reflect::get(&result, &JsValue::from_str("value"))
112 .expect("waitAsync result missing 'value'")
113 .unchecked_into();
114
115 let _ = wasm_bindgen_futures::JsFuture::from(promise).await;
116
117 mailbox.drain(&window);
118 }
119 });
120 }
121}
122
/// `PlatformDispatcher` implementation for the browser.
///
/// When shared memory is available, background work is sent to a pool of
/// worker threads and main-thread work posted from other threads goes through
/// a `MainThreadMailbox`. Otherwise everything is scheduled on the main thread.
pub struct WebDispatcher {
    /// Id of the thread that constructed the dispatcher; treated as the main thread.
    main_thread_id: std::thread::ThreadId,
    /// Browser window used for `setTimeout` / `queueMicrotask` scheduling.
    browser_window: web_sys::Window,
    /// Producer side of the queue consumed by the background workers.
    background_sender: PriorityQueueSender<RunnableVariant>,
    /// Mailbox used to hand work to the main thread from other threads.
    main_thread_mailbox: Arc<MainThreadMailbox>,
    /// Result of `shared_memory_supported()` at construction time.
    supports_threads: bool,
    /// Handles of the spawned background workers; empty when threads are unsupported.
    _background_threads: Vec<wasm_thread::JoinHandle<()>>,
}
131
// SAFETY: `web_sys::Window` is only accessed from the main thread: every use of
// `browser_window` in the dispatcher methods is behind an `on_main_thread()` check,
// and `new` runs on the thread that becomes the main thread.
// All other fields are `Send + Sync` by construction.
// NOTE(review): nothing here guards against the dispatcher (and thus the window
// handle) being dropped on another thread — confirm that cannot happen.
unsafe impl Send for WebDispatcher {}
unsafe impl Sync for WebDispatcher {}
136
137impl WebDispatcher {
138 pub fn new(browser_window: web_sys::Window) -> Self {
139 let (background_sender, background_receiver) = PriorityQueueReceiver::new();
140
141 let main_thread_mailbox = Arc::new(MainThreadMailbox::new());
142 let supports_threads = shared_memory_supported();
143
144 if supports_threads {
145 main_thread_mailbox.run_waker_loop(browser_window.clone());
146 } else {
147 log::warn!(
148 "SharedArrayBuffer not available; falling back to single-threaded dispatcher"
149 );
150 }
151
152 let background_threads = if supports_threads {
153 let thread_count = browser_window
154 .navigator()
155 .hardware_concurrency()
156 .max(MIN_BACKGROUND_THREADS as f64) as usize;
157
158 // TODO-Wasm: Is it bad to have web workers blocking for a long time like this?
159 (0..thread_count)
160 .map(|i| {
161 let mut receiver = background_receiver.clone();
162 wasm_thread::Builder::new()
163 .name(format!("background-worker-{i}"))
164 .spawn(move || {
165 loop {
166 let runnable: RunnableVariant = match receiver.pop() {
167 Ok(runnable) => runnable,
168 Err(_) => {
169 log::info!(
170 "background-worker-{i}: channel disconnected, exiting"
171 );
172 break;
173 }
174 };
175
176 if runnable.metadata().is_closed() {
177 continue;
178 }
179
180 runnable.run();
181 }
182 })
183 .expect("failed to spawn background worker thread")
184 })
185 .collect::<Vec<_>>()
186 } else {
187 Vec::new()
188 };
189
190 Self {
191 main_thread_id: std::thread::current().id(),
192 browser_window,
193 background_sender,
194 main_thread_mailbox,
195 supports_threads,
196 _background_threads: background_threads,
197 }
198 }
199
200 fn on_main_thread(&self) -> bool {
201 std::thread::current().id() == self.main_thread_id
202 }
203}
204
205impl PlatformDispatcher for WebDispatcher {
206 fn get_all_timings(&self) -> Vec<ThreadTaskTimings> {
207 // TODO-Wasm: should we panic here?
208 Vec::new()
209 }
210
211 fn get_current_thread_timings(&self) -> ThreadTaskTimings {
212 ThreadTaskTimings {
213 thread_name: None,
214 thread_id: std::thread::current().id(),
215 timings: Vec::new(),
216 total_pushed: 0,
217 }
218 }
219
220 fn is_main_thread(&self) -> bool {
221 self.on_main_thread()
222 }
223
224 fn dispatch(&self, runnable: RunnableVariant, priority: Priority) {
225 if !self.supports_threads {
226 self.dispatch_on_main_thread(runnable, priority);
227 return;
228 }
229
230 let result = if self.on_main_thread() {
231 self.background_sender.spin_send(priority, runnable)
232 } else {
233 self.background_sender.send(priority, runnable)
234 };
235
236 if let Err(error) = result {
237 log::error!("dispatch: failed to send to background queue: {error:?}");
238 }
239 }
240
241 fn dispatch_on_main_thread(&self, runnable: RunnableVariant, priority: Priority) {
242 if self.on_main_thread() {
243 schedule_runnable(&self.browser_window, runnable, priority);
244 } else {
245 self.main_thread_mailbox
246 .post(priority, MainThreadItem::Runnable(runnable));
247 }
248 }
249
250 fn dispatch_after(&self, duration: Duration, runnable: RunnableVariant) {
251 let millis = duration.as_millis().min(i32::MAX as u128) as i32;
252 if self.on_main_thread() {
253 let callback = Closure::once_into_js(move || {
254 if !runnable.metadata().is_closed() {
255 runnable.run();
256 }
257 });
258 self.browser_window
259 .set_timeout_with_callback_and_timeout_and_arguments_0(
260 callback.unchecked_ref(),
261 millis,
262 )
263 .ok();
264 } else {
265 self.main_thread_mailbox
266 .post(Priority::High, MainThreadItem::Delayed { runnable, millis });
267 }
268 }
269
270 fn spawn_realtime(&self, function: Box<dyn FnOnce() + Send>) {
271 if self.on_main_thread() {
272 let callback = Closure::once_into_js(move || {
273 function();
274 });
275 self.browser_window
276 .queue_microtask(callback.unchecked_ref());
277 } else {
278 self.main_thread_mailbox
279 .post(Priority::High, MainThreadItem::RealtimeFunction(function));
280 }
281 }
282
283 fn now(&self) -> Instant {
284 Instant::now()
285 }
286}
287
288fn execute_on_main_thread(window: &web_sys::Window, item: MainThreadItem) {
289 match item {
290 MainThreadItem::Runnable(runnable) => {
291 if !runnable.metadata().is_closed() {
292 runnable.run();
293 }
294 }
295 MainThreadItem::Delayed { runnable, millis } => {
296 let callback = Closure::once_into_js(move || {
297 if !runnable.metadata().is_closed() {
298 runnable.run();
299 }
300 });
301 window
302 .set_timeout_with_callback_and_timeout_and_arguments_0(
303 callback.unchecked_ref(),
304 millis,
305 )
306 .ok();
307 }
308 MainThreadItem::RealtimeFunction(function) => {
309 function();
310 }
311 }
312}
313
314fn schedule_runnable(window: &web_sys::Window, runnable: RunnableVariant, priority: Priority) {
315 let callback = Closure::once_into_js(move || {
316 if !runnable.metadata().is_closed() {
317 runnable.run();
318 }
319 });
320 let callback: &js_sys::Function = callback.unchecked_ref();
321
322 match priority {
323 Priority::RealtimeAudio => {
324 window.queue_microtask(callback);
325 }
326 _ => {
327 // TODO-Wasm: this ought to enqueue so we can dequeue with proper priority
328 window
329 .set_timeout_with_callback_and_timeout_and_arguments_0(callback, 0)
330 .ok();
331 }
332 }
333}