1use anyhow::{anyhow, Result};
2use async_task::Runnable;
3use backtrace::{Backtrace, BacktraceFmt, BytesOrWideString};
4use parking_lot::Mutex;
5use postage::{barrier, prelude::Stream as _};
6use rand::prelude::*;
7use smol::{channel, prelude::*, Executor, Timer};
8use std::{
9 any::Any,
10 fmt::{self, Debug},
11 marker::PhantomData,
12 mem,
13 ops::RangeInclusive,
14 pin::Pin,
15 rc::Rc,
16 sync::{
17 atomic::{AtomicBool, Ordering::SeqCst},
18 Arc,
19 },
20 task::{Context, Poll},
21 thread,
22 time::{Duration, Instant},
23};
24use waker_fn::waker_fn;
25
26use crate::{
27 platform::{self, Dispatcher},
28 util,
29};
30
31pub enum Foreground {
32 Platform {
33 dispatcher: Arc<dyn platform::Dispatcher>,
34 _not_send_or_sync: PhantomData<Rc<()>>,
35 },
36 Test(smol::LocalExecutor<'static>),
37 Deterministic(Arc<Deterministic>),
38}
39
40pub enum Background {
41 Deterministic {
42 executor: Arc<Deterministic>,
43 },
44 Production {
45 executor: Arc<smol::Executor<'static>>,
46 _stop: channel::Sender<()>,
47 },
48}
49
50type AnyLocalFuture = Pin<Box<dyn 'static + Future<Output = Box<dyn Any + 'static>>>>;
51type AnyFuture = Pin<Box<dyn 'static + Send + Future<Output = Box<dyn Any + Send + 'static>>>>;
52type AnyTask = async_task::Task<Box<dyn Any + Send + 'static>>;
53type AnyLocalTask = async_task::Task<Box<dyn Any + 'static>>;
54
55#[must_use]
56pub enum Task<T> {
57 Ready(Option<T>),
58 Local {
59 any_task: AnyLocalTask,
60 result_type: PhantomData<T>,
61 },
62 Send {
63 any_task: AnyTask,
64 result_type: PhantomData<T>,
65 },
66}
67
68unsafe impl<T: Send> Send for Task<T> {}
69
70struct DeterministicState {
71 rng: StdRng,
72 seed: u64,
73 scheduled_from_foreground: Vec<(Runnable, Backtrace)>,
74 scheduled_from_background: Vec<(Runnable, Backtrace)>,
75 spawned_from_foreground: Vec<(Runnable, Backtrace)>,
76 forbid_parking: bool,
77 block_on_ticks: RangeInclusive<usize>,
78 now: Instant,
79 pending_timers: Vec<(Instant, barrier::Sender)>,
80}
81
82pub struct Deterministic {
83 state: Arc<Mutex<DeterministicState>>,
84 parker: Mutex<parking::Parker>,
85}
86
87impl Deterministic {
88 fn new(seed: u64) -> Self {
89 Self {
90 state: Arc::new(Mutex::new(DeterministicState {
91 rng: StdRng::seed_from_u64(seed),
92 seed,
93 scheduled_from_foreground: Default::default(),
94 scheduled_from_background: Default::default(),
95 spawned_from_foreground: Default::default(),
96 forbid_parking: false,
97 block_on_ticks: 0..=1000,
98 now: Instant::now(),
99 pending_timers: Default::default(),
100 })),
101 parker: Default::default(),
102 }
103 }
104
105 fn spawn_from_foreground(&self, future: AnyLocalFuture) -> AnyLocalTask {
106 let backtrace = Backtrace::new_unresolved();
107 let scheduled_once = AtomicBool::new(false);
108 let state = self.state.clone();
109 let unparker = self.parker.lock().unparker();
110 let (runnable, task) = async_task::spawn_local(future, move |runnable| {
111 let mut state = state.lock();
112 let backtrace = backtrace.clone();
113 if scheduled_once.fetch_or(true, SeqCst) {
114 state.scheduled_from_foreground.push((runnable, backtrace));
115 } else {
116 state.spawned_from_foreground.push((runnable, backtrace));
117 }
118 unparker.unpark();
119 });
120 runnable.schedule();
121 task
122 }
123
124 fn spawn(&self, future: AnyFuture) -> AnyTask {
125 let backtrace = Backtrace::new_unresolved();
126 let state = self.state.clone();
127 let unparker = self.parker.lock().unparker();
128 let (runnable, task) = async_task::spawn(future, move |runnable| {
129 let mut state = state.lock();
130 state
131 .scheduled_from_background
132 .push((runnable, backtrace.clone()));
133 unparker.unpark();
134 });
135 runnable.schedule();
136 task
137 }
138
139 fn run(&self, mut future: AnyLocalFuture) -> Box<dyn Any> {
140 let woken = Arc::new(AtomicBool::new(false));
141 loop {
142 if let Some(result) = self.run_internal(woken.clone(), &mut future) {
143 return result;
144 }
145
146 if !woken.load(SeqCst) && self.state.lock().forbid_parking {
147 panic!("deterministic executor parked after a call to forbid_parking");
148 }
149
150 woken.store(false, SeqCst);
151 self.parker.lock().park();
152 }
153 }
154
155 fn run_until_parked(&self) {
156 let woken = Arc::new(AtomicBool::new(false));
157 let mut future = any_local_future(std::future::pending::<()>());
158 self.run_internal(woken, &mut future);
159 }
160
161 fn run_internal(
162 &self,
163 woken: Arc<AtomicBool>,
164 future: &mut AnyLocalFuture,
165 ) -> Option<Box<dyn Any>> {
166 let unparker = self.parker.lock().unparker();
167 let waker = waker_fn(move || {
168 woken.store(true, SeqCst);
169 unparker.unpark();
170 });
171
172 let mut cx = Context::from_waker(&waker);
173 let mut trace = Trace::default();
174 loop {
175 let mut state = self.state.lock();
176 let runnable_count = state.scheduled_from_foreground.len()
177 + state.scheduled_from_background.len()
178 + state.spawned_from_foreground.len();
179
180 let ix = state.rng.gen_range(0..=runnable_count);
181 if ix < state.scheduled_from_foreground.len() {
182 let (_, backtrace) = &state.scheduled_from_foreground[ix];
183 trace.record(&state, backtrace.clone());
184 let runnable = state.scheduled_from_foreground.remove(ix).0;
185 drop(state);
186 runnable.run();
187 } else if ix - state.scheduled_from_foreground.len()
188 < state.scheduled_from_background.len()
189 {
190 let ix = ix - state.scheduled_from_foreground.len();
191 let (_, backtrace) = &state.scheduled_from_background[ix];
192 trace.record(&state, backtrace.clone());
193 let runnable = state.scheduled_from_background.remove(ix).0;
194 drop(state);
195 runnable.run();
196 } else if ix < runnable_count {
197 let (_, backtrace) = &state.spawned_from_foreground[0];
198 trace.record(&state, backtrace.clone());
199 let runnable = state.spawned_from_foreground.remove(0).0;
200 drop(state);
201 runnable.run();
202 } else {
203 drop(state);
204 if let Poll::Ready(result) = future.poll(&mut cx) {
205 return Some(result);
206 }
207
208 let state = self.state.lock();
209 if state.would_park() {
210 return None;
211 }
212 }
213 }
214 }
215
216 fn block_on(&self, future: &mut AnyLocalFuture) -> Option<Box<dyn Any>> {
217 let unparker = self.parker.lock().unparker();
218 let waker = waker_fn(move || {
219 unparker.unpark();
220 });
221 let max_ticks = {
222 let mut state = self.state.lock();
223 let range = state.block_on_ticks.clone();
224 state.rng.gen_range(range)
225 };
226
227 let mut cx = Context::from_waker(&waker);
228 let mut trace = Trace::default();
229 for _ in 0..max_ticks {
230 let mut state = self.state.lock();
231 let runnable_count = state.scheduled_from_background.len();
232 let ix = state.rng.gen_range(0..=runnable_count);
233 if ix < state.scheduled_from_background.len() {
234 let (_, backtrace) = &state.scheduled_from_background[ix];
235 trace.record(&state, backtrace.clone());
236 let runnable = state.scheduled_from_background.remove(ix).0;
237 drop(state);
238 runnable.run();
239 } else {
240 drop(state);
241 if let Poll::Ready(result) = future.as_mut().poll(&mut cx) {
242 return Some(result);
243 }
244 let state = self.state.lock();
245 if state.scheduled_from_background.is_empty() {
246 if state.forbid_parking {
247 panic!("deterministic executor parked after a call to forbid_parking");
248 }
249 drop(state);
250 self.parker.lock().park();
251 }
252
253 continue;
254 }
255 }
256
257 None
258 }
259}
260
261impl DeterministicState {
262 fn would_park(&self) -> bool {
263 self.forbid_parking
264 && self.scheduled_from_foreground.is_empty()
265 && self.scheduled_from_background.is_empty()
266 && self.spawned_from_foreground.is_empty()
267 }
268}
269
270#[derive(Default)]
271struct Trace {
272 executed: Vec<Backtrace>,
273 scheduled: Vec<Vec<Backtrace>>,
274 spawned_from_foreground: Vec<Vec<Backtrace>>,
275}
276
277impl Trace {
278 fn record(&mut self, state: &DeterministicState, executed: Backtrace) {
279 self.scheduled.push(
280 state
281 .scheduled_from_foreground
282 .iter()
283 .map(|(_, backtrace)| backtrace.clone())
284 .collect(),
285 );
286 self.spawned_from_foreground.push(
287 state
288 .spawned_from_foreground
289 .iter()
290 .map(|(_, backtrace)| backtrace.clone())
291 .collect(),
292 );
293 self.executed.push(executed);
294 }
295
296 fn resolve(&mut self) {
297 for backtrace in &mut self.executed {
298 backtrace.resolve();
299 }
300
301 for backtraces in &mut self.scheduled {
302 for backtrace in backtraces {
303 backtrace.resolve();
304 }
305 }
306
307 for backtraces in &mut self.spawned_from_foreground {
308 for backtrace in backtraces {
309 backtrace.resolve();
310 }
311 }
312 }
313}
314
315impl Debug for Trace {
316 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
317 struct FirstCwdFrameInBacktrace<'a>(&'a Backtrace);
318
319 impl<'a> Debug for FirstCwdFrameInBacktrace<'a> {
320 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> std::fmt::Result {
321 let cwd = std::env::current_dir().unwrap();
322 let mut print_path = |fmt: &mut fmt::Formatter<'_>, path: BytesOrWideString<'_>| {
323 fmt::Display::fmt(&path, fmt)
324 };
325 let mut fmt = BacktraceFmt::new(f, backtrace::PrintFmt::Full, &mut print_path);
326 for frame in self.0.frames() {
327 let mut formatted_frame = fmt.frame();
328 if frame
329 .symbols()
330 .iter()
331 .any(|s| s.filename().map_or(false, |f| f.starts_with(&cwd)))
332 {
333 formatted_frame.backtrace_frame(frame)?;
334 break;
335 }
336 }
337 fmt.finish()
338 }
339 }
340
341 for ((backtrace, scheduled), spawned_from_foreground) in self
342 .executed
343 .iter()
344 .zip(&self.scheduled)
345 .zip(&self.spawned_from_foreground)
346 {
347 writeln!(f, "Scheduled")?;
348 for backtrace in scheduled {
349 writeln!(f, "- {:?}", FirstCwdFrameInBacktrace(backtrace))?;
350 }
351 if scheduled.is_empty() {
352 writeln!(f, "None")?;
353 }
354 writeln!(f, "==========")?;
355
356 writeln!(f, "Spawned from foreground")?;
357 for backtrace in spawned_from_foreground {
358 writeln!(f, "- {:?}", FirstCwdFrameInBacktrace(backtrace))?;
359 }
360 if spawned_from_foreground.is_empty() {
361 writeln!(f, "None")?;
362 }
363 writeln!(f, "==========")?;
364
365 writeln!(f, "Run: {:?}", FirstCwdFrameInBacktrace(backtrace))?;
366 writeln!(f, "+++++++++++++++++++")?;
367 }
368
369 Ok(())
370 }
371}
372
373impl Drop for Trace {
374 fn drop(&mut self) {
375 let trace_on_panic = if let Ok(trace_on_panic) = std::env::var("EXECUTOR_TRACE_ON_PANIC") {
376 trace_on_panic == "1" || trace_on_panic == "true"
377 } else {
378 false
379 };
380 let trace_always = if let Ok(trace_always) = std::env::var("EXECUTOR_TRACE_ALWAYS") {
381 trace_always == "1" || trace_always == "true"
382 } else {
383 false
384 };
385
386 if trace_always || (trace_on_panic && thread::panicking()) {
387 self.resolve();
388 dbg!(self);
389 }
390 }
391}
392
393impl Foreground {
394 pub fn platform(dispatcher: Arc<dyn platform::Dispatcher>) -> Result<Self> {
395 if dispatcher.is_main_thread() {
396 Ok(Self::Platform {
397 dispatcher,
398 _not_send_or_sync: PhantomData,
399 })
400 } else {
401 Err(anyhow!("must be constructed on main thread"))
402 }
403 }
404
405 pub fn test() -> Self {
406 Self::Test(smol::LocalExecutor::new())
407 }
408
409 pub fn spawn<T: 'static>(&self, future: impl Future<Output = T> + 'static) -> Task<T> {
410 let future = any_local_future(future);
411 let any_task = match self {
412 Self::Deterministic(executor) => executor.spawn_from_foreground(future),
413 Self::Platform { dispatcher, .. } => {
414 fn spawn_inner(
415 future: AnyLocalFuture,
416 dispatcher: &Arc<dyn Dispatcher>,
417 ) -> AnyLocalTask {
418 let dispatcher = dispatcher.clone();
419 let schedule =
420 move |runnable: Runnable| dispatcher.run_on_main_thread(runnable);
421 let (runnable, task) = async_task::spawn_local(future, schedule);
422 runnable.schedule();
423 task
424 }
425 spawn_inner(future, dispatcher)
426 }
427 Self::Test(executor) => executor.spawn(future),
428 };
429 Task::local(any_task)
430 }
431
432 pub fn run<T: 'static>(&self, future: impl 'static + Future<Output = T>) -> T {
433 let future = any_local_future(future);
434 let any_value = match self {
435 Self::Deterministic(executor) => executor.run(future),
436 Self::Platform { .. } => panic!("you can't call run on a platform foreground executor"),
437 Self::Test(executor) => smol::block_on(executor.run(future)),
438 };
439 *any_value.downcast().unwrap()
440 }
441
442 pub fn parking_forbidden(&self) -> bool {
443 match self {
444 Self::Deterministic(executor) => executor.state.lock().forbid_parking,
445 _ => panic!("this method can only be called on a deterministic executor"),
446 }
447 }
448
449 pub fn would_park(&self) -> bool {
450 match self {
451 Self::Deterministic(executor) => executor.state.lock().would_park(),
452 _ => panic!("this method can only be called on a deterministic executor"),
453 }
454 }
455
456 pub fn forbid_parking(&self) {
457 match self {
458 Self::Deterministic(executor) => {
459 let mut state = executor.state.lock();
460 state.forbid_parking = true;
461 state.rng = StdRng::seed_from_u64(state.seed);
462 }
463 _ => panic!("this method can only be called on a deterministic executor"),
464 }
465 }
466
467 pub async fn timer(&self, duration: Duration) {
468 match self {
469 Self::Deterministic(executor) => {
470 let (tx, mut rx) = barrier::channel();
471 {
472 let mut state = executor.state.lock();
473 let wakeup_at = state.now + duration;
474 state.pending_timers.push((wakeup_at, tx));
475 }
476 rx.recv().await;
477 }
478 _ => {
479 Timer::after(duration).await;
480 }
481 }
482 }
483
484 pub fn advance_clock(&self, duration: Duration) {
485 match self {
486 Self::Deterministic(executor) => {
487 executor.run_until_parked();
488
489 let mut state = executor.state.lock();
490 state.now += duration;
491 let now = state.now;
492 let mut pending_timers = mem::take(&mut state.pending_timers);
493 drop(state);
494
495 pending_timers.retain(|(wakeup, _)| *wakeup > now);
496 executor.state.lock().pending_timers.extend(pending_timers);
497 }
498 _ => panic!("this method can only be called on a deterministic executor"),
499 }
500 }
501
502 pub fn set_block_on_ticks(&self, range: RangeInclusive<usize>) {
503 match self {
504 Self::Deterministic(executor) => executor.state.lock().block_on_ticks = range,
505 _ => panic!("this method can only be called on a deterministic executor"),
506 }
507 }
508}
509
510impl Background {
511 pub fn new() -> Self {
512 let executor = Arc::new(Executor::new());
513 let stop = channel::unbounded::<()>();
514
515 for i in 0..2 * num_cpus::get() {
516 let executor = executor.clone();
517 let stop = stop.1.clone();
518 thread::Builder::new()
519 .name(format!("background-executor-{}", i))
520 .spawn(move || smol::block_on(executor.run(stop.recv())))
521 .unwrap();
522 }
523
524 Self::Production {
525 executor,
526 _stop: stop.0,
527 }
528 }
529
530 pub fn num_cpus(&self) -> usize {
531 num_cpus::get()
532 }
533
534 pub fn spawn<T, F>(&self, future: F) -> Task<T>
535 where
536 T: 'static + Send,
537 F: Send + Future<Output = T> + 'static,
538 {
539 let future = any_future(future);
540 let any_task = match self {
541 Self::Production { executor, .. } => executor.spawn(future),
542 Self::Deterministic { executor, .. } => executor.spawn(future),
543 };
544 Task::send(any_task)
545 }
546
547 pub fn block_with_timeout<F, T>(
548 &self,
549 timeout: Duration,
550 future: F,
551 ) -> Result<T, impl Future<Output = T>>
552 where
553 T: 'static,
554 F: 'static + Unpin + Future<Output = T>,
555 {
556 let mut future = any_local_future(future);
557 if !timeout.is_zero() {
558 let output = match self {
559 Self::Production { .. } => smol::block_on(util::timeout(timeout, &mut future)).ok(),
560 Self::Deterministic { executor, .. } => executor.block_on(&mut future),
561 };
562 if let Some(output) = output {
563 return Ok(*output.downcast().unwrap());
564 }
565 }
566 Err(async { *future.await.downcast().unwrap() })
567 }
568
569 pub async fn scoped<'scope, F>(&self, scheduler: F)
570 where
571 F: FnOnce(&mut Scope<'scope>),
572 {
573 let mut scope = Scope {
574 futures: Default::default(),
575 _phantom: PhantomData,
576 };
577 (scheduler)(&mut scope);
578 let spawned = scope
579 .futures
580 .into_iter()
581 .map(|f| self.spawn(f))
582 .collect::<Vec<_>>();
583 for task in spawned {
584 task.await;
585 }
586 }
587}
588
589pub struct Scope<'a> {
590 futures: Vec<Pin<Box<dyn Future<Output = ()> + Send + 'static>>>,
591 _phantom: PhantomData<&'a ()>,
592}
593
594impl<'a> Scope<'a> {
595 pub fn spawn<F>(&mut self, f: F)
596 where
597 F: Future<Output = ()> + Send + 'a,
598 {
599 let f = unsafe {
600 mem::transmute::<
601 Pin<Box<dyn Future<Output = ()> + Send + 'a>>,
602 Pin<Box<dyn Future<Output = ()> + Send + 'static>>,
603 >(Box::pin(f))
604 };
605 self.futures.push(f);
606 }
607}
608
609pub fn deterministic(seed: u64) -> (Rc<Foreground>, Arc<Background>) {
610 let executor = Arc::new(Deterministic::new(seed));
611 (
612 Rc::new(Foreground::Deterministic(executor.clone())),
613 Arc::new(Background::Deterministic { executor }),
614 )
615}
616
617impl<T> Task<T> {
618 pub fn ready(value: T) -> Self {
619 Self::Ready(Some(value))
620 }
621
622 fn local(any_task: AnyLocalTask) -> Self {
623 Self::Local {
624 any_task,
625 result_type: PhantomData,
626 }
627 }
628
629 pub fn detach(self) {
630 match self {
631 Task::Ready(_) => {}
632 Task::Local { any_task, .. } => any_task.detach(),
633 Task::Send { any_task, .. } => any_task.detach(),
634 }
635 }
636}
637
638impl<T: Send> Task<T> {
639 fn send(any_task: AnyTask) -> Self {
640 Self::Send {
641 any_task,
642 result_type: PhantomData,
643 }
644 }
645}
646
647impl<T: fmt::Debug> fmt::Debug for Task<T> {
648 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
649 match self {
650 Task::Ready(value) => value.fmt(f),
651 Task::Local { any_task, .. } => any_task.fmt(f),
652 Task::Send { any_task, .. } => any_task.fmt(f),
653 }
654 }
655}
656
657impl<T: 'static> Future for Task<T> {
658 type Output = T;
659
660 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
661 match unsafe { self.get_unchecked_mut() } {
662 Task::Ready(value) => Poll::Ready(value.take().unwrap()),
663 Task::Local { any_task, .. } => {
664 any_task.poll(cx).map(|value| *value.downcast().unwrap())
665 }
666 Task::Send { any_task, .. } => {
667 any_task.poll(cx).map(|value| *value.downcast().unwrap())
668 }
669 }
670 }
671}
672
673fn any_future<T, F>(future: F) -> AnyFuture
674where
675 T: 'static + Send,
676 F: Future<Output = T> + Send + 'static,
677{
678 async { Box::new(future.await) as Box<dyn Any + Send> }.boxed()
679}
680
681fn any_local_future<T, F>(future: F) -> AnyLocalFuture
682where
683 T: 'static,
684 F: Future<Output = T> + 'static,
685{
686 async { Box::new(future.await) as Box<dyn Any> }.boxed_local()
687}