1use anyhow::{anyhow, Result};
2use async_task::Runnable;
3pub use async_task::Task;
4use backtrace::{Backtrace, BacktraceFmt, BytesOrWideString};
5use parking_lot::Mutex;
6use postage::{barrier, prelude::Stream as _};
7use rand::prelude::*;
8use smol::{channel, prelude::*, Executor, Timer};
9use std::{
10 fmt::{self, Debug},
11 marker::PhantomData,
12 mem,
13 ops::RangeInclusive,
14 pin::Pin,
15 rc::Rc,
16 sync::{
17 atomic::{AtomicBool, Ordering::SeqCst},
18 Arc,
19 },
20 task::{Context, Poll},
21 thread,
22 time::{Duration, Instant},
23};
24use waker_fn::waker_fn;
25
26use crate::{platform, util};
27
28pub enum Foreground {
29 Platform {
30 dispatcher: Arc<dyn platform::Dispatcher>,
31 _not_send_or_sync: PhantomData<Rc<()>>,
32 },
33 Test(smol::LocalExecutor<'static>),
34 Deterministic(Arc<Deterministic>),
35}
36
37pub enum Background {
38 Deterministic(Arc<Deterministic>),
39 Production {
40 executor: Arc<smol::Executor<'static>>,
41 _stop: channel::Sender<()>,
42 },
43}
44
45struct DeterministicState {
46 rng: StdRng,
47 seed: u64,
48 scheduled_from_foreground: Vec<(Runnable, Backtrace)>,
49 scheduled_from_background: Vec<(Runnable, Backtrace)>,
50 spawned_from_foreground: Vec<(Runnable, Backtrace)>,
51 forbid_parking: bool,
52 block_on_ticks: RangeInclusive<usize>,
53 now: Instant,
54 pending_timers: Vec<(Instant, barrier::Sender)>,
55}
56
57pub struct Deterministic {
58 state: Arc<Mutex<DeterministicState>>,
59 parker: Mutex<parking::Parker>,
60}
61
62impl Deterministic {
63 fn new(seed: u64) -> Self {
64 Self {
65 state: Arc::new(Mutex::new(DeterministicState {
66 rng: StdRng::seed_from_u64(seed),
67 seed,
68 scheduled_from_foreground: Default::default(),
69 scheduled_from_background: Default::default(),
70 spawned_from_foreground: Default::default(),
71 forbid_parking: false,
72 block_on_ticks: 0..=1000,
73 now: Instant::now(),
74 pending_timers: Default::default(),
75 })),
76 parker: Default::default(),
77 }
78 }
79
80 pub fn spawn_from_foreground<F, T>(&self, future: F) -> Task<T>
81 where
82 T: 'static,
83 F: Future<Output = T> + 'static,
84 {
85 let backtrace = Backtrace::new_unresolved();
86 let scheduled_once = AtomicBool::new(false);
87 let state = self.state.clone();
88 let unparker = self.parker.lock().unparker();
89 let (runnable, task) = async_task::spawn_local(future, move |runnable| {
90 let mut state = state.lock();
91 let backtrace = backtrace.clone();
92 if scheduled_once.fetch_or(true, SeqCst) {
93 state.scheduled_from_foreground.push((runnable, backtrace));
94 } else {
95 state.spawned_from_foreground.push((runnable, backtrace));
96 }
97 unparker.unpark();
98 });
99 runnable.schedule();
100 task
101 }
102
103 pub fn spawn<F, T>(&self, future: F) -> Task<T>
104 where
105 T: 'static + Send,
106 F: 'static + Send + Future<Output = T>,
107 {
108 let backtrace = Backtrace::new_unresolved();
109 let state = self.state.clone();
110 let unparker = self.parker.lock().unparker();
111 let (runnable, task) = async_task::spawn(future, move |runnable| {
112 let mut state = state.lock();
113 state
114 .scheduled_from_background
115 .push((runnable, backtrace.clone()));
116 unparker.unpark();
117 });
118 runnable.schedule();
119 task
120 }
121
122 pub fn run<F, T>(&self, future: F) -> T
123 where
124 T: 'static,
125 F: Future<Output = T> + 'static,
126 {
127 let woken = Arc::new(AtomicBool::new(false));
128 let mut future = Box::pin(future);
129 loop {
130 if let Some(result) = self.run_internal(woken.clone(), &mut future) {
131 return result;
132 }
133
134 if !woken.load(SeqCst) && self.state.lock().forbid_parking {
135 panic!("deterministic executor parked after a call to forbid_parking");
136 }
137
138 woken.store(false, SeqCst);
139 self.parker.lock().park();
140 }
141 }
142
143 fn run_until_parked(&self) {
144 let woken = Arc::new(AtomicBool::new(false));
145 let future = std::future::pending::<()>();
146 smol::pin!(future);
147 self.run_internal(woken, future);
148 }
149
150 pub fn run_internal<F, T>(&self, woken: Arc<AtomicBool>, mut future: F) -> Option<T>
151 where
152 T: 'static,
153 F: Future<Output = T> + Unpin,
154 {
155 let unparker = self.parker.lock().unparker();
156 let waker = waker_fn(move || {
157 woken.store(true, SeqCst);
158 unparker.unpark();
159 });
160
161 let mut cx = Context::from_waker(&waker);
162 let mut trace = Trace::default();
163 loop {
164 let mut state = self.state.lock();
165 let runnable_count = state.scheduled_from_foreground.len()
166 + state.scheduled_from_background.len()
167 + state.spawned_from_foreground.len();
168
169 let ix = state.rng.gen_range(0..=runnable_count);
170 if ix < state.scheduled_from_foreground.len() {
171 let (_, backtrace) = &state.scheduled_from_foreground[ix];
172 trace.record(&state, backtrace.clone());
173 let runnable = state.scheduled_from_foreground.remove(ix).0;
174 drop(state);
175 runnable.run();
176 } else if ix - state.scheduled_from_foreground.len()
177 < state.scheduled_from_background.len()
178 {
179 let ix = ix - state.scheduled_from_foreground.len();
180 let (_, backtrace) = &state.scheduled_from_background[ix];
181 trace.record(&state, backtrace.clone());
182 let runnable = state.scheduled_from_background.remove(ix).0;
183 drop(state);
184 runnable.run();
185 } else if ix < runnable_count {
186 let (_, backtrace) = &state.spawned_from_foreground[0];
187 trace.record(&state, backtrace.clone());
188 let runnable = state.spawned_from_foreground.remove(0).0;
189 drop(state);
190 runnable.run();
191 } else {
192 drop(state);
193 if let Poll::Ready(result) = future.poll(&mut cx) {
194 return Some(result);
195 }
196
197 let state = self.state.lock();
198 if state.scheduled_from_foreground.is_empty()
199 && state.scheduled_from_background.is_empty()
200 && state.spawned_from_foreground.is_empty()
201 {
202 return None;
203 }
204 }
205 }
206 }
207
208 pub fn block_on<F, T>(&self, future: F) -> Option<T>
209 where
210 T: 'static,
211 F: Future<Output = T>,
212 {
213 smol::pin!(future);
214
215 let unparker = self.parker.lock().unparker();
216 let waker = waker_fn(move || {
217 unparker.unpark();
218 });
219 let max_ticks = {
220 let mut state = self.state.lock();
221 let range = state.block_on_ticks.clone();
222 state.rng.gen_range(range)
223 };
224
225 let mut cx = Context::from_waker(&waker);
226 let mut trace = Trace::default();
227 for _ in 0..max_ticks {
228 let mut state = self.state.lock();
229 let runnable_count = state.scheduled_from_background.len();
230 let ix = state.rng.gen_range(0..=runnable_count);
231 if ix < state.scheduled_from_background.len() {
232 let (_, backtrace) = &state.scheduled_from_background[ix];
233 trace.record(&state, backtrace.clone());
234 let runnable = state.scheduled_from_background.remove(ix).0;
235 drop(state);
236 runnable.run();
237 } else {
238 drop(state);
239 if let Poll::Ready(result) = future.as_mut().poll(&mut cx) {
240 return Some(result);
241 }
242 let state = self.state.lock();
243 if state.scheduled_from_background.is_empty() {
244 if state.forbid_parking {
245 panic!("deterministic executor parked after a call to forbid_parking");
246 }
247 drop(state);
248 self.parker.lock().park();
249 }
250
251 continue;
252 }
253 }
254
255 None
256 }
257}
258
259#[derive(Default)]
260struct Trace {
261 executed: Vec<Backtrace>,
262 scheduled: Vec<Vec<Backtrace>>,
263 spawned_from_foreground: Vec<Vec<Backtrace>>,
264}
265
266impl Trace {
267 fn record(&mut self, state: &DeterministicState, executed: Backtrace) {
268 self.scheduled.push(
269 state
270 .scheduled_from_foreground
271 .iter()
272 .map(|(_, backtrace)| backtrace.clone())
273 .collect(),
274 );
275 self.spawned_from_foreground.push(
276 state
277 .spawned_from_foreground
278 .iter()
279 .map(|(_, backtrace)| backtrace.clone())
280 .collect(),
281 );
282 self.executed.push(executed);
283 }
284
285 fn resolve(&mut self) {
286 for backtrace in &mut self.executed {
287 backtrace.resolve();
288 }
289
290 for backtraces in &mut self.scheduled {
291 for backtrace in backtraces {
292 backtrace.resolve();
293 }
294 }
295
296 for backtraces in &mut self.spawned_from_foreground {
297 for backtrace in backtraces {
298 backtrace.resolve();
299 }
300 }
301 }
302}
303
304impl Debug for Trace {
305 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
306 struct FirstCwdFrameInBacktrace<'a>(&'a Backtrace);
307
308 impl<'a> Debug for FirstCwdFrameInBacktrace<'a> {
309 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> std::fmt::Result {
310 let cwd = std::env::current_dir().unwrap();
311 let mut print_path = |fmt: &mut fmt::Formatter<'_>, path: BytesOrWideString<'_>| {
312 fmt::Display::fmt(&path, fmt)
313 };
314 let mut fmt = BacktraceFmt::new(f, backtrace::PrintFmt::Full, &mut print_path);
315 for frame in self.0.frames() {
316 let mut formatted_frame = fmt.frame();
317 if frame
318 .symbols()
319 .iter()
320 .any(|s| s.filename().map_or(false, |f| f.starts_with(&cwd)))
321 {
322 formatted_frame.backtrace_frame(frame)?;
323 break;
324 }
325 }
326 fmt.finish()
327 }
328 }
329
330 for ((backtrace, scheduled), spawned_from_foreground) in self
331 .executed
332 .iter()
333 .zip(&self.scheduled)
334 .zip(&self.spawned_from_foreground)
335 {
336 writeln!(f, "Scheduled")?;
337 for backtrace in scheduled {
338 writeln!(f, "- {:?}", FirstCwdFrameInBacktrace(backtrace))?;
339 }
340 if scheduled.is_empty() {
341 writeln!(f, "None")?;
342 }
343 writeln!(f, "==========")?;
344
345 writeln!(f, "Spawned from foreground")?;
346 for backtrace in spawned_from_foreground {
347 writeln!(f, "- {:?}", FirstCwdFrameInBacktrace(backtrace))?;
348 }
349 if spawned_from_foreground.is_empty() {
350 writeln!(f, "None")?;
351 }
352 writeln!(f, "==========")?;
353
354 writeln!(f, "Run: {:?}", FirstCwdFrameInBacktrace(backtrace))?;
355 writeln!(f, "+++++++++++++++++++")?;
356 }
357
358 Ok(())
359 }
360}
361
362impl Drop for Trace {
363 fn drop(&mut self) {
364 let trace_on_panic = if let Ok(trace_on_panic) = std::env::var("EXECUTOR_TRACE_ON_PANIC") {
365 trace_on_panic == "1" || trace_on_panic == "true"
366 } else {
367 false
368 };
369 let trace_always = if let Ok(trace_always) = std::env::var("EXECUTOR_TRACE_ALWAYS") {
370 trace_always == "1" || trace_always == "true"
371 } else {
372 false
373 };
374
375 if trace_always || (trace_on_panic && thread::panicking()) {
376 self.resolve();
377 dbg!(self);
378 }
379 }
380}
381
382impl Foreground {
383 pub fn platform(dispatcher: Arc<dyn platform::Dispatcher>) -> Result<Self> {
384 if dispatcher.is_main_thread() {
385 Ok(Self::Platform {
386 dispatcher,
387 _not_send_or_sync: PhantomData,
388 })
389 } else {
390 Err(anyhow!("must be constructed on main thread"))
391 }
392 }
393
394 pub fn test() -> Self {
395 Self::Test(smol::LocalExecutor::new())
396 }
397
398 pub fn spawn<T: 'static>(&self, future: impl Future<Output = T> + 'static) -> Task<T> {
399 match self {
400 Self::Platform { dispatcher, .. } => {
401 let dispatcher = dispatcher.clone();
402 let schedule = move |runnable: Runnable| dispatcher.run_on_main_thread(runnable);
403 let (runnable, task) = async_task::spawn_local(future, schedule);
404 runnable.schedule();
405 task
406 }
407 Self::Test(executor) => executor.spawn(future),
408 Self::Deterministic(executor) => executor.spawn_from_foreground(future),
409 }
410 }
411
412 pub fn run<T: 'static>(&self, future: impl 'static + Future<Output = T>) -> T {
413 match self {
414 Self::Platform { .. } => panic!("you can't call run on a platform foreground executor"),
415 Self::Test(executor) => smol::block_on(executor.run(future)),
416 Self::Deterministic(executor) => executor.run(future),
417 }
418 }
419
420 pub fn forbid_parking(&self) {
421 match self {
422 Self::Deterministic(executor) => {
423 let mut state = executor.state.lock();
424 state.forbid_parking = true;
425 state.rng = StdRng::seed_from_u64(state.seed);
426 }
427 _ => panic!("this method can only be called on a deterministic executor"),
428 }
429 }
430
431 pub async fn timer(&self, duration: Duration) {
432 match self {
433 Self::Deterministic(executor) => {
434 let (tx, mut rx) = barrier::channel();
435 {
436 let mut state = executor.state.lock();
437 let wakeup_at = state.now + duration;
438 state.pending_timers.push((wakeup_at, tx));
439 }
440 rx.recv().await;
441 }
442 _ => {
443 Timer::after(duration).await;
444 }
445 }
446 }
447
448 pub fn advance_clock(&self, duration: Duration) {
449 match self {
450 Self::Deterministic(executor) => {
451 executor.run_until_parked();
452
453 let mut state = executor.state.lock();
454 state.now += duration;
455 let now = state.now;
456 let mut pending_timers = mem::take(&mut state.pending_timers);
457 drop(state);
458
459 pending_timers.retain(|(wakeup, _)| *wakeup > now);
460 executor.state.lock().pending_timers.extend(pending_timers);
461 }
462 _ => panic!("this method can only be called on a deterministic executor"),
463 }
464 }
465
466 pub fn set_block_on_ticks(&self, range: RangeInclusive<usize>) {
467 match self {
468 Self::Deterministic(executor) => executor.state.lock().block_on_ticks = range,
469 _ => panic!("this method can only be called on a deterministic executor"),
470 }
471 }
472}
473
474impl Background {
475 pub fn new() -> Self {
476 let executor = Arc::new(Executor::new());
477 let stop = channel::unbounded::<()>();
478
479 for i in 0..2 * num_cpus::get() {
480 let executor = executor.clone();
481 let stop = stop.1.clone();
482 thread::Builder::new()
483 .name(format!("background-executor-{}", i))
484 .spawn(move || smol::block_on(executor.run(stop.recv())))
485 .unwrap();
486 }
487
488 Self::Production {
489 executor,
490 _stop: stop.0,
491 }
492 }
493
494 pub fn num_cpus(&self) -> usize {
495 num_cpus::get()
496 }
497
498 pub fn spawn<T, F>(&self, future: F) -> Task<T>
499 where
500 T: 'static + Send,
501 F: Send + Future<Output = T> + 'static,
502 {
503 match self {
504 Self::Production { executor, .. } => executor.spawn(future),
505 Self::Deterministic(executor) => executor.spawn(future),
506 }
507 }
508
509 pub fn block_with_timeout<F, T>(&self, timeout: Duration, mut future: F) -> Result<T, F>
510 where
511 T: 'static,
512 F: 'static + Unpin + Future<Output = T>,
513 {
514 if !timeout.is_zero() {
515 let output = match self {
516 Self::Production { .. } => {
517 smol::block_on(util::timeout(timeout, Pin::new(&mut future))).ok()
518 }
519 Self::Deterministic(executor) => executor.block_on(Pin::new(&mut future)),
520 };
521 if let Some(output) = output {
522 return Ok(output);
523 }
524 }
525 Err(future)
526 }
527
528 pub async fn scoped<'scope, F>(&self, scheduler: F)
529 where
530 F: FnOnce(&mut Scope<'scope>),
531 {
532 let mut scope = Scope {
533 futures: Default::default(),
534 _phantom: PhantomData,
535 };
536 (scheduler)(&mut scope);
537 let spawned = scope
538 .futures
539 .into_iter()
540 .map(|f| self.spawn(f))
541 .collect::<Vec<_>>();
542 for task in spawned {
543 task.await;
544 }
545 }
546}
547
548pub struct Scope<'a> {
549 futures: Vec<Pin<Box<dyn Future<Output = ()> + Send + 'static>>>,
550 _phantom: PhantomData<&'a ()>,
551}
552
553impl<'a> Scope<'a> {
554 pub fn spawn<F>(&mut self, f: F)
555 where
556 F: Future<Output = ()> + Send + 'a,
557 {
558 let f = unsafe {
559 mem::transmute::<
560 Pin<Box<dyn Future<Output = ()> + Send + 'a>>,
561 Pin<Box<dyn Future<Output = ()> + Send + 'static>>,
562 >(Box::pin(f))
563 };
564 self.futures.push(f);
565 }
566}
567
568pub fn deterministic(seed: u64) -> (Rc<Foreground>, Arc<Background>) {
569 let executor = Arc::new(Deterministic::new(seed));
570 (
571 Rc::new(Foreground::Deterministic(executor.clone())),
572 Arc::new(Background::Deterministic(executor)),
573 )
574}