1use anyhow::{anyhow, Result};
2use async_task::Runnable;
3pub use async_task::Task;
4use backtrace::{Backtrace, BacktraceFmt, BytesOrWideString};
5use parking_lot::Mutex;
6use postage::{barrier, prelude::Stream as _};
7use rand::prelude::*;
8use smol::{channel, prelude::*, Executor, Timer};
9use std::{
10 fmt::{self, Debug},
11 marker::PhantomData,
12 mem,
13 ops::RangeInclusive,
14 pin::Pin,
15 rc::Rc,
16 sync::{
17 atomic::{AtomicBool, Ordering::SeqCst},
18 Arc,
19 },
20 task::{Context, Poll},
21 thread,
22 time::{Duration, Instant},
23};
24use waker_fn::waker_fn;
25
26use crate::{platform, util};
27
28pub enum Foreground {
29 Platform {
30 dispatcher: Arc<dyn platform::Dispatcher>,
31 _not_send_or_sync: PhantomData<Rc<()>>,
32 },
33 Test(smol::LocalExecutor<'static>),
34 Deterministic(Arc<Deterministic>),
35}
36
37pub enum Background {
38 Deterministic(Arc<Deterministic>),
39 Production {
40 executor: Arc<smol::Executor<'static>>,
41 _stop: channel::Sender<()>,
42 },
43}
44
45struct DeterministicState {
46 rng: StdRng,
47 seed: u64,
48 scheduled_from_foreground: Vec<(Runnable, Backtrace)>,
49 scheduled_from_background: Vec<(Runnable, Backtrace)>,
50 spawned_from_foreground: Vec<(Runnable, Backtrace)>,
51 forbid_parking: bool,
52 block_on_ticks: RangeInclusive<usize>,
53 now: Instant,
54 pending_timers: Vec<(Instant, barrier::Sender)>,
55}
56
57pub struct Deterministic {
58 state: Arc<Mutex<DeterministicState>>,
59 parker: Mutex<parking::Parker>,
60}
61
62impl Deterministic {
63 fn new(seed: u64) -> Self {
64 Self {
65 state: Arc::new(Mutex::new(DeterministicState {
66 rng: StdRng::seed_from_u64(seed),
67 seed,
68 scheduled_from_foreground: Default::default(),
69 scheduled_from_background: Default::default(),
70 spawned_from_foreground: Default::default(),
71 forbid_parking: false,
72 block_on_ticks: 0..=1000,
73 now: Instant::now(),
74 pending_timers: Default::default(),
75 })),
76 parker: Default::default(),
77 }
78 }
79
80 fn spawn_from_foreground<T>(&self, future: Pin<Box<dyn Future<Output = T>>>) -> Task<T>
81 where
82 T: 'static,
83 {
84 let backtrace = Backtrace::new_unresolved();
85 let scheduled_once = AtomicBool::new(false);
86 let state = self.state.clone();
87 let unparker = self.parker.lock().unparker();
88 let (runnable, task) = async_task::spawn_local(future, move |runnable| {
89 let mut state = state.lock();
90 let backtrace = backtrace.clone();
91 if scheduled_once.fetch_or(true, SeqCst) {
92 state.scheduled_from_foreground.push((runnable, backtrace));
93 } else {
94 state.spawned_from_foreground.push((runnable, backtrace));
95 }
96 unparker.unpark();
97 });
98 runnable.schedule();
99 task
100 }
101
102 fn spawn<T>(&self, future: Pin<Box<dyn Send + Future<Output = T>>>) -> Task<T>
103 where
104 T: 'static + Send,
105 {
106 let backtrace = Backtrace::new_unresolved();
107 let state = self.state.clone();
108 let unparker = self.parker.lock().unparker();
109 let (runnable, task) = async_task::spawn(future, move |runnable| {
110 let mut state = state.lock();
111 state
112 .scheduled_from_background
113 .push((runnable, backtrace.clone()));
114 unparker.unpark();
115 });
116 runnable.schedule();
117 task
118 }
119
120 fn run<T>(&self, mut future: Pin<Box<dyn Future<Output = T>>>) -> T
121 where
122 T: 'static,
123 {
124 let woken = Arc::new(AtomicBool::new(false));
125 loop {
126 if let Some(result) = self.run_internal(woken.clone(), &mut future) {
127 return result;
128 }
129
130 if !woken.load(SeqCst) && self.state.lock().forbid_parking {
131 panic!("deterministic executor parked after a call to forbid_parking");
132 }
133
134 woken.store(false, SeqCst);
135 self.parker.lock().park();
136 }
137 }
138
139 fn run_until_parked(&self) {
140 let woken = Arc::new(AtomicBool::new(false));
141 let mut future = std::future::pending::<()>().boxed_local();
142 self.run_internal(woken, &mut future);
143 }
144
145 fn run_internal<T>(
146 &self,
147 woken: Arc<AtomicBool>,
148 future: &mut Pin<Box<dyn Future<Output = T>>>,
149 ) -> Option<T>
150 where
151 T: 'static,
152 {
153 let unparker = self.parker.lock().unparker();
154 let waker = waker_fn(move || {
155 woken.store(true, SeqCst);
156 unparker.unpark();
157 });
158
159 let mut cx = Context::from_waker(&waker);
160 let mut trace = Trace::default();
161 loop {
162 let mut state = self.state.lock();
163 let runnable_count = state.scheduled_from_foreground.len()
164 + state.scheduled_from_background.len()
165 + state.spawned_from_foreground.len();
166
167 let ix = state.rng.gen_range(0..=runnable_count);
168 if ix < state.scheduled_from_foreground.len() {
169 let (_, backtrace) = &state.scheduled_from_foreground[ix];
170 trace.record(&state, backtrace.clone());
171 let runnable = state.scheduled_from_foreground.remove(ix).0;
172 drop(state);
173 runnable.run();
174 } else if ix - state.scheduled_from_foreground.len()
175 < state.scheduled_from_background.len()
176 {
177 let ix = ix - state.scheduled_from_foreground.len();
178 let (_, backtrace) = &state.scheduled_from_background[ix];
179 trace.record(&state, backtrace.clone());
180 let runnable = state.scheduled_from_background.remove(ix).0;
181 drop(state);
182 runnable.run();
183 } else if ix < runnable_count {
184 let (_, backtrace) = &state.spawned_from_foreground[0];
185 trace.record(&state, backtrace.clone());
186 let runnable = state.spawned_from_foreground.remove(0).0;
187 drop(state);
188 runnable.run();
189 } else {
190 drop(state);
191 if let Poll::Ready(result) = future.poll(&mut cx) {
192 return Some(result);
193 }
194
195 let state = self.state.lock();
196 if state.scheduled_from_foreground.is_empty()
197 && state.scheduled_from_background.is_empty()
198 && state.spawned_from_foreground.is_empty()
199 {
200 return None;
201 }
202 }
203 }
204 }
205
206 pub fn block_on<F, T>(&self, future: F) -> Option<T>
207 where
208 T: 'static,
209 F: Future<Output = T>,
210 {
211 smol::pin!(future);
212
213 let unparker = self.parker.lock().unparker();
214 let waker = waker_fn(move || {
215 unparker.unpark();
216 });
217 let max_ticks = {
218 let mut state = self.state.lock();
219 let range = state.block_on_ticks.clone();
220 state.rng.gen_range(range)
221 };
222
223 let mut cx = Context::from_waker(&waker);
224 let mut trace = Trace::default();
225 for _ in 0..max_ticks {
226 let mut state = self.state.lock();
227 let runnable_count = state.scheduled_from_background.len();
228 let ix = state.rng.gen_range(0..=runnable_count);
229 if ix < state.scheduled_from_background.len() {
230 let (_, backtrace) = &state.scheduled_from_background[ix];
231 trace.record(&state, backtrace.clone());
232 let runnable = state.scheduled_from_background.remove(ix).0;
233 drop(state);
234 runnable.run();
235 } else {
236 drop(state);
237 if let Poll::Ready(result) = future.as_mut().poll(&mut cx) {
238 return Some(result);
239 }
240 let state = self.state.lock();
241 if state.scheduled_from_background.is_empty() {
242 if state.forbid_parking {
243 panic!("deterministic executor parked after a call to forbid_parking");
244 }
245 drop(state);
246 self.parker.lock().park();
247 }
248
249 continue;
250 }
251 }
252
253 None
254 }
255}
256
257#[derive(Default)]
258struct Trace {
259 executed: Vec<Backtrace>,
260 scheduled: Vec<Vec<Backtrace>>,
261 spawned_from_foreground: Vec<Vec<Backtrace>>,
262}
263
264impl Trace {
265 fn record(&mut self, state: &DeterministicState, executed: Backtrace) {
266 self.scheduled.push(
267 state
268 .scheduled_from_foreground
269 .iter()
270 .map(|(_, backtrace)| backtrace.clone())
271 .collect(),
272 );
273 self.spawned_from_foreground.push(
274 state
275 .spawned_from_foreground
276 .iter()
277 .map(|(_, backtrace)| backtrace.clone())
278 .collect(),
279 );
280 self.executed.push(executed);
281 }
282
283 fn resolve(&mut self) {
284 for backtrace in &mut self.executed {
285 backtrace.resolve();
286 }
287
288 for backtraces in &mut self.scheduled {
289 for backtrace in backtraces {
290 backtrace.resolve();
291 }
292 }
293
294 for backtraces in &mut self.spawned_from_foreground {
295 for backtrace in backtraces {
296 backtrace.resolve();
297 }
298 }
299 }
300}
301
302impl Debug for Trace {
303 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
304 struct FirstCwdFrameInBacktrace<'a>(&'a Backtrace);
305
306 impl<'a> Debug for FirstCwdFrameInBacktrace<'a> {
307 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> std::fmt::Result {
308 let cwd = std::env::current_dir().unwrap();
309 let mut print_path = |fmt: &mut fmt::Formatter<'_>, path: BytesOrWideString<'_>| {
310 fmt::Display::fmt(&path, fmt)
311 };
312 let mut fmt = BacktraceFmt::new(f, backtrace::PrintFmt::Full, &mut print_path);
313 for frame in self.0.frames() {
314 let mut formatted_frame = fmt.frame();
315 if frame
316 .symbols()
317 .iter()
318 .any(|s| s.filename().map_or(false, |f| f.starts_with(&cwd)))
319 {
320 formatted_frame.backtrace_frame(frame)?;
321 break;
322 }
323 }
324 fmt.finish()
325 }
326 }
327
328 for ((backtrace, scheduled), spawned_from_foreground) in self
329 .executed
330 .iter()
331 .zip(&self.scheduled)
332 .zip(&self.spawned_from_foreground)
333 {
334 writeln!(f, "Scheduled")?;
335 for backtrace in scheduled {
336 writeln!(f, "- {:?}", FirstCwdFrameInBacktrace(backtrace))?;
337 }
338 if scheduled.is_empty() {
339 writeln!(f, "None")?;
340 }
341 writeln!(f, "==========")?;
342
343 writeln!(f, "Spawned from foreground")?;
344 for backtrace in spawned_from_foreground {
345 writeln!(f, "- {:?}", FirstCwdFrameInBacktrace(backtrace))?;
346 }
347 if spawned_from_foreground.is_empty() {
348 writeln!(f, "None")?;
349 }
350 writeln!(f, "==========")?;
351
352 writeln!(f, "Run: {:?}", FirstCwdFrameInBacktrace(backtrace))?;
353 writeln!(f, "+++++++++++++++++++")?;
354 }
355
356 Ok(())
357 }
358}
359
360impl Drop for Trace {
361 fn drop(&mut self) {
362 let trace_on_panic = if let Ok(trace_on_panic) = std::env::var("EXECUTOR_TRACE_ON_PANIC") {
363 trace_on_panic == "1" || trace_on_panic == "true"
364 } else {
365 false
366 };
367 let trace_always = if let Ok(trace_always) = std::env::var("EXECUTOR_TRACE_ALWAYS") {
368 trace_always == "1" || trace_always == "true"
369 } else {
370 false
371 };
372
373 if trace_always || (trace_on_panic && thread::panicking()) {
374 self.resolve();
375 dbg!(self);
376 }
377 }
378}
379
380impl Foreground {
381 pub fn platform(dispatcher: Arc<dyn platform::Dispatcher>) -> Result<Self> {
382 if dispatcher.is_main_thread() {
383 Ok(Self::Platform {
384 dispatcher,
385 _not_send_or_sync: PhantomData,
386 })
387 } else {
388 Err(anyhow!("must be constructed on main thread"))
389 }
390 }
391
392 pub fn test() -> Self {
393 Self::Test(smol::LocalExecutor::new())
394 }
395
396 pub fn spawn<T: 'static>(&self, future: impl Future<Output = T> + 'static) -> Task<T> {
397 let future = future.boxed_local();
398 match self {
399 Self::Platform { dispatcher, .. } => {
400 let dispatcher = dispatcher.clone();
401 let schedule = move |runnable: Runnable| dispatcher.run_on_main_thread(runnable);
402 let (runnable, task) = async_task::spawn_local(future, schedule);
403 runnable.schedule();
404 task
405 }
406 Self::Test(executor) => executor.spawn(future),
407 Self::Deterministic(executor) => executor.spawn_from_foreground(future),
408 }
409 }
410
411 pub fn run<T: 'static>(&self, future: impl 'static + Future<Output = T>) -> T {
412 let future = future.boxed_local();
413 match self {
414 Self::Platform { .. } => panic!("you can't call run on a platform foreground executor"),
415 Self::Test(executor) => smol::block_on(executor.run(future)),
416 Self::Deterministic(executor) => executor.run(future),
417 }
418 }
419
420 pub fn forbid_parking(&self) {
421 match self {
422 Self::Deterministic(executor) => {
423 let mut state = executor.state.lock();
424 state.forbid_parking = true;
425 state.rng = StdRng::seed_from_u64(state.seed);
426 }
427 _ => panic!("this method can only be called on a deterministic executor"),
428 }
429 }
430
431 pub async fn timer(&self, duration: Duration) {
432 match self {
433 Self::Deterministic(executor) => {
434 let (tx, mut rx) = barrier::channel();
435 {
436 let mut state = executor.state.lock();
437 let wakeup_at = state.now + duration;
438 state.pending_timers.push((wakeup_at, tx));
439 }
440 rx.recv().await;
441 }
442 _ => {
443 Timer::after(duration).await;
444 }
445 }
446 }
447
448 pub fn advance_clock(&self, duration: Duration) {
449 match self {
450 Self::Deterministic(executor) => {
451 executor.run_until_parked();
452
453 let mut state = executor.state.lock();
454 state.now += duration;
455 let now = state.now;
456 let mut pending_timers = mem::take(&mut state.pending_timers);
457 drop(state);
458
459 pending_timers.retain(|(wakeup, _)| *wakeup > now);
460 executor.state.lock().pending_timers.extend(pending_timers);
461 }
462 _ => panic!("this method can only be called on a deterministic executor"),
463 }
464 }
465
466 pub fn set_block_on_ticks(&self, range: RangeInclusive<usize>) {
467 match self {
468 Self::Deterministic(executor) => executor.state.lock().block_on_ticks = range,
469 _ => panic!("this method can only be called on a deterministic executor"),
470 }
471 }
472}
473
474impl Background {
475 pub fn new() -> Self {
476 let executor = Arc::new(Executor::new());
477 let stop = channel::unbounded::<()>();
478
479 for i in 0..2 * num_cpus::get() {
480 let executor = executor.clone();
481 let stop = stop.1.clone();
482 thread::Builder::new()
483 .name(format!("background-executor-{}", i))
484 .spawn(move || smol::block_on(executor.run(stop.recv())))
485 .unwrap();
486 }
487
488 Self::Production {
489 executor,
490 _stop: stop.0,
491 }
492 }
493
494 pub fn num_cpus(&self) -> usize {
495 num_cpus::get()
496 }
497
498 pub fn spawn<T, F>(&self, future: F) -> Task<T>
499 where
500 T: 'static + Send,
501 F: Send + Future<Output = T> + 'static,
502 {
503 let future = future.boxed();
504 match self {
505 Self::Production { executor, .. } => executor.spawn(future),
506 Self::Deterministic(executor) => executor.spawn(future),
507 }
508 }
509
510 pub fn block_with_timeout<F, T>(
511 &self,
512 timeout: Duration,
513 future: F,
514 ) -> Result<T, Pin<Box<dyn Future<Output = T>>>>
515 where
516 T: 'static,
517 F: 'static + Unpin + Future<Output = T>,
518 {
519 let mut future = future.boxed_local();
520 if !timeout.is_zero() {
521 let output = match self {
522 Self::Production { .. } => smol::block_on(util::timeout(timeout, &mut future)).ok(),
523 Self::Deterministic(executor) => executor.block_on(&mut future),
524 };
525 if let Some(output) = output {
526 return Ok(output);
527 }
528 }
529 Err(future)
530 }
531
532 pub async fn scoped<'scope, F>(&self, scheduler: F)
533 where
534 F: FnOnce(&mut Scope<'scope>),
535 {
536 let mut scope = Scope {
537 futures: Default::default(),
538 _phantom: PhantomData,
539 };
540 (scheduler)(&mut scope);
541 let spawned = scope
542 .futures
543 .into_iter()
544 .map(|f| self.spawn(f))
545 .collect::<Vec<_>>();
546 for task in spawned {
547 task.await;
548 }
549 }
550}
551
/// Collector for futures registered inside `Background::scoped`.
pub struct Scope<'a> {
    /// Registered futures, stored with their `'a` bound replaced by
    /// `'static`.
    futures: Vec<Pin<Box<dyn Future<Output = ()> + Send + 'static>>>,
    /// Ties the scope to the lifetime the registered futures may borrow for.
    _phantom: PhantomData<&'a ()>,
}
556
557impl<'a> Scope<'a> {
558 pub fn spawn<F>(&mut self, f: F)
559 where
560 F: Future<Output = ()> + Send + 'a,
561 {
562 let f = unsafe {
563 mem::transmute::<
564 Pin<Box<dyn Future<Output = ()> + Send + 'a>>,
565 Pin<Box<dyn Future<Output = ()> + Send + 'static>>,
566 >(Box::pin(f))
567 };
568 self.futures.push(f);
569 }
570}
571
572pub fn deterministic(seed: u64) -> (Rc<Foreground>, Arc<Background>) {
573 let executor = Arc::new(Deterministic::new(seed));
574 (
575 Rc::new(Foreground::Deterministic(executor.clone())),
576 Arc::new(Background::Deterministic(executor)),
577 )
578}