executor.rs

  1use anyhow::{anyhow, Result};
  2use async_task::Runnable;
  3use backtrace::{Backtrace, BacktraceFmt, BytesOrWideString};
  4use parking_lot::Mutex;
  5use postage::{barrier, prelude::Stream as _};
  6use rand::prelude::*;
  7use smol::{channel, prelude::*, Executor, Timer};
  8use std::{
  9    any::Any,
 10    fmt::{self, Debug},
 11    marker::PhantomData,
 12    mem,
 13    ops::RangeInclusive,
 14    pin::Pin,
 15    rc::Rc,
 16    sync::{
 17        atomic::{AtomicBool, Ordering::SeqCst},
 18        Arc,
 19    },
 20    task::{Context, Poll},
 21    thread,
 22    time::{Duration, Instant},
 23};
 24use waker_fn::waker_fn;
 25
 26use crate::{
 27    platform::{self, Dispatcher},
 28    util,
 29};
 30
 31pub enum Foreground {
 32    Platform {
 33        dispatcher: Arc<dyn platform::Dispatcher>,
 34        _not_send_or_sync: PhantomData<Rc<()>>,
 35    },
 36    Test(smol::LocalExecutor<'static>),
 37    Deterministic(Arc<Deterministic>),
 38}
 39
 40pub enum Background {
 41    Deterministic {
 42        executor: Arc<Deterministic>,
 43    },
 44    Production {
 45        executor: Arc<smol::Executor<'static>>,
 46        _stop: channel::Sender<()>,
 47    },
 48}
 49
 50type AnyLocalFuture = Pin<Box<dyn 'static + Future<Output = Box<dyn Any + 'static>>>>;
 51type AnyFuture = Pin<Box<dyn 'static + Send + Future<Output = Box<dyn Any + Send + 'static>>>>;
 52type AnyTask = async_task::Task<Box<dyn Any + Send + 'static>>;
 53type AnyLocalTask = async_task::Task<Box<dyn Any + 'static>>;
 54
 55#[must_use]
 56pub enum Task<T> {
 57    Ready(Option<T>),
 58    Local {
 59        any_task: AnyLocalTask,
 60        result_type: PhantomData<T>,
 61    },
 62    Send {
 63        any_task: AnyTask,
 64        result_type: PhantomData<T>,
 65    },
 66}
 67
 68unsafe impl<T: Send> Send for Task<T> {}
 69
 70struct DeterministicState {
 71    rng: StdRng,
 72    seed: u64,
 73    scheduled_from_foreground: Vec<(Runnable, Backtrace)>,
 74    scheduled_from_background: Vec<(Runnable, Backtrace)>,
 75    spawned_from_foreground: Vec<(Runnable, Backtrace)>,
 76    forbid_parking: bool,
 77    block_on_ticks: RangeInclusive<usize>,
 78    now: Instant,
 79    pending_timers: Vec<(Instant, barrier::Sender)>,
 80}
 81
 82pub struct Deterministic {
 83    state: Arc<Mutex<DeterministicState>>,
 84    parker: Mutex<parking::Parker>,
 85}
 86
 87impl Deterministic {
 88    fn new(seed: u64) -> Self {
 89        Self {
 90            state: Arc::new(Mutex::new(DeterministicState {
 91                rng: StdRng::seed_from_u64(seed),
 92                seed,
 93                scheduled_from_foreground: Default::default(),
 94                scheduled_from_background: Default::default(),
 95                spawned_from_foreground: Default::default(),
 96                forbid_parking: false,
 97                block_on_ticks: 0..=1000,
 98                now: Instant::now(),
 99                pending_timers: Default::default(),
100            })),
101            parker: Default::default(),
102        }
103    }
104
105    fn spawn_from_foreground(&self, future: AnyLocalFuture) -> AnyLocalTask {
106        let backtrace = Backtrace::new_unresolved();
107        let scheduled_once = AtomicBool::new(false);
108        let state = self.state.clone();
109        let unparker = self.parker.lock().unparker();
110        let (runnable, task) = async_task::spawn_local(future, move |runnable| {
111            let mut state = state.lock();
112            let backtrace = backtrace.clone();
113            if scheduled_once.fetch_or(true, SeqCst) {
114                state.scheduled_from_foreground.push((runnable, backtrace));
115            } else {
116                state.spawned_from_foreground.push((runnable, backtrace));
117            }
118            unparker.unpark();
119        });
120        runnable.schedule();
121        task
122    }
123
124    fn spawn(&self, future: AnyFuture) -> AnyTask {
125        let backtrace = Backtrace::new_unresolved();
126        let state = self.state.clone();
127        let unparker = self.parker.lock().unparker();
128        let (runnable, task) = async_task::spawn(future, move |runnable| {
129            let mut state = state.lock();
130            state
131                .scheduled_from_background
132                .push((runnable, backtrace.clone()));
133            unparker.unpark();
134        });
135        runnable.schedule();
136        task
137    }
138
139    fn run(&self, mut future: AnyLocalFuture) -> Box<dyn Any> {
140        let woken = Arc::new(AtomicBool::new(false));
141        loop {
142            if let Some(result) = self.run_internal(woken.clone(), &mut future) {
143                return result;
144            }
145
146            if !woken.load(SeqCst) && self.state.lock().forbid_parking {
147                panic!("deterministic executor parked after a call to forbid_parking");
148            }
149
150            woken.store(false, SeqCst);
151            self.parker.lock().park();
152        }
153    }
154
155    fn run_until_parked(&self) {
156        let woken = Arc::new(AtomicBool::new(false));
157        let mut future = any_local_future(std::future::pending::<()>());
158        self.run_internal(woken, &mut future);
159    }
160
161    fn run_internal(
162        &self,
163        woken: Arc<AtomicBool>,
164        future: &mut AnyLocalFuture,
165    ) -> Option<Box<dyn Any>> {
166        let unparker = self.parker.lock().unparker();
167        let waker = waker_fn(move || {
168            woken.store(true, SeqCst);
169            unparker.unpark();
170        });
171
172        let mut cx = Context::from_waker(&waker);
173        let mut trace = Trace::default();
174        loop {
175            let mut state = self.state.lock();
176            let runnable_count = state.scheduled_from_foreground.len()
177                + state.scheduled_from_background.len()
178                + state.spawned_from_foreground.len();
179
180            let ix = state.rng.gen_range(0..=runnable_count);
181            if ix < state.scheduled_from_foreground.len() {
182                let (_, backtrace) = &state.scheduled_from_foreground[ix];
183                trace.record(&state, backtrace.clone());
184                let runnable = state.scheduled_from_foreground.remove(ix).0;
185                drop(state);
186                runnable.run();
187            } else if ix - state.scheduled_from_foreground.len()
188                < state.scheduled_from_background.len()
189            {
190                let ix = ix - state.scheduled_from_foreground.len();
191                let (_, backtrace) = &state.scheduled_from_background[ix];
192                trace.record(&state, backtrace.clone());
193                let runnable = state.scheduled_from_background.remove(ix).0;
194                drop(state);
195                runnable.run();
196            } else if ix < runnable_count {
197                let (_, backtrace) = &state.spawned_from_foreground[0];
198                trace.record(&state, backtrace.clone());
199                let runnable = state.spawned_from_foreground.remove(0).0;
200                drop(state);
201                runnable.run();
202            } else {
203                drop(state);
204                if let Poll::Ready(result) = future.poll(&mut cx) {
205                    return Some(result);
206                }
207
208                let state = self.state.lock();
209                if state.scheduled_from_foreground.is_empty()
210                    && state.scheduled_from_background.is_empty()
211                    && state.spawned_from_foreground.is_empty()
212                {
213                    return None;
214                }
215            }
216        }
217    }
218
219    fn block_on(&self, future: &mut AnyLocalFuture) -> Option<Box<dyn Any>> {
220        let unparker = self.parker.lock().unparker();
221        let waker = waker_fn(move || {
222            unparker.unpark();
223        });
224        let max_ticks = {
225            let mut state = self.state.lock();
226            let range = state.block_on_ticks.clone();
227            state.rng.gen_range(range)
228        };
229
230        let mut cx = Context::from_waker(&waker);
231        let mut trace = Trace::default();
232        for _ in 0..max_ticks {
233            let mut state = self.state.lock();
234            let runnable_count = state.scheduled_from_background.len();
235            let ix = state.rng.gen_range(0..=runnable_count);
236            if ix < state.scheduled_from_background.len() {
237                let (_, backtrace) = &state.scheduled_from_background[ix];
238                trace.record(&state, backtrace.clone());
239                let runnable = state.scheduled_from_background.remove(ix).0;
240                drop(state);
241                runnable.run();
242            } else {
243                drop(state);
244                if let Poll::Ready(result) = future.as_mut().poll(&mut cx) {
245                    return Some(result);
246                }
247                let state = self.state.lock();
248                if state.scheduled_from_background.is_empty() {
249                    if state.forbid_parking {
250                        panic!("deterministic executor parked after a call to forbid_parking");
251                    }
252                    drop(state);
253                    self.parker.lock().park();
254                }
255
256                continue;
257            }
258        }
259
260        None
261    }
262}
263
264#[derive(Default)]
265struct Trace {
266    executed: Vec<Backtrace>,
267    scheduled: Vec<Vec<Backtrace>>,
268    spawned_from_foreground: Vec<Vec<Backtrace>>,
269}
270
271impl Trace {
272    fn record(&mut self, state: &DeterministicState, executed: Backtrace) {
273        self.scheduled.push(
274            state
275                .scheduled_from_foreground
276                .iter()
277                .map(|(_, backtrace)| backtrace.clone())
278                .collect(),
279        );
280        self.spawned_from_foreground.push(
281            state
282                .spawned_from_foreground
283                .iter()
284                .map(|(_, backtrace)| backtrace.clone())
285                .collect(),
286        );
287        self.executed.push(executed);
288    }
289
290    fn resolve(&mut self) {
291        for backtrace in &mut self.executed {
292            backtrace.resolve();
293        }
294
295        for backtraces in &mut self.scheduled {
296            for backtrace in backtraces {
297                backtrace.resolve();
298            }
299        }
300
301        for backtraces in &mut self.spawned_from_foreground {
302            for backtrace in backtraces {
303                backtrace.resolve();
304            }
305        }
306    }
307}
308
309impl Debug for Trace {
310    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
311        struct FirstCwdFrameInBacktrace<'a>(&'a Backtrace);
312
313        impl<'a> Debug for FirstCwdFrameInBacktrace<'a> {
314            fn fmt(&self, f: &mut fmt::Formatter<'_>) -> std::fmt::Result {
315                let cwd = std::env::current_dir().unwrap();
316                let mut print_path = |fmt: &mut fmt::Formatter<'_>, path: BytesOrWideString<'_>| {
317                    fmt::Display::fmt(&path, fmt)
318                };
319                let mut fmt = BacktraceFmt::new(f, backtrace::PrintFmt::Full, &mut print_path);
320                for frame in self.0.frames() {
321                    let mut formatted_frame = fmt.frame();
322                    if frame
323                        .symbols()
324                        .iter()
325                        .any(|s| s.filename().map_or(false, |f| f.starts_with(&cwd)))
326                    {
327                        formatted_frame.backtrace_frame(frame)?;
328                        break;
329                    }
330                }
331                fmt.finish()
332            }
333        }
334
335        for ((backtrace, scheduled), spawned_from_foreground) in self
336            .executed
337            .iter()
338            .zip(&self.scheduled)
339            .zip(&self.spawned_from_foreground)
340        {
341            writeln!(f, "Scheduled")?;
342            for backtrace in scheduled {
343                writeln!(f, "- {:?}", FirstCwdFrameInBacktrace(backtrace))?;
344            }
345            if scheduled.is_empty() {
346                writeln!(f, "None")?;
347            }
348            writeln!(f, "==========")?;
349
350            writeln!(f, "Spawned from foreground")?;
351            for backtrace in spawned_from_foreground {
352                writeln!(f, "- {:?}", FirstCwdFrameInBacktrace(backtrace))?;
353            }
354            if spawned_from_foreground.is_empty() {
355                writeln!(f, "None")?;
356            }
357            writeln!(f, "==========")?;
358
359            writeln!(f, "Run: {:?}", FirstCwdFrameInBacktrace(backtrace))?;
360            writeln!(f, "+++++++++++++++++++")?;
361        }
362
363        Ok(())
364    }
365}
366
367impl Drop for Trace {
368    fn drop(&mut self) {
369        let trace_on_panic = if let Ok(trace_on_panic) = std::env::var("EXECUTOR_TRACE_ON_PANIC") {
370            trace_on_panic == "1" || trace_on_panic == "true"
371        } else {
372            false
373        };
374        let trace_always = if let Ok(trace_always) = std::env::var("EXECUTOR_TRACE_ALWAYS") {
375            trace_always == "1" || trace_always == "true"
376        } else {
377            false
378        };
379
380        if trace_always || (trace_on_panic && thread::panicking()) {
381            self.resolve();
382            dbg!(self);
383        }
384    }
385}
386
387impl Foreground {
388    pub fn platform(dispatcher: Arc<dyn platform::Dispatcher>) -> Result<Self> {
389        if dispatcher.is_main_thread() {
390            Ok(Self::Platform {
391                dispatcher,
392                _not_send_or_sync: PhantomData,
393            })
394        } else {
395            Err(anyhow!("must be constructed on main thread"))
396        }
397    }
398
399    pub fn test() -> Self {
400        Self::Test(smol::LocalExecutor::new())
401    }
402
403    pub fn spawn<T: 'static>(&self, future: impl Future<Output = T> + 'static) -> Task<T> {
404        let future = any_local_future(future);
405        let any_task = match self {
406            Self::Deterministic(executor) => executor.spawn_from_foreground(future),
407            Self::Platform { dispatcher, .. } => {
408                fn spawn_inner(
409                    future: AnyLocalFuture,
410                    dispatcher: &Arc<dyn Dispatcher>,
411                ) -> AnyLocalTask {
412                    let dispatcher = dispatcher.clone();
413                    let schedule =
414                        move |runnable: Runnable| dispatcher.run_on_main_thread(runnable);
415                    let (runnable, task) = async_task::spawn_local(future, schedule);
416                    runnable.schedule();
417                    task
418                }
419                spawn_inner(future, dispatcher)
420            }
421            Self::Test(executor) => executor.spawn(future),
422        };
423        Task::local(any_task)
424    }
425
426    pub fn run<T: 'static>(&self, future: impl 'static + Future<Output = T>) -> T {
427        let future = any_local_future(future);
428        let any_value = match self {
429            Self::Deterministic(executor) => executor.run(future),
430            Self::Platform { .. } => panic!("you can't call run on a platform foreground executor"),
431            Self::Test(executor) => smol::block_on(executor.run(future)),
432        };
433        *any_value.downcast().unwrap()
434    }
435
436    pub fn forbid_parking(&self) {
437        match self {
438            Self::Deterministic(executor) => {
439                let mut state = executor.state.lock();
440                state.forbid_parking = true;
441                state.rng = StdRng::seed_from_u64(state.seed);
442            }
443            _ => panic!("this method can only be called on a deterministic executor"),
444        }
445    }
446
447    pub async fn timer(&self, duration: Duration) {
448        match self {
449            Self::Deterministic(executor) => {
450                let (tx, mut rx) = barrier::channel();
451                {
452                    let mut state = executor.state.lock();
453                    let wakeup_at = state.now + duration;
454                    state.pending_timers.push((wakeup_at, tx));
455                }
456                rx.recv().await;
457            }
458            _ => {
459                Timer::after(duration).await;
460            }
461        }
462    }
463
464    pub fn advance_clock(&self, duration: Duration) {
465        match self {
466            Self::Deterministic(executor) => {
467                executor.run_until_parked();
468
469                let mut state = executor.state.lock();
470                state.now += duration;
471                let now = state.now;
472                let mut pending_timers = mem::take(&mut state.pending_timers);
473                drop(state);
474
475                pending_timers.retain(|(wakeup, _)| *wakeup > now);
476                executor.state.lock().pending_timers.extend(pending_timers);
477            }
478            _ => panic!("this method can only be called on a deterministic executor"),
479        }
480    }
481
482    pub fn set_block_on_ticks(&self, range: RangeInclusive<usize>) {
483        match self {
484            Self::Deterministic(executor) => executor.state.lock().block_on_ticks = range,
485            _ => panic!("this method can only be called on a deterministic executor"),
486        }
487    }
488}
489
490impl Background {
491    pub fn new() -> Self {
492        let executor = Arc::new(Executor::new());
493        let stop = channel::unbounded::<()>();
494
495        for i in 0..2 * num_cpus::get() {
496            let executor = executor.clone();
497            let stop = stop.1.clone();
498            thread::Builder::new()
499                .name(format!("background-executor-{}", i))
500                .spawn(move || smol::block_on(executor.run(stop.recv())))
501                .unwrap();
502        }
503
504        Self::Production {
505            executor,
506            _stop: stop.0,
507        }
508    }
509
510    pub fn num_cpus(&self) -> usize {
511        num_cpus::get()
512    }
513
514    pub fn spawn<T, F>(&self, future: F) -> Task<T>
515    where
516        T: 'static + Send,
517        F: Send + Future<Output = T> + 'static,
518    {
519        let future = any_future(future);
520        let any_task = match self {
521            Self::Production { executor, .. } => executor.spawn(future),
522            Self::Deterministic { executor, .. } => executor.spawn(future),
523        };
524        Task::send(any_task)
525    }
526
527    pub fn block_with_timeout<F, T>(
528        &self,
529        timeout: Duration,
530        future: F,
531    ) -> Result<T, impl Future<Output = T>>
532    where
533        T: 'static,
534        F: 'static + Unpin + Future<Output = T>,
535    {
536        let mut future = any_local_future(future);
537        if !timeout.is_zero() {
538            let output = match self {
539                Self::Production { .. } => smol::block_on(util::timeout(timeout, &mut future)).ok(),
540                Self::Deterministic { executor, .. } => executor.block_on(&mut future),
541            };
542            if let Some(output) = output {
543                return Ok(*output.downcast().unwrap());
544            }
545        }
546        Err(async { *future.await.downcast().unwrap() })
547    }
548
549    pub async fn scoped<'scope, F>(&self, scheduler: F)
550    where
551        F: FnOnce(&mut Scope<'scope>),
552    {
553        let mut scope = Scope {
554            futures: Default::default(),
555            _phantom: PhantomData,
556        };
557        (scheduler)(&mut scope);
558        let spawned = scope
559            .futures
560            .into_iter()
561            .map(|f| self.spawn(f))
562            .collect::<Vec<_>>();
563        for task in spawned {
564            task.await;
565        }
566    }
567}
568
569pub struct Scope<'a> {
570    futures: Vec<Pin<Box<dyn Future<Output = ()> + Send + 'static>>>,
571    _phantom: PhantomData<&'a ()>,
572}
573
574impl<'a> Scope<'a> {
575    pub fn spawn<F>(&mut self, f: F)
576    where
577        F: Future<Output = ()> + Send + 'a,
578    {
579        let f = unsafe {
580            mem::transmute::<
581                Pin<Box<dyn Future<Output = ()> + Send + 'a>>,
582                Pin<Box<dyn Future<Output = ()> + Send + 'static>>,
583            >(Box::pin(f))
584        };
585        self.futures.push(f);
586    }
587}
588
589pub fn deterministic(seed: u64) -> (Rc<Foreground>, Arc<Background>) {
590    let executor = Arc::new(Deterministic::new(seed));
591    (
592        Rc::new(Foreground::Deterministic(executor.clone())),
593        Arc::new(Background::Deterministic { executor }),
594    )
595}
596
597impl<T> Task<T> {
598    pub fn ready(value: T) -> Self {
599        Self::Ready(Some(value))
600    }
601
602    fn local(any_task: AnyLocalTask) -> Self {
603        Self::Local {
604            any_task,
605            result_type: PhantomData,
606        }
607    }
608
609    pub fn detach(self) {
610        match self {
611            Task::Ready(_) => {}
612            Task::Local { any_task, .. } => any_task.detach(),
613            Task::Send { any_task, .. } => any_task.detach(),
614        }
615    }
616}
617
618impl<T: Send> Task<T> {
619    fn send(any_task: AnyTask) -> Self {
620        Self::Send {
621            any_task,
622            result_type: PhantomData,
623        }
624    }
625}
626
627impl<T: fmt::Debug> fmt::Debug for Task<T> {
628    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
629        match self {
630            Task::Ready(value) => value.fmt(f),
631            Task::Local { any_task, .. } => any_task.fmt(f),
632            Task::Send { any_task, .. } => any_task.fmt(f),
633        }
634    }
635}
636
637impl<T: 'static> Future for Task<T> {
638    type Output = T;
639
640    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
641        match unsafe { self.get_unchecked_mut() } {
642            Task::Ready(value) => Poll::Ready(value.take().unwrap()),
643            Task::Local { any_task, .. } => {
644                any_task.poll(cx).map(|value| *value.downcast().unwrap())
645            }
646            Task::Send { any_task, .. } => {
647                any_task.poll(cx).map(|value| *value.downcast().unwrap())
648            }
649        }
650    }
651}
652
653fn any_future<T, F>(future: F) -> AnyFuture
654where
655    T: 'static + Send,
656    F: Future<Output = T> + Send + 'static,
657{
658    async { Box::new(future.await) as Box<dyn Any + Send> }.boxed()
659}
660
661fn any_local_future<T, F>(future: F) -> AnyLocalFuture
662where
663    T: 'static,
664    F: Future<Output = T> + 'static,
665{
666    async { Box::new(future.await) as Box<dyn Any> }.boxed_local()
667}