1use anyhow::{anyhow, Result};
2use async_task::Runnable;
3pub use async_task::Task;
4use backtrace::{Backtrace, BacktraceFmt, BytesOrWideString};
5use parking_lot::Mutex;
6use rand::prelude::*;
7use smol::{channel, prelude::*, Executor};
8use std::{
9 fmt::{self, Debug},
10 marker::PhantomData,
11 mem,
12 ops::RangeInclusive,
13 pin::Pin,
14 rc::Rc,
15 sync::{
16 atomic::{AtomicBool, Ordering::SeqCst},
17 Arc,
18 },
19 task::{Context, Poll},
20 thread,
21 time::Duration,
22};
23use waker_fn::waker_fn;
24
25use crate::{platform, util};
26
27pub enum Foreground {
28 Platform {
29 dispatcher: Arc<dyn platform::Dispatcher>,
30 _not_send_or_sync: PhantomData<Rc<()>>,
31 },
32 Test(smol::LocalExecutor<'static>),
33 Deterministic(Arc<Deterministic>),
34}
35
36pub enum Background {
37 Deterministic(Arc<Deterministic>),
38 Production {
39 executor: Arc<smol::Executor<'static>>,
40 _stop: channel::Sender<()>,
41 },
42}
43
44struct DeterministicState {
45 rng: StdRng,
46 seed: u64,
47 scheduled_from_foreground: Vec<(Runnable, Backtrace)>,
48 scheduled_from_background: Vec<(Runnable, Backtrace)>,
49 spawned_from_foreground: Vec<(Runnable, Backtrace)>,
50 forbid_parking: bool,
51 block_on_ticks: RangeInclusive<usize>,
52}
53
54pub struct Deterministic {
55 state: Arc<Mutex<DeterministicState>>,
56 parker: Mutex<parking::Parker>,
57}
58
59impl Deterministic {
60 fn new(seed: u64) -> Self {
61 Self {
62 state: Arc::new(Mutex::new(DeterministicState {
63 rng: StdRng::seed_from_u64(seed),
64 seed,
65 scheduled_from_foreground: Default::default(),
66 scheduled_from_background: Default::default(),
67 spawned_from_foreground: Default::default(),
68 forbid_parking: false,
69 block_on_ticks: 0..=1000,
70 })),
71 parker: Default::default(),
72 }
73 }
74
75 pub fn spawn_from_foreground<F, T>(&self, future: F) -> Task<T>
76 where
77 T: 'static,
78 F: Future<Output = T> + 'static,
79 {
80 let backtrace = Backtrace::new_unresolved();
81 let scheduled_once = AtomicBool::new(false);
82 let state = self.state.clone();
83 let unparker = self.parker.lock().unparker();
84 let (runnable, task) = async_task::spawn_local(future, move |runnable| {
85 let mut state = state.lock();
86 let backtrace = backtrace.clone();
87 if scheduled_once.fetch_or(true, SeqCst) {
88 state.scheduled_from_foreground.push((runnable, backtrace));
89 } else {
90 state.spawned_from_foreground.push((runnable, backtrace));
91 }
92 unparker.unpark();
93 });
94 runnable.schedule();
95 task
96 }
97
98 pub fn spawn<F, T>(&self, future: F) -> Task<T>
99 where
100 T: 'static + Send,
101 F: 'static + Send + Future<Output = T>,
102 {
103 let backtrace = Backtrace::new_unresolved();
104 let state = self.state.clone();
105 let unparker = self.parker.lock().unparker();
106 let (runnable, task) = async_task::spawn(future, move |runnable| {
107 let mut state = state.lock();
108 state
109 .scheduled_from_background
110 .push((runnable, backtrace.clone()));
111 unparker.unpark();
112 });
113 runnable.schedule();
114 task
115 }
116
117 pub fn run<F, T>(&self, future: F) -> T
118 where
119 T: 'static,
120 F: Future<Output = T> + 'static,
121 {
122 smol::pin!(future);
123
124 let unparker = self.parker.lock().unparker();
125 let woken = Arc::new(AtomicBool::new(false));
126 let waker = {
127 let woken = woken.clone();
128 waker_fn(move || {
129 woken.store(true, SeqCst);
130 unparker.unpark();
131 })
132 };
133
134 let mut cx = Context::from_waker(&waker);
135 let mut trace = Trace::default();
136 loop {
137 let mut state = self.state.lock();
138 let runnable_count = state.scheduled_from_foreground.len()
139 + state.scheduled_from_background.len()
140 + state.spawned_from_foreground.len();
141
142 let ix = state.rng.gen_range(0..=runnable_count);
143 if ix < state.scheduled_from_foreground.len() {
144 let (_, backtrace) = &state.scheduled_from_foreground[ix];
145 trace.record(&state, backtrace.clone());
146 let runnable = state.scheduled_from_foreground.remove(ix).0;
147 drop(state);
148 runnable.run();
149 } else if ix - state.scheduled_from_foreground.len()
150 < state.scheduled_from_background.len()
151 {
152 let ix = ix - state.scheduled_from_foreground.len();
153 let (_, backtrace) = &state.scheduled_from_background[ix];
154 trace.record(&state, backtrace.clone());
155 let runnable = state.scheduled_from_background.remove(ix).0;
156 drop(state);
157 runnable.run();
158 } else if ix < runnable_count {
159 let (_, backtrace) = &state.spawned_from_foreground[0];
160 trace.record(&state, backtrace.clone());
161 let runnable = state.spawned_from_foreground.remove(0).0;
162 drop(state);
163 runnable.run();
164 } else {
165 drop(state);
166 if let Poll::Ready(result) = future.as_mut().poll(&mut cx) {
167 return result;
168 }
169 let state = self.state.lock();
170 if state.scheduled_from_foreground.is_empty()
171 && state.scheduled_from_background.is_empty()
172 && state.spawned_from_foreground.is_empty()
173 {
174 if state.forbid_parking && !woken.load(SeqCst) {
175 panic!("deterministic executor parked after a call to forbid_parking");
176 }
177 drop(state);
178 woken.store(false, SeqCst);
179 self.parker.lock().park();
180 }
181
182 continue;
183 }
184 }
185 }
186
187 pub fn block_on<F, T>(&self, future: F) -> Option<T>
188 where
189 T: 'static,
190 F: Future<Output = T>,
191 {
192 smol::pin!(future);
193
194 let unparker = self.parker.lock().unparker();
195 let waker = waker_fn(move || {
196 unparker.unpark();
197 });
198 let max_ticks = {
199 let mut state = self.state.lock();
200 let range = state.block_on_ticks.clone();
201 state.rng.gen_range(range)
202 };
203
204 let mut cx = Context::from_waker(&waker);
205 let mut trace = Trace::default();
206 for _ in 0..max_ticks {
207 let mut state = self.state.lock();
208 let runnable_count = state.scheduled_from_background.len();
209 let ix = state.rng.gen_range(0..=runnable_count);
210 if ix < state.scheduled_from_background.len() {
211 let (_, backtrace) = &state.scheduled_from_background[ix];
212 trace.record(&state, backtrace.clone());
213 let runnable = state.scheduled_from_background.remove(ix).0;
214 drop(state);
215 runnable.run();
216 } else {
217 drop(state);
218 if let Poll::Ready(result) = future.as_mut().poll(&mut cx) {
219 return Some(result);
220 }
221 let state = self.state.lock();
222 if state.scheduled_from_background.is_empty() {
223 if state.forbid_parking {
224 panic!("deterministic executor parked after a call to forbid_parking");
225 }
226 drop(state);
227 self.parker.lock().park();
228 }
229
230 continue;
231 }
232 }
233
234 None
235 }
236}
237
238#[derive(Default)]
239struct Trace {
240 executed: Vec<Backtrace>,
241 scheduled: Vec<Vec<Backtrace>>,
242 spawned_from_foreground: Vec<Vec<Backtrace>>,
243}
244
245impl Trace {
246 fn record(&mut self, state: &DeterministicState, executed: Backtrace) {
247 self.scheduled.push(
248 state
249 .scheduled_from_foreground
250 .iter()
251 .map(|(_, backtrace)| backtrace.clone())
252 .collect(),
253 );
254 self.spawned_from_foreground.push(
255 state
256 .spawned_from_foreground
257 .iter()
258 .map(|(_, backtrace)| backtrace.clone())
259 .collect(),
260 );
261 self.executed.push(executed);
262 }
263
264 fn resolve(&mut self) {
265 for backtrace in &mut self.executed {
266 backtrace.resolve();
267 }
268
269 for backtraces in &mut self.scheduled {
270 for backtrace in backtraces {
271 backtrace.resolve();
272 }
273 }
274
275 for backtraces in &mut self.spawned_from_foreground {
276 for backtrace in backtraces {
277 backtrace.resolve();
278 }
279 }
280 }
281}
282
283impl Debug for Trace {
284 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
285 struct FirstCwdFrameInBacktrace<'a>(&'a Backtrace);
286
287 impl<'a> Debug for FirstCwdFrameInBacktrace<'a> {
288 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> std::fmt::Result {
289 let cwd = std::env::current_dir().unwrap();
290 let mut print_path = |fmt: &mut fmt::Formatter<'_>, path: BytesOrWideString<'_>| {
291 fmt::Display::fmt(&path, fmt)
292 };
293 let mut fmt = BacktraceFmt::new(f, backtrace::PrintFmt::Full, &mut print_path);
294 for frame in self.0.frames() {
295 let mut formatted_frame = fmt.frame();
296 if frame
297 .symbols()
298 .iter()
299 .any(|s| s.filename().map_or(false, |f| f.starts_with(&cwd)))
300 {
301 formatted_frame.backtrace_frame(frame)?;
302 break;
303 }
304 }
305 fmt.finish()
306 }
307 }
308
309 for ((backtrace, scheduled), spawned_from_foreground) in self
310 .executed
311 .iter()
312 .zip(&self.scheduled)
313 .zip(&self.spawned_from_foreground)
314 {
315 writeln!(f, "Scheduled")?;
316 for backtrace in scheduled {
317 writeln!(f, "- {:?}", FirstCwdFrameInBacktrace(backtrace))?;
318 }
319 if scheduled.is_empty() {
320 writeln!(f, "None")?;
321 }
322 writeln!(f, "==========")?;
323
324 writeln!(f, "Spawned from foreground")?;
325 for backtrace in spawned_from_foreground {
326 writeln!(f, "- {:?}", FirstCwdFrameInBacktrace(backtrace))?;
327 }
328 if spawned_from_foreground.is_empty() {
329 writeln!(f, "None")?;
330 }
331 writeln!(f, "==========")?;
332
333 writeln!(f, "Run: {:?}", FirstCwdFrameInBacktrace(backtrace))?;
334 writeln!(f, "+++++++++++++++++++")?;
335 }
336
337 Ok(())
338 }
339}
340
341impl Drop for Trace {
342 fn drop(&mut self) {
343 let trace_on_panic = if let Ok(trace_on_panic) = std::env::var("EXECUTOR_TRACE_ON_PANIC") {
344 trace_on_panic == "1" || trace_on_panic == "true"
345 } else {
346 false
347 };
348 let trace_always = if let Ok(trace_always) = std::env::var("EXECUTOR_TRACE_ALWAYS") {
349 trace_always == "1" || trace_always == "true"
350 } else {
351 false
352 };
353
354 if trace_always || (trace_on_panic && thread::panicking()) {
355 self.resolve();
356 dbg!(self);
357 }
358 }
359}
360
361impl Foreground {
362 pub fn platform(dispatcher: Arc<dyn platform::Dispatcher>) -> Result<Self> {
363 if dispatcher.is_main_thread() {
364 Ok(Self::Platform {
365 dispatcher,
366 _not_send_or_sync: PhantomData,
367 })
368 } else {
369 Err(anyhow!("must be constructed on main thread"))
370 }
371 }
372
373 pub fn test() -> Self {
374 Self::Test(smol::LocalExecutor::new())
375 }
376
377 pub fn spawn<T: 'static>(&self, future: impl Future<Output = T> + 'static) -> Task<T> {
378 match self {
379 Self::Platform { dispatcher, .. } => {
380 let dispatcher = dispatcher.clone();
381 let schedule = move |runnable: Runnable| dispatcher.run_on_main_thread(runnable);
382 let (runnable, task) = async_task::spawn_local(future, schedule);
383 runnable.schedule();
384 task
385 }
386 Self::Test(executor) => executor.spawn(future),
387 Self::Deterministic(executor) => executor.spawn_from_foreground(future),
388 }
389 }
390
391 pub fn run<T: 'static>(&self, future: impl 'static + Future<Output = T>) -> T {
392 match self {
393 Self::Platform { .. } => panic!("you can't call run on a platform foreground executor"),
394 Self::Test(executor) => smol::block_on(executor.run(future)),
395 Self::Deterministic(executor) => executor.run(future),
396 }
397 }
398
399 pub fn forbid_parking(&self) {
400 match self {
401 Self::Deterministic(executor) => {
402 let mut state = executor.state.lock();
403 state.forbid_parking = true;
404 state.rng = StdRng::seed_from_u64(state.seed);
405 }
406 _ => panic!("this method can only be called on a deterministic executor"),
407 }
408 }
409
410 pub fn set_block_on_ticks(&self, range: RangeInclusive<usize>) {
411 match self {
412 Self::Deterministic(executor) => executor.state.lock().block_on_ticks = range,
413 _ => panic!("this method can only be called on a deterministic executor"),
414 }
415 }
416}
417
418impl Background {
419 pub fn new() -> Self {
420 let executor = Arc::new(Executor::new());
421 let stop = channel::unbounded::<()>();
422
423 for i in 0..2 * num_cpus::get() {
424 let executor = executor.clone();
425 let stop = stop.1.clone();
426 thread::Builder::new()
427 .name(format!("background-executor-{}", i))
428 .spawn(move || smol::block_on(executor.run(stop.recv())))
429 .unwrap();
430 }
431
432 Self::Production {
433 executor,
434 _stop: stop.0,
435 }
436 }
437
438 pub fn num_cpus(&self) -> usize {
439 num_cpus::get()
440 }
441
442 pub fn spawn<T, F>(&self, future: F) -> Task<T>
443 where
444 T: 'static + Send,
445 F: Send + Future<Output = T> + 'static,
446 {
447 match self {
448 Self::Production { executor, .. } => executor.spawn(future),
449 Self::Deterministic(executor) => executor.spawn(future),
450 }
451 }
452
453 pub fn block_with_timeout<F, T>(&self, timeout: Duration, mut future: F) -> Result<T, F>
454 where
455 T: 'static,
456 F: 'static + Unpin + Future<Output = T>,
457 {
458 if !timeout.is_zero() {
459 let output = match self {
460 Self::Production { .. } => {
461 smol::block_on(util::timeout(timeout, Pin::new(&mut future))).ok()
462 }
463 Self::Deterministic(executor) => executor.block_on(Pin::new(&mut future)),
464 };
465 if let Some(output) = output {
466 return Ok(output);
467 }
468 }
469 Err(future)
470 }
471
472 pub async fn scoped<'scope, F>(&self, scheduler: F)
473 where
474 F: FnOnce(&mut Scope<'scope>),
475 {
476 let mut scope = Scope {
477 futures: Default::default(),
478 _phantom: PhantomData,
479 };
480 (scheduler)(&mut scope);
481 let spawned = scope
482 .futures
483 .into_iter()
484 .map(|f| self.spawn(f))
485 .collect::<Vec<_>>();
486 for task in spawned {
487 task.await;
488 }
489 }
490}
491
/// Collects futures registered by the closure passed to `Background::scoped`.
///
/// `'a` is the lifetime the registered futures are allowed to borrow for; they are stored
/// with that lifetime erased to `'static`.
pub struct Scope<'a> {
    /// Futures queued by [`Scope::spawn`], in registration order.
    futures: Vec<Pin<Box<dyn Future<Output = ()> + Send + 'static>>>,
    /// Ties the scope to `'a` without storing any borrowed data.
    _phantom: PhantomData<&'a ()>,
}
496
497impl<'a> Scope<'a> {
498 pub fn spawn<F>(&mut self, f: F)
499 where
500 F: Future<Output = ()> + Send + 'a,
501 {
502 let f = unsafe {
503 mem::transmute::<
504 Pin<Box<dyn Future<Output = ()> + Send + 'a>>,
505 Pin<Box<dyn Future<Output = ()> + Send + 'static>>,
506 >(Box::pin(f))
507 };
508 self.futures.push(f);
509 }
510}
511
512pub fn deterministic(seed: u64) -> (Rc<Foreground>, Arc<Background>) {
513 let executor = Arc::new(Deterministic::new(seed));
514 (
515 Rc::new(Foreground::Deterministic(executor.clone())),
516 Arc::new(Background::Deterministic(executor)),
517 )
518}