1use anyhow::{anyhow, Result};
2use async_task::Runnable;
3use backtrace::{Backtrace, BacktraceFmt, BytesOrWideString};
4use parking_lot::Mutex;
5use postage::{barrier, prelude::Stream as _};
6use rand::prelude::*;
7use smol::{channel, prelude::*, Executor, Timer};
8use std::{
9 any::Any,
10 fmt::{self, Debug},
11 marker::PhantomData,
12 mem,
13 ops::RangeInclusive,
14 pin::Pin,
15 rc::Rc,
16 sync::{
17 atomic::{AtomicBool, Ordering::SeqCst},
18 Arc,
19 },
20 task::{Context, Poll},
21 thread,
22 time::{Duration, Instant},
23};
24use waker_fn::waker_fn;
25
26use crate::{
27 platform::{self, Dispatcher},
28 util,
29};
30
31pub enum Foreground {
32 Platform {
33 dispatcher: Arc<dyn platform::Dispatcher>,
34 _not_send_or_sync: PhantomData<Rc<()>>,
35 },
36 Test(smol::LocalExecutor<'static>),
37 Deterministic(Arc<Deterministic>),
38}
39
40pub enum Background {
41 Deterministic {
42 executor: Arc<Deterministic>,
43 },
44 Production {
45 executor: Arc<smol::Executor<'static>>,
46 _stop: channel::Sender<()>,
47 },
48}
49
50type AnyLocalFuture = Pin<Box<dyn 'static + Future<Output = Box<dyn Any + 'static>>>>;
51type AnyFuture = Pin<Box<dyn 'static + Send + Future<Output = Box<dyn Any + Send + 'static>>>>;
52type AnyTask = async_task::Task<Box<dyn Any + Send + 'static>>;
53type AnyLocalTask = async_task::Task<Box<dyn Any + 'static>>;
54
55#[must_use]
56pub enum Task<T> {
57 Local {
58 any_task: AnyLocalTask,
59 result_type: PhantomData<T>,
60 },
61 Send {
62 any_task: AnyTask,
63 result_type: PhantomData<T>,
64 },
65}
66
67unsafe impl<T: Send> Send for Task<T> {}
68
69struct DeterministicState {
70 rng: StdRng,
71 seed: u64,
72 scheduled_from_foreground: Vec<(Runnable, Backtrace)>,
73 scheduled_from_background: Vec<(Runnable, Backtrace)>,
74 spawned_from_foreground: Vec<(Runnable, Backtrace)>,
75 forbid_parking: bool,
76 block_on_ticks: RangeInclusive<usize>,
77 now: Instant,
78 pending_timers: Vec<(Instant, barrier::Sender)>,
79}
80
81pub struct Deterministic {
82 state: Arc<Mutex<DeterministicState>>,
83 parker: Mutex<parking::Parker>,
84}
85
86impl Deterministic {
87 fn new(seed: u64) -> Self {
88 Self {
89 state: Arc::new(Mutex::new(DeterministicState {
90 rng: StdRng::seed_from_u64(seed),
91 seed,
92 scheduled_from_foreground: Default::default(),
93 scheduled_from_background: Default::default(),
94 spawned_from_foreground: Default::default(),
95 forbid_parking: false,
96 block_on_ticks: 0..=1000,
97 now: Instant::now(),
98 pending_timers: Default::default(),
99 })),
100 parker: Default::default(),
101 }
102 }
103
104 fn spawn_from_foreground(&self, future: AnyLocalFuture) -> AnyLocalTask {
105 let backtrace = Backtrace::new_unresolved();
106 let scheduled_once = AtomicBool::new(false);
107 let state = self.state.clone();
108 let unparker = self.parker.lock().unparker();
109 let (runnable, task) = async_task::spawn_local(future, move |runnable| {
110 let mut state = state.lock();
111 let backtrace = backtrace.clone();
112 if scheduled_once.fetch_or(true, SeqCst) {
113 state.scheduled_from_foreground.push((runnable, backtrace));
114 } else {
115 state.spawned_from_foreground.push((runnable, backtrace));
116 }
117 unparker.unpark();
118 });
119 runnable.schedule();
120 task
121 }
122
123 fn spawn(&self, future: AnyFuture) -> AnyTask {
124 let backtrace = Backtrace::new_unresolved();
125 let state = self.state.clone();
126 let unparker = self.parker.lock().unparker();
127 let (runnable, task) = async_task::spawn(future, move |runnable| {
128 let mut state = state.lock();
129 state
130 .scheduled_from_background
131 .push((runnable, backtrace.clone()));
132 unparker.unpark();
133 });
134 runnable.schedule();
135 task
136 }
137
138 fn run(&self, mut future: AnyLocalFuture) -> Box<dyn Any> {
139 let woken = Arc::new(AtomicBool::new(false));
140 loop {
141 if let Some(result) = self.run_internal(woken.clone(), &mut future) {
142 return result;
143 }
144
145 if !woken.load(SeqCst) && self.state.lock().forbid_parking {
146 panic!("deterministic executor parked after a call to forbid_parking");
147 }
148
149 woken.store(false, SeqCst);
150 self.parker.lock().park();
151 }
152 }
153
154 fn run_until_parked(&self) {
155 let woken = Arc::new(AtomicBool::new(false));
156 let mut future = any_local_future(std::future::pending::<()>());
157 self.run_internal(woken, &mut future);
158 }
159
160 fn run_internal(
161 &self,
162 woken: Arc<AtomicBool>,
163 future: &mut AnyLocalFuture,
164 ) -> Option<Box<dyn Any>> {
165 let unparker = self.parker.lock().unparker();
166 let waker = waker_fn(move || {
167 woken.store(true, SeqCst);
168 unparker.unpark();
169 });
170
171 let mut cx = Context::from_waker(&waker);
172 let mut trace = Trace::default();
173 loop {
174 let mut state = self.state.lock();
175 let runnable_count = state.scheduled_from_foreground.len()
176 + state.scheduled_from_background.len()
177 + state.spawned_from_foreground.len();
178
179 let ix = state.rng.gen_range(0..=runnable_count);
180 if ix < state.scheduled_from_foreground.len() {
181 let (_, backtrace) = &state.scheduled_from_foreground[ix];
182 trace.record(&state, backtrace.clone());
183 let runnable = state.scheduled_from_foreground.remove(ix).0;
184 drop(state);
185 runnable.run();
186 } else if ix - state.scheduled_from_foreground.len()
187 < state.scheduled_from_background.len()
188 {
189 let ix = ix - state.scheduled_from_foreground.len();
190 let (_, backtrace) = &state.scheduled_from_background[ix];
191 trace.record(&state, backtrace.clone());
192 let runnable = state.scheduled_from_background.remove(ix).0;
193 drop(state);
194 runnable.run();
195 } else if ix < runnable_count {
196 let (_, backtrace) = &state.spawned_from_foreground[0];
197 trace.record(&state, backtrace.clone());
198 let runnable = state.spawned_from_foreground.remove(0).0;
199 drop(state);
200 runnable.run();
201 } else {
202 drop(state);
203 if let Poll::Ready(result) = future.poll(&mut cx) {
204 return Some(result);
205 }
206
207 let state = self.state.lock();
208 if state.scheduled_from_foreground.is_empty()
209 && state.scheduled_from_background.is_empty()
210 && state.spawned_from_foreground.is_empty()
211 {
212 return None;
213 }
214 }
215 }
216 }
217
218 fn block_on(&self, future: &mut AnyLocalFuture) -> Option<Box<dyn Any>> {
219 let unparker = self.parker.lock().unparker();
220 let waker = waker_fn(move || {
221 unparker.unpark();
222 });
223 let max_ticks = {
224 let mut state = self.state.lock();
225 let range = state.block_on_ticks.clone();
226 state.rng.gen_range(range)
227 };
228
229 let mut cx = Context::from_waker(&waker);
230 let mut trace = Trace::default();
231 for _ in 0..max_ticks {
232 let mut state = self.state.lock();
233 let runnable_count = state.scheduled_from_background.len();
234 let ix = state.rng.gen_range(0..=runnable_count);
235 if ix < state.scheduled_from_background.len() {
236 let (_, backtrace) = &state.scheduled_from_background[ix];
237 trace.record(&state, backtrace.clone());
238 let runnable = state.scheduled_from_background.remove(ix).0;
239 drop(state);
240 runnable.run();
241 } else {
242 drop(state);
243 if let Poll::Ready(result) = future.as_mut().poll(&mut cx) {
244 return Some(result);
245 }
246 let state = self.state.lock();
247 if state.scheduled_from_background.is_empty() {
248 if state.forbid_parking {
249 panic!("deterministic executor parked after a call to forbid_parking");
250 }
251 drop(state);
252 self.parker.lock().park();
253 }
254
255 continue;
256 }
257 }
258
259 None
260 }
261}
262
263#[derive(Default)]
264struct Trace {
265 executed: Vec<Backtrace>,
266 scheduled: Vec<Vec<Backtrace>>,
267 spawned_from_foreground: Vec<Vec<Backtrace>>,
268}
269
270impl Trace {
271 fn record(&mut self, state: &DeterministicState, executed: Backtrace) {
272 self.scheduled.push(
273 state
274 .scheduled_from_foreground
275 .iter()
276 .map(|(_, backtrace)| backtrace.clone())
277 .collect(),
278 );
279 self.spawned_from_foreground.push(
280 state
281 .spawned_from_foreground
282 .iter()
283 .map(|(_, backtrace)| backtrace.clone())
284 .collect(),
285 );
286 self.executed.push(executed);
287 }
288
289 fn resolve(&mut self) {
290 for backtrace in &mut self.executed {
291 backtrace.resolve();
292 }
293
294 for backtraces in &mut self.scheduled {
295 for backtrace in backtraces {
296 backtrace.resolve();
297 }
298 }
299
300 for backtraces in &mut self.spawned_from_foreground {
301 for backtrace in backtraces {
302 backtrace.resolve();
303 }
304 }
305 }
306}
307
308impl Debug for Trace {
309 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
310 struct FirstCwdFrameInBacktrace<'a>(&'a Backtrace);
311
312 impl<'a> Debug for FirstCwdFrameInBacktrace<'a> {
313 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> std::fmt::Result {
314 let cwd = std::env::current_dir().unwrap();
315 let mut print_path = |fmt: &mut fmt::Formatter<'_>, path: BytesOrWideString<'_>| {
316 fmt::Display::fmt(&path, fmt)
317 };
318 let mut fmt = BacktraceFmt::new(f, backtrace::PrintFmt::Full, &mut print_path);
319 for frame in self.0.frames() {
320 let mut formatted_frame = fmt.frame();
321 if frame
322 .symbols()
323 .iter()
324 .any(|s| s.filename().map_or(false, |f| f.starts_with(&cwd)))
325 {
326 formatted_frame.backtrace_frame(frame)?;
327 break;
328 }
329 }
330 fmt.finish()
331 }
332 }
333
334 for ((backtrace, scheduled), spawned_from_foreground) in self
335 .executed
336 .iter()
337 .zip(&self.scheduled)
338 .zip(&self.spawned_from_foreground)
339 {
340 writeln!(f, "Scheduled")?;
341 for backtrace in scheduled {
342 writeln!(f, "- {:?}", FirstCwdFrameInBacktrace(backtrace))?;
343 }
344 if scheduled.is_empty() {
345 writeln!(f, "None")?;
346 }
347 writeln!(f, "==========")?;
348
349 writeln!(f, "Spawned from foreground")?;
350 for backtrace in spawned_from_foreground {
351 writeln!(f, "- {:?}", FirstCwdFrameInBacktrace(backtrace))?;
352 }
353 if spawned_from_foreground.is_empty() {
354 writeln!(f, "None")?;
355 }
356 writeln!(f, "==========")?;
357
358 writeln!(f, "Run: {:?}", FirstCwdFrameInBacktrace(backtrace))?;
359 writeln!(f, "+++++++++++++++++++")?;
360 }
361
362 Ok(())
363 }
364}
365
366impl Drop for Trace {
367 fn drop(&mut self) {
368 let trace_on_panic = if let Ok(trace_on_panic) = std::env::var("EXECUTOR_TRACE_ON_PANIC") {
369 trace_on_panic == "1" || trace_on_panic == "true"
370 } else {
371 false
372 };
373 let trace_always = if let Ok(trace_always) = std::env::var("EXECUTOR_TRACE_ALWAYS") {
374 trace_always == "1" || trace_always == "true"
375 } else {
376 false
377 };
378
379 if trace_always || (trace_on_panic && thread::panicking()) {
380 self.resolve();
381 dbg!(self);
382 }
383 }
384}
385
386impl Foreground {
387 pub fn platform(dispatcher: Arc<dyn platform::Dispatcher>) -> Result<Self> {
388 if dispatcher.is_main_thread() {
389 Ok(Self::Platform {
390 dispatcher,
391 _not_send_or_sync: PhantomData,
392 })
393 } else {
394 Err(anyhow!("must be constructed on main thread"))
395 }
396 }
397
398 pub fn test() -> Self {
399 Self::Test(smol::LocalExecutor::new())
400 }
401
402 pub fn spawn<T: 'static>(&self, future: impl Future<Output = T> + 'static) -> Task<T> {
403 let future = any_local_future(future);
404 let any_task = match self {
405 Self::Deterministic(executor) => executor.spawn_from_foreground(future),
406 Self::Platform { dispatcher, .. } => {
407 fn spawn_inner(
408 future: AnyLocalFuture,
409 dispatcher: &Arc<dyn Dispatcher>,
410 ) -> AnyLocalTask {
411 let dispatcher = dispatcher.clone();
412 let schedule =
413 move |runnable: Runnable| dispatcher.run_on_main_thread(runnable);
414 let (runnable, task) = async_task::spawn_local(future, schedule);
415 runnable.schedule();
416 task
417 }
418 spawn_inner(future, dispatcher)
419 }
420 Self::Test(executor) => executor.spawn(future),
421 };
422 Task::local(any_task)
423 }
424
425 pub fn run<T: 'static>(&self, future: impl 'static + Future<Output = T>) -> T {
426 let future = any_local_future(future);
427 let any_value = match self {
428 Self::Deterministic(executor) => executor.run(future),
429 Self::Platform { .. } => panic!("you can't call run on a platform foreground executor"),
430 Self::Test(executor) => smol::block_on(executor.run(future)),
431 };
432 *any_value.downcast().unwrap()
433 }
434
435 pub fn forbid_parking(&self) {
436 match self {
437 Self::Deterministic(executor) => {
438 let mut state = executor.state.lock();
439 state.forbid_parking = true;
440 state.rng = StdRng::seed_from_u64(state.seed);
441 }
442 _ => panic!("this method can only be called on a deterministic executor"),
443 }
444 }
445
446 pub async fn timer(&self, duration: Duration) {
447 match self {
448 Self::Deterministic(executor) => {
449 let (tx, mut rx) = barrier::channel();
450 {
451 let mut state = executor.state.lock();
452 let wakeup_at = state.now + duration;
453 state.pending_timers.push((wakeup_at, tx));
454 }
455 rx.recv().await;
456 }
457 _ => {
458 Timer::after(duration).await;
459 }
460 }
461 }
462
463 pub fn advance_clock(&self, duration: Duration) {
464 match self {
465 Self::Deterministic(executor) => {
466 executor.run_until_parked();
467
468 let mut state = executor.state.lock();
469 state.now += duration;
470 let now = state.now;
471 let mut pending_timers = mem::take(&mut state.pending_timers);
472 drop(state);
473
474 pending_timers.retain(|(wakeup, _)| *wakeup > now);
475 executor.state.lock().pending_timers.extend(pending_timers);
476 }
477 _ => panic!("this method can only be called on a deterministic executor"),
478 }
479 }
480
481 pub fn set_block_on_ticks(&self, range: RangeInclusive<usize>) {
482 match self {
483 Self::Deterministic(executor) => executor.state.lock().block_on_ticks = range,
484 _ => panic!("this method can only be called on a deterministic executor"),
485 }
486 }
487}
488
489impl Background {
490 pub fn new() -> Self {
491 let executor = Arc::new(Executor::new());
492 let stop = channel::unbounded::<()>();
493
494 for i in 0..2 * num_cpus::get() {
495 let executor = executor.clone();
496 let stop = stop.1.clone();
497 thread::Builder::new()
498 .name(format!("background-executor-{}", i))
499 .spawn(move || smol::block_on(executor.run(stop.recv())))
500 .unwrap();
501 }
502
503 Self::Production {
504 executor,
505 _stop: stop.0,
506 }
507 }
508
509 pub fn num_cpus(&self) -> usize {
510 num_cpus::get()
511 }
512
513 pub fn spawn<T, F>(&self, future: F) -> Task<T>
514 where
515 T: 'static + Send,
516 F: Send + Future<Output = T> + 'static,
517 {
518 let future = any_future(future);
519 let any_task = match self {
520 Self::Production { executor, .. } => executor.spawn(future),
521 Self::Deterministic { executor, .. } => executor.spawn(future),
522 };
523 Task::send(any_task)
524 }
525
526 pub fn block_with_timeout<F, T>(
527 &self,
528 timeout: Duration,
529 future: F,
530 ) -> Result<T, impl Future<Output = T>>
531 where
532 T: 'static,
533 F: 'static + Unpin + Future<Output = T>,
534 {
535 let mut future = any_local_future(future);
536 if !timeout.is_zero() {
537 let output = match self {
538 Self::Production { .. } => smol::block_on(util::timeout(timeout, &mut future)).ok(),
539 Self::Deterministic { executor, .. } => executor.block_on(&mut future),
540 };
541 if let Some(output) = output {
542 return Ok(*output.downcast().unwrap());
543 }
544 }
545 Err(async { *future.await.downcast().unwrap() })
546 }
547
548 pub async fn scoped<'scope, F>(&self, scheduler: F)
549 where
550 F: FnOnce(&mut Scope<'scope>),
551 {
552 let mut scope = Scope {
553 futures: Default::default(),
554 _phantom: PhantomData,
555 };
556 (scheduler)(&mut scope);
557 let spawned = scope
558 .futures
559 .into_iter()
560 .map(|f| self.spawn(f))
561 .collect::<Vec<_>>();
562 for task in spawned {
563 task.await;
564 }
565 }
566}
567
568pub struct Scope<'a> {
569 futures: Vec<Pin<Box<dyn Future<Output = ()> + Send + 'static>>>,
570 _phantom: PhantomData<&'a ()>,
571}
572
573impl<'a> Scope<'a> {
574 pub fn spawn<F>(&mut self, f: F)
575 where
576 F: Future<Output = ()> + Send + 'a,
577 {
578 let f = unsafe {
579 mem::transmute::<
580 Pin<Box<dyn Future<Output = ()> + Send + 'a>>,
581 Pin<Box<dyn Future<Output = ()> + Send + 'static>>,
582 >(Box::pin(f))
583 };
584 self.futures.push(f);
585 }
586}
587
588pub fn deterministic(seed: u64) -> (Rc<Foreground>, Arc<Background>) {
589 let executor = Arc::new(Deterministic::new(seed));
590 (
591 Rc::new(Foreground::Deterministic(executor.clone())),
592 Arc::new(Background::Deterministic { executor }),
593 )
594}
595
596impl<T> Task<T> {
597 fn local(any_task: AnyLocalTask) -> Self {
598 Self::Local {
599 any_task,
600 result_type: PhantomData,
601 }
602 }
603
604 pub fn detach(self) {
605 match self {
606 Task::Local { any_task, .. } => any_task.detach(),
607 Task::Send { any_task, .. } => any_task.detach(),
608 }
609 }
610}
611
612impl<T: Send> Task<T> {
613 fn send(any_task: AnyTask) -> Self {
614 Self::Send {
615 any_task,
616 result_type: PhantomData,
617 }
618 }
619}
620
621impl<T: fmt::Debug> fmt::Debug for Task<T> {
622 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
623 match self {
624 Task::Local { any_task, .. } => any_task.fmt(f),
625 Task::Send { any_task, .. } => any_task.fmt(f),
626 }
627 }
628}
629
630impl<T: 'static> Future for Task<T> {
631 type Output = T;
632
633 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
634 match unsafe { self.get_unchecked_mut() } {
635 Task::Local { any_task, .. } => {
636 any_task.poll(cx).map(|value| *value.downcast().unwrap())
637 }
638 Task::Send { any_task, .. } => {
639 any_task.poll(cx).map(|value| *value.downcast().unwrap())
640 }
641 }
642 }
643}
644
645fn any_future<T, F>(future: F) -> AnyFuture
646where
647 T: 'static + Send,
648 F: Future<Output = T> + Send + 'static,
649{
650 async { Box::new(future.await) as Box<dyn Any + Send> }.boxed()
651}
652
653fn any_local_future<T, F>(future: F) -> AnyLocalFuture
654where
655 T: 'static,
656 F: Future<Output = T> + 'static,
657{
658 async { Box::new(future.await) as Box<dyn Any> }.boxed_local()
659}