executor.rs

  1use anyhow::{anyhow, Result};
  2use async_task::Runnable;
  3use backtrace::{Backtrace, BacktraceFmt, BytesOrWideString};
  4use parking_lot::Mutex;
  5use postage::{barrier, prelude::Stream as _};
  6use rand::prelude::*;
  7use smol::{channel, prelude::*, Executor, Timer};
  8use std::{
  9    any::Any,
 10    fmt::{self, Debug},
 11    marker::PhantomData,
 12    mem,
 13    ops::RangeInclusive,
 14    pin::Pin,
 15    rc::Rc,
 16    sync::{
 17        atomic::{AtomicBool, Ordering::SeqCst},
 18        Arc,
 19    },
 20    task::{Context, Poll},
 21    thread,
 22    time::{Duration, Instant},
 23};
 24use waker_fn::waker_fn;
 25
 26use crate::{
 27    platform::{self, Dispatcher},
 28    util,
 29};
 30
 31pub enum Foreground {
 32    Platform {
 33        dispatcher: Arc<dyn platform::Dispatcher>,
 34        _not_send_or_sync: PhantomData<Rc<()>>,
 35    },
 36    Test(smol::LocalExecutor<'static>),
 37    Deterministic(Arc<Deterministic>),
 38}
 39
 40pub enum Background {
 41    Deterministic {
 42        executor: Arc<Deterministic>,
 43    },
 44    Production {
 45        executor: Arc<smol::Executor<'static>>,
 46        _stop: channel::Sender<()>,
 47    },
 48}
 49
 50type AnyLocalFuture = Pin<Box<dyn 'static + Future<Output = Box<dyn Any + 'static>>>>;
 51type AnyFuture = Pin<Box<dyn 'static + Send + Future<Output = Box<dyn Any + Send + 'static>>>>;
 52type AnyTask = async_task::Task<Box<dyn Any + Send + 'static>>;
 53type AnyLocalTask = async_task::Task<Box<dyn Any + 'static>>;
 54
 55#[must_use]
 56pub enum Task<T> {
 57    Ready(Option<T>),
 58    Local {
 59        any_task: AnyLocalTask,
 60        result_type: PhantomData<T>,
 61    },
 62    Send {
 63        any_task: AnyTask,
 64        result_type: PhantomData<T>,
 65    },
 66}
 67
 68unsafe impl<T: Send> Send for Task<T> {}
 69
 70struct DeterministicState {
 71    rng: StdRng,
 72    seed: u64,
 73    scheduled_from_foreground: Vec<(Runnable, Backtrace)>,
 74    scheduled_from_background: Vec<(Runnable, Backtrace)>,
 75    spawned_from_foreground: Vec<(Runnable, Backtrace)>,
 76    forbid_parking: bool,
 77    block_on_ticks: RangeInclusive<usize>,
 78    now: Instant,
 79    pending_timers: Vec<(Instant, barrier::Sender)>,
 80}
 81
 82pub struct Deterministic {
 83    state: Arc<Mutex<DeterministicState>>,
 84    parker: Mutex<parking::Parker>,
 85}
 86
 87impl Deterministic {
 88    fn new(seed: u64) -> Self {
 89        Self {
 90            state: Arc::new(Mutex::new(DeterministicState {
 91                rng: StdRng::seed_from_u64(seed),
 92                seed,
 93                scheduled_from_foreground: Default::default(),
 94                scheduled_from_background: Default::default(),
 95                spawned_from_foreground: Default::default(),
 96                forbid_parking: false,
 97                block_on_ticks: 0..=1000,
 98                now: Instant::now(),
 99                pending_timers: Default::default(),
100            })),
101            parker: Default::default(),
102        }
103    }
104
105    fn spawn_from_foreground(&self, future: AnyLocalFuture) -> AnyLocalTask {
106        let backtrace = Backtrace::new_unresolved();
107        let scheduled_once = AtomicBool::new(false);
108        let state = self.state.clone();
109        let unparker = self.parker.lock().unparker();
110        let (runnable, task) = async_task::spawn_local(future, move |runnable| {
111            let mut state = state.lock();
112            let backtrace = backtrace.clone();
113            if scheduled_once.fetch_or(true, SeqCst) {
114                state.scheduled_from_foreground.push((runnable, backtrace));
115            } else {
116                state.spawned_from_foreground.push((runnable, backtrace));
117            }
118            unparker.unpark();
119        });
120        runnable.schedule();
121        task
122    }
123
124    fn spawn(&self, future: AnyFuture) -> AnyTask {
125        let backtrace = Backtrace::new_unresolved();
126        let state = self.state.clone();
127        let unparker = self.parker.lock().unparker();
128        let (runnable, task) = async_task::spawn(future, move |runnable| {
129            let mut state = state.lock();
130            state
131                .scheduled_from_background
132                .push((runnable, backtrace.clone()));
133            unparker.unpark();
134        });
135        runnable.schedule();
136        task
137    }
138
139    fn run(&self, mut future: AnyLocalFuture) -> Box<dyn Any> {
140        let woken = Arc::new(AtomicBool::new(false));
141        loop {
142            if let Some(result) = self.run_internal(woken.clone(), &mut future) {
143                return result;
144            }
145
146            if !woken.load(SeqCst) && self.state.lock().forbid_parking {
147                panic!("deterministic executor parked after a call to forbid_parking");
148            }
149
150            woken.store(false, SeqCst);
151            self.parker.lock().park();
152        }
153    }
154
155    fn run_until_parked(&self) {
156        let woken = Arc::new(AtomicBool::new(false));
157        let mut future = any_local_future(std::future::pending::<()>());
158        self.run_internal(woken, &mut future);
159    }
160
161    fn run_internal(
162        &self,
163        woken: Arc<AtomicBool>,
164        future: &mut AnyLocalFuture,
165    ) -> Option<Box<dyn Any>> {
166        let unparker = self.parker.lock().unparker();
167        let waker = waker_fn(move || {
168            woken.store(true, SeqCst);
169            unparker.unpark();
170        });
171
172        let mut cx = Context::from_waker(&waker);
173        let mut trace = Trace::default();
174        loop {
175            let mut state = self.state.lock();
176            let runnable_count = state.scheduled_from_foreground.len()
177                + state.scheduled_from_background.len()
178                + state.spawned_from_foreground.len();
179
180            let ix = state.rng.gen_range(0..=runnable_count);
181            if ix < state.scheduled_from_foreground.len() {
182                let (_, backtrace) = &state.scheduled_from_foreground[ix];
183                trace.record(&state, backtrace.clone());
184                let runnable = state.scheduled_from_foreground.remove(ix).0;
185                drop(state);
186                runnable.run();
187            } else if ix - state.scheduled_from_foreground.len()
188                < state.scheduled_from_background.len()
189            {
190                let ix = ix - state.scheduled_from_foreground.len();
191                let (_, backtrace) = &state.scheduled_from_background[ix];
192                trace.record(&state, backtrace.clone());
193                let runnable = state.scheduled_from_background.remove(ix).0;
194                drop(state);
195                runnable.run();
196            } else if ix < runnable_count {
197                let (_, backtrace) = &state.spawned_from_foreground[0];
198                trace.record(&state, backtrace.clone());
199                let runnable = state.spawned_from_foreground.remove(0).0;
200                drop(state);
201                runnable.run();
202            } else {
203                drop(state);
204                if let Poll::Ready(result) = future.poll(&mut cx) {
205                    return Some(result);
206                }
207
208                let state = self.state.lock();
209                if state.would_park() {
210                    return None;
211                }
212            }
213        }
214    }
215
216    fn block_on(&self, future: &mut AnyLocalFuture) -> Option<Box<dyn Any>> {
217        let unparker = self.parker.lock().unparker();
218        let waker = waker_fn(move || {
219            unparker.unpark();
220        });
221        let max_ticks = {
222            let mut state = self.state.lock();
223            let range = state.block_on_ticks.clone();
224            state.rng.gen_range(range)
225        };
226
227        let mut cx = Context::from_waker(&waker);
228        let mut trace = Trace::default();
229        for _ in 0..max_ticks {
230            let mut state = self.state.lock();
231            let runnable_count = state.scheduled_from_background.len();
232            let ix = state.rng.gen_range(0..=runnable_count);
233            if ix < state.scheduled_from_background.len() {
234                let (_, backtrace) = &state.scheduled_from_background[ix];
235                trace.record(&state, backtrace.clone());
236                let runnable = state.scheduled_from_background.remove(ix).0;
237                drop(state);
238                runnable.run();
239            } else {
240                drop(state);
241                if let Poll::Ready(result) = future.as_mut().poll(&mut cx) {
242                    return Some(result);
243                }
244                let state = self.state.lock();
245                if state.scheduled_from_background.is_empty() {
246                    if state.forbid_parking {
247                        panic!("deterministic executor parked after a call to forbid_parking");
248                    }
249                    drop(state);
250                    self.parker.lock().park();
251                }
252
253                continue;
254            }
255        }
256
257        None
258    }
259}
260
261impl DeterministicState {
262    fn would_park(&self) -> bool {
263        self.scheduled_from_foreground.is_empty()
264            && self.scheduled_from_background.is_empty()
265            && self.spawned_from_foreground.is_empty()
266    }
267}
268
269#[derive(Default)]
270struct Trace {
271    executed: Vec<Backtrace>,
272    scheduled: Vec<Vec<Backtrace>>,
273    spawned_from_foreground: Vec<Vec<Backtrace>>,
274}
275
276impl Trace {
277    fn record(&mut self, state: &DeterministicState, executed: Backtrace) {
278        self.scheduled.push(
279            state
280                .scheduled_from_foreground
281                .iter()
282                .map(|(_, backtrace)| backtrace.clone())
283                .collect(),
284        );
285        self.spawned_from_foreground.push(
286            state
287                .spawned_from_foreground
288                .iter()
289                .map(|(_, backtrace)| backtrace.clone())
290                .collect(),
291        );
292        self.executed.push(executed);
293    }
294
295    fn resolve(&mut self) {
296        for backtrace in &mut self.executed {
297            backtrace.resolve();
298        }
299
300        for backtraces in &mut self.scheduled {
301            for backtrace in backtraces {
302                backtrace.resolve();
303            }
304        }
305
306        for backtraces in &mut self.spawned_from_foreground {
307            for backtrace in backtraces {
308                backtrace.resolve();
309            }
310        }
311    }
312}
313
314impl Debug for Trace {
315    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
316        struct FirstCwdFrameInBacktrace<'a>(&'a Backtrace);
317
318        impl<'a> Debug for FirstCwdFrameInBacktrace<'a> {
319            fn fmt(&self, f: &mut fmt::Formatter<'_>) -> std::fmt::Result {
320                let cwd = std::env::current_dir().unwrap();
321                let mut print_path = |fmt: &mut fmt::Formatter<'_>, path: BytesOrWideString<'_>| {
322                    fmt::Display::fmt(&path, fmt)
323                };
324                let mut fmt = BacktraceFmt::new(f, backtrace::PrintFmt::Full, &mut print_path);
325                for frame in self.0.frames() {
326                    let mut formatted_frame = fmt.frame();
327                    if frame
328                        .symbols()
329                        .iter()
330                        .any(|s| s.filename().map_or(false, |f| f.starts_with(&cwd)))
331                    {
332                        formatted_frame.backtrace_frame(frame)?;
333                        break;
334                    }
335                }
336                fmt.finish()
337            }
338        }
339
340        for ((backtrace, scheduled), spawned_from_foreground) in self
341            .executed
342            .iter()
343            .zip(&self.scheduled)
344            .zip(&self.spawned_from_foreground)
345        {
346            writeln!(f, "Scheduled")?;
347            for backtrace in scheduled {
348                writeln!(f, "- {:?}", FirstCwdFrameInBacktrace(backtrace))?;
349            }
350            if scheduled.is_empty() {
351                writeln!(f, "None")?;
352            }
353            writeln!(f, "==========")?;
354
355            writeln!(f, "Spawned from foreground")?;
356            for backtrace in spawned_from_foreground {
357                writeln!(f, "- {:?}", FirstCwdFrameInBacktrace(backtrace))?;
358            }
359            if spawned_from_foreground.is_empty() {
360                writeln!(f, "None")?;
361            }
362            writeln!(f, "==========")?;
363
364            writeln!(f, "Run: {:?}", FirstCwdFrameInBacktrace(backtrace))?;
365            writeln!(f, "+++++++++++++++++++")?;
366        }
367
368        Ok(())
369    }
370}
371
372impl Drop for Trace {
373    fn drop(&mut self) {
374        let trace_on_panic = if let Ok(trace_on_panic) = std::env::var("EXECUTOR_TRACE_ON_PANIC") {
375            trace_on_panic == "1" || trace_on_panic == "true"
376        } else {
377            false
378        };
379        let trace_always = if let Ok(trace_always) = std::env::var("EXECUTOR_TRACE_ALWAYS") {
380            trace_always == "1" || trace_always == "true"
381        } else {
382            false
383        };
384
385        if trace_always || (trace_on_panic && thread::panicking()) {
386            self.resolve();
387            dbg!(self);
388        }
389    }
390}
391
392impl Foreground {
393    pub fn platform(dispatcher: Arc<dyn platform::Dispatcher>) -> Result<Self> {
394        if dispatcher.is_main_thread() {
395            Ok(Self::Platform {
396                dispatcher,
397                _not_send_or_sync: PhantomData,
398            })
399        } else {
400            Err(anyhow!("must be constructed on main thread"))
401        }
402    }
403
404    pub fn test() -> Self {
405        Self::Test(smol::LocalExecutor::new())
406    }
407
408    pub fn spawn<T: 'static>(&self, future: impl Future<Output = T> + 'static) -> Task<T> {
409        let future = any_local_future(future);
410        let any_task = match self {
411            Self::Deterministic(executor) => executor.spawn_from_foreground(future),
412            Self::Platform { dispatcher, .. } => {
413                fn spawn_inner(
414                    future: AnyLocalFuture,
415                    dispatcher: &Arc<dyn Dispatcher>,
416                ) -> AnyLocalTask {
417                    let dispatcher = dispatcher.clone();
418                    let schedule =
419                        move |runnable: Runnable| dispatcher.run_on_main_thread(runnable);
420                    let (runnable, task) = async_task::spawn_local(future, schedule);
421                    runnable.schedule();
422                    task
423                }
424                spawn_inner(future, dispatcher)
425            }
426            Self::Test(executor) => executor.spawn(future),
427        };
428        Task::local(any_task)
429    }
430
431    pub fn run<T: 'static>(&self, future: impl 'static + Future<Output = T>) -> T {
432        let future = any_local_future(future);
433        let any_value = match self {
434            Self::Deterministic(executor) => executor.run(future),
435            Self::Platform { .. } => panic!("you can't call run on a platform foreground executor"),
436            Self::Test(executor) => smol::block_on(executor.run(future)),
437        };
438        *any_value.downcast().unwrap()
439    }
440
441    pub fn would_park(&self) -> bool {
442        match self {
443            Self::Deterministic(executor) => executor.state.lock().would_park(),
444            _ => panic!("this method can only be called on a deterministic executor"),
445        }
446    }
447
448    pub fn forbid_parking(&self) {
449        match self {
450            Self::Deterministic(executor) => {
451                let mut state = executor.state.lock();
452                state.forbid_parking = true;
453                state.rng = StdRng::seed_from_u64(state.seed);
454            }
455            _ => panic!("this method can only be called on a deterministic executor"),
456        }
457    }
458
459    pub async fn timer(&self, duration: Duration) {
460        match self {
461            Self::Deterministic(executor) => {
462                let (tx, mut rx) = barrier::channel();
463                {
464                    let mut state = executor.state.lock();
465                    let wakeup_at = state.now + duration;
466                    state.pending_timers.push((wakeup_at, tx));
467                }
468                rx.recv().await;
469            }
470            _ => {
471                Timer::after(duration).await;
472            }
473        }
474    }
475
476    pub fn advance_clock(&self, duration: Duration) {
477        match self {
478            Self::Deterministic(executor) => {
479                executor.run_until_parked();
480
481                let mut state = executor.state.lock();
482                state.now += duration;
483                let now = state.now;
484                let mut pending_timers = mem::take(&mut state.pending_timers);
485                drop(state);
486
487                pending_timers.retain(|(wakeup, _)| *wakeup > now);
488                executor.state.lock().pending_timers.extend(pending_timers);
489            }
490            _ => panic!("this method can only be called on a deterministic executor"),
491        }
492    }
493
494    pub fn set_block_on_ticks(&self, range: RangeInclusive<usize>) {
495        match self {
496            Self::Deterministic(executor) => executor.state.lock().block_on_ticks = range,
497            _ => panic!("this method can only be called on a deterministic executor"),
498        }
499    }
500}
501
502impl Background {
503    pub fn new() -> Self {
504        let executor = Arc::new(Executor::new());
505        let stop = channel::unbounded::<()>();
506
507        for i in 0..2 * num_cpus::get() {
508            let executor = executor.clone();
509            let stop = stop.1.clone();
510            thread::Builder::new()
511                .name(format!("background-executor-{}", i))
512                .spawn(move || smol::block_on(executor.run(stop.recv())))
513                .unwrap();
514        }
515
516        Self::Production {
517            executor,
518            _stop: stop.0,
519        }
520    }
521
522    pub fn num_cpus(&self) -> usize {
523        num_cpus::get()
524    }
525
526    pub fn spawn<T, F>(&self, future: F) -> Task<T>
527    where
528        T: 'static + Send,
529        F: Send + Future<Output = T> + 'static,
530    {
531        let future = any_future(future);
532        let any_task = match self {
533            Self::Production { executor, .. } => executor.spawn(future),
534            Self::Deterministic { executor, .. } => executor.spawn(future),
535        };
536        Task::send(any_task)
537    }
538
539    pub fn block_with_timeout<F, T>(
540        &self,
541        timeout: Duration,
542        future: F,
543    ) -> Result<T, impl Future<Output = T>>
544    where
545        T: 'static,
546        F: 'static + Unpin + Future<Output = T>,
547    {
548        let mut future = any_local_future(future);
549        if !timeout.is_zero() {
550            let output = match self {
551                Self::Production { .. } => smol::block_on(util::timeout(timeout, &mut future)).ok(),
552                Self::Deterministic { executor, .. } => executor.block_on(&mut future),
553            };
554            if let Some(output) = output {
555                return Ok(*output.downcast().unwrap());
556            }
557        }
558        Err(async { *future.await.downcast().unwrap() })
559    }
560
561    pub async fn scoped<'scope, F>(&self, scheduler: F)
562    where
563        F: FnOnce(&mut Scope<'scope>),
564    {
565        let mut scope = Scope {
566            futures: Default::default(),
567            _phantom: PhantomData,
568        };
569        (scheduler)(&mut scope);
570        let spawned = scope
571            .futures
572            .into_iter()
573            .map(|f| self.spawn(f))
574            .collect::<Vec<_>>();
575        for task in spawned {
576            task.await;
577        }
578    }
579}
580
581pub struct Scope<'a> {
582    futures: Vec<Pin<Box<dyn Future<Output = ()> + Send + 'static>>>,
583    _phantom: PhantomData<&'a ()>,
584}
585
586impl<'a> Scope<'a> {
587    pub fn spawn<F>(&mut self, f: F)
588    where
589        F: Future<Output = ()> + Send + 'a,
590    {
591        let f = unsafe {
592            mem::transmute::<
593                Pin<Box<dyn Future<Output = ()> + Send + 'a>>,
594                Pin<Box<dyn Future<Output = ()> + Send + 'static>>,
595            >(Box::pin(f))
596        };
597        self.futures.push(f);
598    }
599}
600
601pub fn deterministic(seed: u64) -> (Rc<Foreground>, Arc<Background>) {
602    let executor = Arc::new(Deterministic::new(seed));
603    (
604        Rc::new(Foreground::Deterministic(executor.clone())),
605        Arc::new(Background::Deterministic { executor }),
606    )
607}
608
609impl<T> Task<T> {
610    pub fn ready(value: T) -> Self {
611        Self::Ready(Some(value))
612    }
613
614    fn local(any_task: AnyLocalTask) -> Self {
615        Self::Local {
616            any_task,
617            result_type: PhantomData,
618        }
619    }
620
621    pub fn detach(self) {
622        match self {
623            Task::Ready(_) => {}
624            Task::Local { any_task, .. } => any_task.detach(),
625            Task::Send { any_task, .. } => any_task.detach(),
626        }
627    }
628}
629
630impl<T: Send> Task<T> {
631    fn send(any_task: AnyTask) -> Self {
632        Self::Send {
633            any_task,
634            result_type: PhantomData,
635        }
636    }
637}
638
639impl<T: fmt::Debug> fmt::Debug for Task<T> {
640    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
641        match self {
642            Task::Ready(value) => value.fmt(f),
643            Task::Local { any_task, .. } => any_task.fmt(f),
644            Task::Send { any_task, .. } => any_task.fmt(f),
645        }
646    }
647}
648
649impl<T: 'static> Future for Task<T> {
650    type Output = T;
651
652    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
653        match unsafe { self.get_unchecked_mut() } {
654            Task::Ready(value) => Poll::Ready(value.take().unwrap()),
655            Task::Local { any_task, .. } => {
656                any_task.poll(cx).map(|value| *value.downcast().unwrap())
657            }
658            Task::Send { any_task, .. } => {
659                any_task.poll(cx).map(|value| *value.downcast().unwrap())
660            }
661        }
662    }
663}
664
665fn any_future<T, F>(future: F) -> AnyFuture
666where
667    T: 'static + Send,
668    F: Future<Output = T> + Send + 'static,
669{
670    async { Box::new(future.await) as Box<dyn Any + Send> }.boxed()
671}
672
673fn any_local_future<T, F>(future: F) -> AnyLocalFuture
674where
675    T: 'static,
676    F: Future<Output = T> + 'static,
677{
678    async { Box::new(future.await) as Box<dyn Any> }.boxed_local()
679}