1use anyhow::{anyhow, Result};
2use async_task::Runnable;
3pub use async_task::Task;
4use backtrace::{Backtrace, BacktraceFmt, BytesOrWideString};
5use parking_lot::Mutex;
6use postage::{barrier, prelude::Stream as _};
7use rand::prelude::*;
8use smol::{channel, prelude::*, Executor, Timer};
9use std::{
10 fmt::{self, Debug},
11 marker::PhantomData,
12 mem,
13 ops::RangeInclusive,
14 pin::Pin,
15 rc::Rc,
16 sync::{
17 atomic::{AtomicBool, Ordering::SeqCst},
18 Arc,
19 },
20 task::{Context, Poll},
21 thread,
22 time::{Duration, Instant},
23};
24use waker_fn::waker_fn;
25
26use crate::{platform, util};
27
/// Executor for futures that are not required to be `Send`.
///
/// Which variant is in use decides how spawned tasks get polled; see
/// `Foreground::spawn` and `Foreground::run`.
pub enum Foreground {
    /// Hands every scheduled runnable to a platform dispatcher through
    /// `run_on_main_thread`.
    Platform {
        dispatcher: Arc<dyn platform::Dispatcher>,
        // `Rc` is neither `Send` nor `Sync`, so this zero-sized marker removes
        // both auto traits from the variant (and therefore from the enum).
        _not_send_or_sync: PhantomData<Rc<()>>,
    },
    /// Backed by a `smol::LocalExecutor`.
    Test(smol::LocalExecutor<'static>),
    /// Backed by the seeded `Deterministic` scheduler.
    Deterministic(Arc<Deterministic>),
}
36
/// Executor for `Send` futures.
pub enum Background {
    /// Backed by the seeded `Deterministic` scheduler.
    Deterministic(Arc<Deterministic>),
    /// A `smol::Executor` driven by the worker threads started in
    /// `Background::new`.
    Production {
        executor: Arc<smol::Executor<'static>>,
        // Never sent on; it is only kept alive. Each worker thread runs the
        // executor until `recv()` on the paired receiver resolves.
        // NOTE(review): presumably that happens when this sender is dropped —
        // confirm against the smol channel documentation.
        _stop: channel::Sender<()>,
    },
}
44
/// Mutable state of the `Deterministic` scheduler, guarded by one mutex.
///
/// Every queued runnable is stored together with the unresolved backtrace
/// captured when its task was spawned.
struct DeterministicState {
    // Source of every scheduling decision made by `run` and `block_on`.
    rng: StdRng,
    // Seed `rng` was created from; `Foreground::forbid_parking` re-seeds from it.
    seed: u64,
    // Foreground tasks that were scheduled again after their first scheduling.
    scheduled_from_foreground: Vec<(Runnable, Backtrace)>,
    // Tasks spawned through `Deterministic::spawn`, on every scheduling.
    scheduled_from_background: Vec<(Runnable, Backtrace)>,
    // Foreground tasks on their first scheduling; `run` takes these from the front.
    spawned_from_foreground: Vec<(Runnable, Backtrace)>,
    // When set, `run` and `block_on` panic instead of parking the thread.
    forbid_parking: bool,
    // Range from which `block_on` draws its maximum number of loop iterations.
    block_on_ticks: RangeInclusive<usize>,
    // Virtual clock; only `Foreground::advance_clock` moves it forward.
    now: Instant,
    // Wake-up instant and barrier sender for each `Foreground::sleep` in flight.
    pending_sleeps: Vec<(Instant, barrier::Sender)>,
}
56
/// Seeded scheduler shared by the deterministic `Foreground` and `Background`
/// executors.
pub struct Deterministic {
    // Shared with the schedule callbacks created in `spawn` and
    // `spawn_from_foreground`, which push runnables into its queues.
    state: Arc<Mutex<DeterministicState>>,
    // `run` and `block_on` park on this when nothing is runnable; schedule
    // callbacks and the root future's waker unpark it.
    parker: Mutex<parking::Parker>,
}
61
62impl Deterministic {
63 fn new(seed: u64) -> Self {
64 Self {
65 state: Arc::new(Mutex::new(DeterministicState {
66 rng: StdRng::seed_from_u64(seed),
67 seed,
68 scheduled_from_foreground: Default::default(),
69 scheduled_from_background: Default::default(),
70 spawned_from_foreground: Default::default(),
71 forbid_parking: false,
72 block_on_ticks: 0..=1000,
73 now: Instant::now(),
74 pending_sleeps: Default::default(),
75 })),
76 parker: Default::default(),
77 }
78 }
79
80 pub fn spawn_from_foreground<F, T>(&self, future: F) -> Task<T>
81 where
82 T: 'static,
83 F: Future<Output = T> + 'static,
84 {
85 let backtrace = Backtrace::new_unresolved();
86 let scheduled_once = AtomicBool::new(false);
87 let state = self.state.clone();
88 let unparker = self.parker.lock().unparker();
89 let (runnable, task) = async_task::spawn_local(future, move |runnable| {
90 let mut state = state.lock();
91 let backtrace = backtrace.clone();
92 if scheduled_once.fetch_or(true, SeqCst) {
93 state.scheduled_from_foreground.push((runnable, backtrace));
94 } else {
95 state.spawned_from_foreground.push((runnable, backtrace));
96 }
97 unparker.unpark();
98 });
99 runnable.schedule();
100 task
101 }
102
103 pub fn spawn<F, T>(&self, future: F) -> Task<T>
104 where
105 T: 'static + Send,
106 F: 'static + Send + Future<Output = T>,
107 {
108 let backtrace = Backtrace::new_unresolved();
109 let state = self.state.clone();
110 let unparker = self.parker.lock().unparker();
111 let (runnable, task) = async_task::spawn(future, move |runnable| {
112 let mut state = state.lock();
113 state
114 .scheduled_from_background
115 .push((runnable, backtrace.clone()));
116 unparker.unpark();
117 });
118 runnable.schedule();
119 task
120 }
121
    /// Drives `future` to completion on the calling thread, interleaving it
    /// with queued runnables in an order chosen by the seeded RNG.
    ///
    /// Each loop iteration draws an index in `0..=runnable_count` and maps it to
    /// one action:
    /// - an entry of `scheduled_from_foreground`,
    /// - an entry of `scheduled_from_background`,
    /// - the front entry of `spawned_from_foreground`,
    /// - or, for the one index past all queues, a poll of `future` itself.
    ///
    /// Returns the output of `future` once it resolves.
    ///
    /// # Panics
    ///
    /// Panics if parking has been forbidden and the thread would park without
    /// the root future's waker having fired.
    pub fn run<F, T>(&self, future: F) -> T
    where
        T: 'static,
        F: Future<Output = T> + 'static,
    {
        smol::pin!(future);

        let unparker = self.parker.lock().unparker();
        // Set by the root future's waker; checked before parking below.
        let woken = Arc::new(AtomicBool::new(false));
        let waker = {
            let woken = woken.clone();
            waker_fn(move || {
                woken.store(true, SeqCst);
                unparker.unpark();
            })
        };

        let mut cx = Context::from_waker(&waker);
        // Records every executed runnable; printed from `Trace`'s `Drop` impl.
        let mut trace = Trace::default();
        loop {
            let mut state = self.state.lock();
            let runnable_count = state.scheduled_from_foreground.len()
                + state.scheduled_from_background.len()
                + state.spawned_from_foreground.len();

            // Inclusive upper bound: `ix == runnable_count` selects the root future.
            let ix = state.rng.gen_range(0..=runnable_count);
            if ix < state.scheduled_from_foreground.len() {
                let (_, backtrace) = &state.scheduled_from_foreground[ix];
                trace.record(&state, backtrace.clone());
                let runnable = state.scheduled_from_foreground.remove(ix).0;
                // Release the lock first: running the task may schedule
                // runnables, and the schedule callbacks lock `state`.
                drop(state);
                runnable.run();
            } else if ix - state.scheduled_from_foreground.len()
                < state.scheduled_from_background.len()
            {
                // Re-base the index onto the background queue.
                let ix = ix - state.scheduled_from_foreground.len();
                let (_, backtrace) = &state.scheduled_from_background[ix];
                trace.record(&state, backtrace.clone());
                let runnable = state.scheduled_from_background.remove(ix).0;
                drop(state);
                runnable.run();
            } else if ix < runnable_count {
                // Freshly spawned foreground tasks are always taken from the
                // front, whatever the drawn index was.
                let (_, backtrace) = &state.spawned_from_foreground[0];
                trace.record(&state, backtrace.clone());
                let runnable = state.spawned_from_foreground.remove(0).0;
                drop(state);
                runnable.run();
            } else {
                drop(state);
                if let Poll::Ready(result) = future.as_mut().poll(&mut cx) {
                    return result;
                }
                // Park only when the poll left every queue empty.
                let state = self.state.lock();
                if state.scheduled_from_foreground.is_empty()
                    && state.scheduled_from_background.is_empty()
                    && state.spawned_from_foreground.is_empty()
                {
                    if state.forbid_parking && !woken.load(SeqCst) {
                        panic!("deterministic executor parked after a call to forbid_parking");
                    }
                    drop(state);
                    woken.store(false, SeqCst);
                    self.parker.lock().park();
                }

                continue;
            }
        }
    }
191
    /// Polls `future` on the calling thread for a bounded, randomly chosen
    /// number of iterations, interleaved with background runnables only.
    ///
    /// The iteration budget is drawn from `block_on_ticks`. Each iteration
    /// either runs one entry of `scheduled_from_background` or polls `future`;
    /// the foreground queues are never touched.
    ///
    /// Returns `Some(output)` if `future` resolves within the budget, otherwise
    /// `None`.
    ///
    /// # Panics
    ///
    /// Panics if parking has been forbidden and the background queue is empty
    /// after a poll that returned `Pending`.
    pub fn block_on<F, T>(&self, future: F) -> Option<T>
    where
        T: 'static,
        F: Future<Output = T>,
    {
        smol::pin!(future);

        let unparker = self.parker.lock().unparker();
        let waker = waker_fn(move || {
            unparker.unpark();
        });
        // The lock is held only for this block, to draw the tick budget.
        let max_ticks = {
            let mut state = self.state.lock();
            let range = state.block_on_ticks.clone();
            state.rng.gen_range(range)
        };

        let mut cx = Context::from_waker(&waker);
        // Records every executed runnable; printed from `Trace`'s `Drop` impl.
        let mut trace = Trace::default();
        for _ in 0..max_ticks {
            let mut state = self.state.lock();
            let runnable_count = state.scheduled_from_background.len();
            // Inclusive upper bound: `ix == runnable_count` selects `future`.
            let ix = state.rng.gen_range(0..=runnable_count);
            if ix < state.scheduled_from_background.len() {
                let (_, backtrace) = &state.scheduled_from_background[ix];
                trace.record(&state, backtrace.clone());
                let runnable = state.scheduled_from_background.remove(ix).0;
                // Release the lock first: running the task may schedule
                // runnables, and the schedule callbacks lock `state`.
                drop(state);
                runnable.run();
            } else {
                drop(state);
                if let Poll::Ready(result) = future.as_mut().poll(&mut cx) {
                    return Some(result);
                }
                // Park only when no background work is queued.
                let state = self.state.lock();
                if state.scheduled_from_background.is_empty() {
                    if state.forbid_parking {
                        panic!("deterministic executor parked after a call to forbid_parking");
                    }
                    drop(state);
                    self.parker.lock().park();
                }

                continue;
            }
        }

        // Budget exhausted before `future` resolved.
        None
    }
241}
242
/// Log of the runnables executed during one `run` or `block_on` call.
///
/// The three vectors are index-aligned: `record` pushes exactly one entry onto
/// each of them per executed runnable.
#[derive(Default)]
struct Trace {
    // Spawn-site backtrace of the runnable executed at each step.
    executed: Vec<Backtrace>,
    // Backtraces queued in `scheduled_from_foreground` at each step.
    scheduled: Vec<Vec<Backtrace>>,
    // Backtraces queued in `spawned_from_foreground` at each step.
    spawned_from_foreground: Vec<Vec<Backtrace>>,
}
249
250impl Trace {
251 fn record(&mut self, state: &DeterministicState, executed: Backtrace) {
252 self.scheduled.push(
253 state
254 .scheduled_from_foreground
255 .iter()
256 .map(|(_, backtrace)| backtrace.clone())
257 .collect(),
258 );
259 self.spawned_from_foreground.push(
260 state
261 .spawned_from_foreground
262 .iter()
263 .map(|(_, backtrace)| backtrace.clone())
264 .collect(),
265 );
266 self.executed.push(executed);
267 }
268
269 fn resolve(&mut self) {
270 for backtrace in &mut self.executed {
271 backtrace.resolve();
272 }
273
274 for backtraces in &mut self.scheduled {
275 for backtrace in backtraces {
276 backtrace.resolve();
277 }
278 }
279
280 for backtraces in &mut self.spawned_from_foreground {
281 for backtrace in backtraces {
282 backtrace.resolve();
283 }
284 }
285 }
286}
287
288impl Debug for Trace {
289 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
290 struct FirstCwdFrameInBacktrace<'a>(&'a Backtrace);
291
292 impl<'a> Debug for FirstCwdFrameInBacktrace<'a> {
293 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> std::fmt::Result {
294 let cwd = std::env::current_dir().unwrap();
295 let mut print_path = |fmt: &mut fmt::Formatter<'_>, path: BytesOrWideString<'_>| {
296 fmt::Display::fmt(&path, fmt)
297 };
298 let mut fmt = BacktraceFmt::new(f, backtrace::PrintFmt::Full, &mut print_path);
299 for frame in self.0.frames() {
300 let mut formatted_frame = fmt.frame();
301 if frame
302 .symbols()
303 .iter()
304 .any(|s| s.filename().map_or(false, |f| f.starts_with(&cwd)))
305 {
306 formatted_frame.backtrace_frame(frame)?;
307 break;
308 }
309 }
310 fmt.finish()
311 }
312 }
313
314 for ((backtrace, scheduled), spawned_from_foreground) in self
315 .executed
316 .iter()
317 .zip(&self.scheduled)
318 .zip(&self.spawned_from_foreground)
319 {
320 writeln!(f, "Scheduled")?;
321 for backtrace in scheduled {
322 writeln!(f, "- {:?}", FirstCwdFrameInBacktrace(backtrace))?;
323 }
324 if scheduled.is_empty() {
325 writeln!(f, "None")?;
326 }
327 writeln!(f, "==========")?;
328
329 writeln!(f, "Spawned from foreground")?;
330 for backtrace in spawned_from_foreground {
331 writeln!(f, "- {:?}", FirstCwdFrameInBacktrace(backtrace))?;
332 }
333 if spawned_from_foreground.is_empty() {
334 writeln!(f, "None")?;
335 }
336 writeln!(f, "==========")?;
337
338 writeln!(f, "Run: {:?}", FirstCwdFrameInBacktrace(backtrace))?;
339 writeln!(f, "+++++++++++++++++++")?;
340 }
341
342 Ok(())
343 }
344}
345
346impl Drop for Trace {
347 fn drop(&mut self) {
348 let trace_on_panic = if let Ok(trace_on_panic) = std::env::var("EXECUTOR_TRACE_ON_PANIC") {
349 trace_on_panic == "1" || trace_on_panic == "true"
350 } else {
351 false
352 };
353 let trace_always = if let Ok(trace_always) = std::env::var("EXECUTOR_TRACE_ALWAYS") {
354 trace_always == "1" || trace_always == "true"
355 } else {
356 false
357 };
358
359 if trace_always || (trace_on_panic && thread::panicking()) {
360 self.resolve();
361 dbg!(self);
362 }
363 }
364}
365
366impl Foreground {
367 pub fn platform(dispatcher: Arc<dyn platform::Dispatcher>) -> Result<Self> {
368 if dispatcher.is_main_thread() {
369 Ok(Self::Platform {
370 dispatcher,
371 _not_send_or_sync: PhantomData,
372 })
373 } else {
374 Err(anyhow!("must be constructed on main thread"))
375 }
376 }
377
378 pub fn test() -> Self {
379 Self::Test(smol::LocalExecutor::new())
380 }
381
382 pub fn spawn<T: 'static>(&self, future: impl Future<Output = T> + 'static) -> Task<T> {
383 match self {
384 Self::Platform { dispatcher, .. } => {
385 let dispatcher = dispatcher.clone();
386 let schedule = move |runnable: Runnable| dispatcher.run_on_main_thread(runnable);
387 let (runnable, task) = async_task::spawn_local(future, schedule);
388 runnable.schedule();
389 task
390 }
391 Self::Test(executor) => executor.spawn(future),
392 Self::Deterministic(executor) => executor.spawn_from_foreground(future),
393 }
394 }
395
396 pub fn run<T: 'static>(&self, future: impl 'static + Future<Output = T>) -> T {
397 match self {
398 Self::Platform { .. } => panic!("you can't call run on a platform foreground executor"),
399 Self::Test(executor) => smol::block_on(executor.run(future)),
400 Self::Deterministic(executor) => executor.run(future),
401 }
402 }
403
404 pub fn forbid_parking(&self) {
405 match self {
406 Self::Deterministic(executor) => {
407 let mut state = executor.state.lock();
408 state.forbid_parking = true;
409 state.rng = StdRng::seed_from_u64(state.seed);
410 }
411 _ => panic!("this method can only be called on a deterministic executor"),
412 }
413 }
414
415 pub async fn sleep(&self, duration: Duration) {
416 match self {
417 Self::Deterministic(executor) => {
418 let (tx, mut rx) = barrier::channel();
419 {
420 let mut state = executor.state.lock();
421 let wakeup_at = state.now + duration;
422 state.pending_sleeps.push((wakeup_at, tx));
423 }
424 rx.recv().await;
425 }
426 _ => {
427 Timer::after(duration).await;
428 }
429 }
430 }
431
432 pub fn advance_clock(&self, duration: Duration) {
433 match self {
434 Self::Deterministic(executor) => {
435 let mut state = executor.state.lock();
436 state.now += duration;
437 let now = state.now;
438 state.pending_sleeps.retain(|(wakeup, _)| *wakeup > now);
439 }
440 _ => panic!("this method can only be called on a deterministic executor"),
441 }
442 }
443
444 pub fn set_block_on_ticks(&self, range: RangeInclusive<usize>) {
445 match self {
446 Self::Deterministic(executor) => executor.state.lock().block_on_ticks = range,
447 _ => panic!("this method can only be called on a deterministic executor"),
448 }
449 }
450}
451
452impl Background {
453 pub fn new() -> Self {
454 let executor = Arc::new(Executor::new());
455 let stop = channel::unbounded::<()>();
456
457 for i in 0..2 * num_cpus::get() {
458 let executor = executor.clone();
459 let stop = stop.1.clone();
460 thread::Builder::new()
461 .name(format!("background-executor-{}", i))
462 .spawn(move || smol::block_on(executor.run(stop.recv())))
463 .unwrap();
464 }
465
466 Self::Production {
467 executor,
468 _stop: stop.0,
469 }
470 }
471
    /// Returns the CPU count reported by `num_cpus::get()`.
    ///
    /// The value does not depend on which variant `self` is.
    pub fn num_cpus(&self) -> usize {
        num_cpus::get()
    }
475
476 pub fn spawn<T, F>(&self, future: F) -> Task<T>
477 where
478 T: 'static + Send,
479 F: Send + Future<Output = T> + 'static,
480 {
481 match self {
482 Self::Production { executor, .. } => executor.spawn(future),
483 Self::Deterministic(executor) => executor.spawn(future),
484 }
485 }
486
487 pub fn block_with_timeout<F, T>(&self, timeout: Duration, mut future: F) -> Result<T, F>
488 where
489 T: 'static,
490 F: 'static + Unpin + Future<Output = T>,
491 {
492 if !timeout.is_zero() {
493 let output = match self {
494 Self::Production { .. } => {
495 smol::block_on(util::timeout(timeout, Pin::new(&mut future))).ok()
496 }
497 Self::Deterministic(executor) => executor.block_on(Pin::new(&mut future)),
498 };
499 if let Some(output) = output {
500 return Ok(output);
501 }
502 }
503 Err(future)
504 }
505
506 pub async fn scoped<'scope, F>(&self, scheduler: F)
507 where
508 F: FnOnce(&mut Scope<'scope>),
509 {
510 let mut scope = Scope {
511 futures: Default::default(),
512 _phantom: PhantomData,
513 };
514 (scheduler)(&mut scope);
515 let spawned = scope
516 .futures
517 .into_iter()
518 .map(|f| self.spawn(f))
519 .collect::<Vec<_>>();
520 for task in spawned {
521 task.await;
522 }
523 }
524}
525
/// Collects futures that may borrow data living for `'a`, so that
/// `Background::scoped` can spawn them together.
pub struct Scope<'a> {
    // Stored with their lifetime erased to `'static`; see `spawn`.
    futures: Vec<Pin<Box<dyn Future<Output = ()> + Send + 'static>>>,
    // Ties the scope to `'a` without storing a reference.
    _phantom: PhantomData<&'a ()>,
}

impl<'a> Scope<'a> {
    /// Boxes and pins `f` and appends it to the scope's list of futures.
    ///
    /// Nothing is polled here; the future is only stored.
    pub fn spawn<F>(&mut self, f: F)
    where
        F: Future<Output = ()> + Send + 'a,
    {
        type Scoped<'s> = Pin<Box<dyn Future<Output = ()> + Send + 's>>;

        let scoped: Scoped<'a> = Box::pin(f);
        // SAFETY: only the trait object's lifetime bound changes; the layout is
        // identical. The in-file consumer, `Background::scoped`, awaits every
        // spawned task before it returns.
        // NOTE(review): this relies on that future not being dropped while a
        // task is still running — confirm against the callers.
        let erased = unsafe { mem::transmute::<Scoped<'a>, Scoped<'static>>(scoped) };
        self.futures.push(erased);
    }
}
545
546pub fn deterministic(seed: u64) -> (Rc<Foreground>, Arc<Background>) {
547 let executor = Arc::new(Deterministic::new(seed));
548 (
549 Rc::new(Foreground::Deterministic(executor.clone())),
550 Arc::new(Background::Deterministic(executor)),
551 )
552}