executor.rs

  1use anyhow::{anyhow, Result};
  2use async_task::Runnable;
  3use backtrace::{Backtrace, BacktraceFmt, BytesOrWideString};
  4use parking_lot::Mutex;
  5use postage::{barrier, prelude::Stream as _};
  6use rand::prelude::*;
  7use smol::{channel, prelude::*, Executor, Timer};
  8use std::{
  9    any::Any,
 10    fmt::{self, Debug},
 11    marker::PhantomData,
 12    mem,
 13    ops::RangeInclusive,
 14    pin::Pin,
 15    rc::Rc,
 16    sync::{
 17        atomic::{AtomicBool, Ordering::SeqCst},
 18        Arc,
 19    },
 20    task::{Context, Poll},
 21    thread,
 22    time::{Duration, Instant},
 23};
 24use waker_fn::waker_fn;
 25
 26use crate::{
 27    platform::{self, Dispatcher},
 28    util,
 29};
 30
 31pub enum Foreground {
 32    Platform {
 33        dispatcher: Arc<dyn platform::Dispatcher>,
 34        _not_send_or_sync: PhantomData<Rc<()>>,
 35    },
 36    Test(smol::LocalExecutor<'static>),
 37    Deterministic(Arc<Deterministic>),
 38}
 39
 40pub enum Background {
 41    Deterministic {
 42        executor: Arc<Deterministic>,
 43    },
 44    Production {
 45        executor: Arc<smol::Executor<'static>>,
 46        _stop: channel::Sender<()>,
 47    },
 48}
 49
 50type AnyLocalFuture = Pin<Box<dyn 'static + Future<Output = Box<dyn Any + 'static>>>>;
 51type AnyFuture = Pin<Box<dyn 'static + Send + Future<Output = Box<dyn Any + Send + 'static>>>>;
 52type AnyTask = async_task::Task<Box<dyn Any + Send + 'static>>;
 53type AnyLocalTask = async_task::Task<Box<dyn Any + 'static>>;
 54
 55#[must_use]
 56pub enum Task<T> {
 57    Local {
 58        any_task: AnyLocalTask,
 59        result_type: PhantomData<T>,
 60    },
 61    Send {
 62        any_task: AnyTask,
 63        result_type: PhantomData<T>,
 64    },
 65}
 66
 67unsafe impl<T: Send> Send for Task<T> {}
 68
 69struct DeterministicState {
 70    rng: StdRng,
 71    seed: u64,
 72    scheduled_from_foreground: Vec<(Runnable, Backtrace)>,
 73    scheduled_from_background: Vec<(Runnable, Backtrace)>,
 74    spawned_from_foreground: Vec<(Runnable, Backtrace)>,
 75    forbid_parking: bool,
 76    block_on_ticks: RangeInclusive<usize>,
 77    now: Instant,
 78    pending_timers: Vec<(Instant, barrier::Sender)>,
 79}
 80
 81pub struct Deterministic {
 82    state: Arc<Mutex<DeterministicState>>,
 83    parker: Mutex<parking::Parker>,
 84}
 85
 86impl Deterministic {
 87    fn new(seed: u64) -> Self {
 88        Self {
 89            state: Arc::new(Mutex::new(DeterministicState {
 90                rng: StdRng::seed_from_u64(seed),
 91                seed,
 92                scheduled_from_foreground: Default::default(),
 93                scheduled_from_background: Default::default(),
 94                spawned_from_foreground: Default::default(),
 95                forbid_parking: false,
 96                block_on_ticks: 0..=1000,
 97                now: Instant::now(),
 98                pending_timers: Default::default(),
 99            })),
100            parker: Default::default(),
101        }
102    }
103
104    fn spawn_from_foreground(&self, future: AnyLocalFuture) -> AnyLocalTask {
105        let backtrace = Backtrace::new_unresolved();
106        let scheduled_once = AtomicBool::new(false);
107        let state = self.state.clone();
108        let unparker = self.parker.lock().unparker();
109        let (runnable, task) = async_task::spawn_local(future, move |runnable| {
110            let mut state = state.lock();
111            let backtrace = backtrace.clone();
112            if scheduled_once.fetch_or(true, SeqCst) {
113                state.scheduled_from_foreground.push((runnable, backtrace));
114            } else {
115                state.spawned_from_foreground.push((runnable, backtrace));
116            }
117            unparker.unpark();
118        });
119        runnable.schedule();
120        task
121    }
122
123    fn spawn(&self, future: AnyFuture) -> AnyTask {
124        let backtrace = Backtrace::new_unresolved();
125        let state = self.state.clone();
126        let unparker = self.parker.lock().unparker();
127        let (runnable, task) = async_task::spawn(future, move |runnable| {
128            let mut state = state.lock();
129            state
130                .scheduled_from_background
131                .push((runnable, backtrace.clone()));
132            unparker.unpark();
133        });
134        runnable.schedule();
135        task
136    }
137
138    fn run(&self, mut future: AnyLocalFuture) -> Box<dyn Any> {
139        let woken = Arc::new(AtomicBool::new(false));
140        loop {
141            if let Some(result) = self.run_internal(woken.clone(), &mut future) {
142                return result;
143            }
144
145            if !woken.load(SeqCst) && self.state.lock().forbid_parking {
146                panic!("deterministic executor parked after a call to forbid_parking");
147            }
148
149            woken.store(false, SeqCst);
150            self.parker.lock().park();
151        }
152    }
153
154    fn run_until_parked(&self) {
155        let woken = Arc::new(AtomicBool::new(false));
156        let mut future = any_local_future(std::future::pending::<()>());
157        self.run_internal(woken, &mut future);
158    }
159
160    fn run_internal(
161        &self,
162        woken: Arc<AtomicBool>,
163        future: &mut AnyLocalFuture,
164    ) -> Option<Box<dyn Any>> {
165        let unparker = self.parker.lock().unparker();
166        let waker = waker_fn(move || {
167            woken.store(true, SeqCst);
168            unparker.unpark();
169        });
170
171        let mut cx = Context::from_waker(&waker);
172        let mut trace = Trace::default();
173        loop {
174            let mut state = self.state.lock();
175            let runnable_count = state.scheduled_from_foreground.len()
176                + state.scheduled_from_background.len()
177                + state.spawned_from_foreground.len();
178
179            let ix = state.rng.gen_range(0..=runnable_count);
180            if ix < state.scheduled_from_foreground.len() {
181                let (_, backtrace) = &state.scheduled_from_foreground[ix];
182                trace.record(&state, backtrace.clone());
183                let runnable = state.scheduled_from_foreground.remove(ix).0;
184                drop(state);
185                runnable.run();
186            } else if ix - state.scheduled_from_foreground.len()
187                < state.scheduled_from_background.len()
188            {
189                let ix = ix - state.scheduled_from_foreground.len();
190                let (_, backtrace) = &state.scheduled_from_background[ix];
191                trace.record(&state, backtrace.clone());
192                let runnable = state.scheduled_from_background.remove(ix).0;
193                drop(state);
194                runnable.run();
195            } else if ix < runnable_count {
196                let (_, backtrace) = &state.spawned_from_foreground[0];
197                trace.record(&state, backtrace.clone());
198                let runnable = state.spawned_from_foreground.remove(0).0;
199                drop(state);
200                runnable.run();
201            } else {
202                drop(state);
203                if let Poll::Ready(result) = future.poll(&mut cx) {
204                    return Some(result);
205                }
206
207                let state = self.state.lock();
208                if state.scheduled_from_foreground.is_empty()
209                    && state.scheduled_from_background.is_empty()
210                    && state.spawned_from_foreground.is_empty()
211                {
212                    return None;
213                }
214            }
215        }
216    }
217
218    fn block_on(&self, future: &mut AnyLocalFuture) -> Option<Box<dyn Any>> {
219        let unparker = self.parker.lock().unparker();
220        let waker = waker_fn(move || {
221            unparker.unpark();
222        });
223        let max_ticks = {
224            let mut state = self.state.lock();
225            let range = state.block_on_ticks.clone();
226            state.rng.gen_range(range)
227        };
228
229        let mut cx = Context::from_waker(&waker);
230        let mut trace = Trace::default();
231        for _ in 0..max_ticks {
232            let mut state = self.state.lock();
233            let runnable_count = state.scheduled_from_background.len();
234            let ix = state.rng.gen_range(0..=runnable_count);
235            if ix < state.scheduled_from_background.len() {
236                let (_, backtrace) = &state.scheduled_from_background[ix];
237                trace.record(&state, backtrace.clone());
238                let runnable = state.scheduled_from_background.remove(ix).0;
239                drop(state);
240                runnable.run();
241            } else {
242                drop(state);
243                if let Poll::Ready(result) = future.as_mut().poll(&mut cx) {
244                    return Some(result);
245                }
246                let state = self.state.lock();
247                if state.scheduled_from_background.is_empty() {
248                    if state.forbid_parking {
249                        panic!("deterministic executor parked after a call to forbid_parking");
250                    }
251                    drop(state);
252                    self.parker.lock().park();
253                }
254
255                continue;
256            }
257        }
258
259        None
260    }
261}
262
263#[derive(Default)]
264struct Trace {
265    executed: Vec<Backtrace>,
266    scheduled: Vec<Vec<Backtrace>>,
267    spawned_from_foreground: Vec<Vec<Backtrace>>,
268}
269
270impl Trace {
271    fn record(&mut self, state: &DeterministicState, executed: Backtrace) {
272        self.scheduled.push(
273            state
274                .scheduled_from_foreground
275                .iter()
276                .map(|(_, backtrace)| backtrace.clone())
277                .collect(),
278        );
279        self.spawned_from_foreground.push(
280            state
281                .spawned_from_foreground
282                .iter()
283                .map(|(_, backtrace)| backtrace.clone())
284                .collect(),
285        );
286        self.executed.push(executed);
287    }
288
289    fn resolve(&mut self) {
290        for backtrace in &mut self.executed {
291            backtrace.resolve();
292        }
293
294        for backtraces in &mut self.scheduled {
295            for backtrace in backtraces {
296                backtrace.resolve();
297            }
298        }
299
300        for backtraces in &mut self.spawned_from_foreground {
301            for backtrace in backtraces {
302                backtrace.resolve();
303            }
304        }
305    }
306}
307
308impl Debug for Trace {
309    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
310        struct FirstCwdFrameInBacktrace<'a>(&'a Backtrace);
311
312        impl<'a> Debug for FirstCwdFrameInBacktrace<'a> {
313            fn fmt(&self, f: &mut fmt::Formatter<'_>) -> std::fmt::Result {
314                let cwd = std::env::current_dir().unwrap();
315                let mut print_path = |fmt: &mut fmt::Formatter<'_>, path: BytesOrWideString<'_>| {
316                    fmt::Display::fmt(&path, fmt)
317                };
318                let mut fmt = BacktraceFmt::new(f, backtrace::PrintFmt::Full, &mut print_path);
319                for frame in self.0.frames() {
320                    let mut formatted_frame = fmt.frame();
321                    if frame
322                        .symbols()
323                        .iter()
324                        .any(|s| s.filename().map_or(false, |f| f.starts_with(&cwd)))
325                    {
326                        formatted_frame.backtrace_frame(frame)?;
327                        break;
328                    }
329                }
330                fmt.finish()
331            }
332        }
333
334        for ((backtrace, scheduled), spawned_from_foreground) in self
335            .executed
336            .iter()
337            .zip(&self.scheduled)
338            .zip(&self.spawned_from_foreground)
339        {
340            writeln!(f, "Scheduled")?;
341            for backtrace in scheduled {
342                writeln!(f, "- {:?}", FirstCwdFrameInBacktrace(backtrace))?;
343            }
344            if scheduled.is_empty() {
345                writeln!(f, "None")?;
346            }
347            writeln!(f, "==========")?;
348
349            writeln!(f, "Spawned from foreground")?;
350            for backtrace in spawned_from_foreground {
351                writeln!(f, "- {:?}", FirstCwdFrameInBacktrace(backtrace))?;
352            }
353            if spawned_from_foreground.is_empty() {
354                writeln!(f, "None")?;
355            }
356            writeln!(f, "==========")?;
357
358            writeln!(f, "Run: {:?}", FirstCwdFrameInBacktrace(backtrace))?;
359            writeln!(f, "+++++++++++++++++++")?;
360        }
361
362        Ok(())
363    }
364}
365
366impl Drop for Trace {
367    fn drop(&mut self) {
368        let trace_on_panic = if let Ok(trace_on_panic) = std::env::var("EXECUTOR_TRACE_ON_PANIC") {
369            trace_on_panic == "1" || trace_on_panic == "true"
370        } else {
371            false
372        };
373        let trace_always = if let Ok(trace_always) = std::env::var("EXECUTOR_TRACE_ALWAYS") {
374            trace_always == "1" || trace_always == "true"
375        } else {
376            false
377        };
378
379        if trace_always || (trace_on_panic && thread::panicking()) {
380            self.resolve();
381            dbg!(self);
382        }
383    }
384}
385
386impl Foreground {
387    pub fn platform(dispatcher: Arc<dyn platform::Dispatcher>) -> Result<Self> {
388        if dispatcher.is_main_thread() {
389            Ok(Self::Platform {
390                dispatcher,
391                _not_send_or_sync: PhantomData,
392            })
393        } else {
394            Err(anyhow!("must be constructed on main thread"))
395        }
396    }
397
398    pub fn test() -> Self {
399        Self::Test(smol::LocalExecutor::new())
400    }
401
402    pub fn spawn<T: 'static>(&self, future: impl Future<Output = T> + 'static) -> Task<T> {
403        let future = any_local_future(future);
404        let any_task = match self {
405            Self::Deterministic(executor) => executor.spawn_from_foreground(future),
406            Self::Platform { dispatcher, .. } => {
407                fn spawn_inner(
408                    future: AnyLocalFuture,
409                    dispatcher: &Arc<dyn Dispatcher>,
410                ) -> AnyLocalTask {
411                    let dispatcher = dispatcher.clone();
412                    let schedule =
413                        move |runnable: Runnable| dispatcher.run_on_main_thread(runnable);
414                    let (runnable, task) = async_task::spawn_local(future, schedule);
415                    runnable.schedule();
416                    task
417                }
418                spawn_inner(future, dispatcher)
419            }
420            Self::Test(executor) => executor.spawn(future),
421        };
422        Task::local(any_task)
423    }
424
425    pub fn run<T: 'static>(&self, future: impl 'static + Future<Output = T>) -> T {
426        let future = any_local_future(future);
427        let any_value = match self {
428            Self::Deterministic(executor) => executor.run(future),
429            Self::Platform { .. } => panic!("you can't call run on a platform foreground executor"),
430            Self::Test(executor) => smol::block_on(executor.run(future)),
431        };
432        *any_value.downcast().unwrap()
433    }
434
435    pub fn forbid_parking(&self) {
436        match self {
437            Self::Deterministic(executor) => {
438                let mut state = executor.state.lock();
439                state.forbid_parking = true;
440                state.rng = StdRng::seed_from_u64(state.seed);
441            }
442            _ => panic!("this method can only be called on a deterministic executor"),
443        }
444    }
445
446    pub async fn timer(&self, duration: Duration) {
447        match self {
448            Self::Deterministic(executor) => {
449                let (tx, mut rx) = barrier::channel();
450                {
451                    let mut state = executor.state.lock();
452                    let wakeup_at = state.now + duration;
453                    state.pending_timers.push((wakeup_at, tx));
454                }
455                rx.recv().await;
456            }
457            _ => {
458                Timer::after(duration).await;
459            }
460        }
461    }
462
463    pub fn advance_clock(&self, duration: Duration) {
464        match self {
465            Self::Deterministic(executor) => {
466                executor.run_until_parked();
467
468                let mut state = executor.state.lock();
469                state.now += duration;
470                let now = state.now;
471                let mut pending_timers = mem::take(&mut state.pending_timers);
472                drop(state);
473
474                pending_timers.retain(|(wakeup, _)| *wakeup > now);
475                executor.state.lock().pending_timers.extend(pending_timers);
476            }
477            _ => panic!("this method can only be called on a deterministic executor"),
478        }
479    }
480
481    pub fn set_block_on_ticks(&self, range: RangeInclusive<usize>) {
482        match self {
483            Self::Deterministic(executor) => executor.state.lock().block_on_ticks = range,
484            _ => panic!("this method can only be called on a deterministic executor"),
485        }
486    }
487}
488
489impl Background {
490    pub fn new() -> Self {
491        let executor = Arc::new(Executor::new());
492        let stop = channel::unbounded::<()>();
493
494        for i in 0..2 * num_cpus::get() {
495            let executor = executor.clone();
496            let stop = stop.1.clone();
497            thread::Builder::new()
498                .name(format!("background-executor-{}", i))
499                .spawn(move || smol::block_on(executor.run(stop.recv())))
500                .unwrap();
501        }
502
503        Self::Production {
504            executor,
505            _stop: stop.0,
506        }
507    }
508
509    pub fn num_cpus(&self) -> usize {
510        num_cpus::get()
511    }
512
513    pub fn spawn<T, F>(&self, future: F) -> Task<T>
514    where
515        T: 'static + Send,
516        F: Send + Future<Output = T> + 'static,
517    {
518        let future = any_future(future);
519        let any_task = match self {
520            Self::Production { executor, .. } => executor.spawn(future),
521            Self::Deterministic { executor, .. } => executor.spawn(future),
522        };
523        Task::send(any_task)
524    }
525
526    pub fn block_with_timeout<F, T>(
527        &self,
528        timeout: Duration,
529        future: F,
530    ) -> Result<T, impl Future<Output = T>>
531    where
532        T: 'static,
533        F: 'static + Unpin + Future<Output = T>,
534    {
535        let mut future = any_local_future(future);
536        if !timeout.is_zero() {
537            let output = match self {
538                Self::Production { .. } => smol::block_on(util::timeout(timeout, &mut future)).ok(),
539                Self::Deterministic { executor, .. } => executor.block_on(&mut future),
540            };
541            if let Some(output) = output {
542                return Ok(*output.downcast().unwrap());
543            }
544        }
545        Err(async { *future.await.downcast().unwrap() })
546    }
547
548    pub async fn scoped<'scope, F>(&self, scheduler: F)
549    where
550        F: FnOnce(&mut Scope<'scope>),
551    {
552        let mut scope = Scope {
553            futures: Default::default(),
554            _phantom: PhantomData,
555        };
556        (scheduler)(&mut scope);
557        let spawned = scope
558            .futures
559            .into_iter()
560            .map(|f| self.spawn(f))
561            .collect::<Vec<_>>();
562        for task in spawned {
563            task.await;
564        }
565    }
566}
567
/// Collects futures that borrow data for `'a` so they can be spawned together.
pub struct Scope<'a> {
    /// Registered futures, stored with their lifetime erased to `'static`.
    futures: Vec<Pin<Box<dyn Future<Output = ()> + Send + 'static>>>,
    /// Ties the scope to `'a` without storing a reference.
    _phantom: PhantomData<&'a ()>,
}

impl<'a> Scope<'a> {
    /// Registers `f` with this scope; it is stored, not polled.
    pub fn spawn<F>(&mut self, f: F)
    where
        F: Future<Output = ()> + Send + 'a,
    {
        let scoped: Pin<Box<dyn Future<Output = ()> + Send + 'a>> = Box::pin(f);
        // SAFETY: the source and target types differ only in the trait
        // object's lifetime bound.
        // NOTE(review): soundness relies on whoever drains `futures` never
        // letting a future outlive `'a` — confirm for cancelled callers.
        let erased = unsafe {
            mem::transmute::<
                Pin<Box<dyn Future<Output = ()> + Send + 'a>>,
                Pin<Box<dyn Future<Output = ()> + Send + 'static>>,
            >(scoped)
        };
        self.futures.push(erased);
    }
}
587
588pub fn deterministic(seed: u64) -> (Rc<Foreground>, Arc<Background>) {
589    let executor = Arc::new(Deterministic::new(seed));
590    (
591        Rc::new(Foreground::Deterministic(executor.clone())),
592        Arc::new(Background::Deterministic { executor }),
593    )
594}
595
596impl<T> Task<T> {
597    fn local(any_task: AnyLocalTask) -> Self {
598        Self::Local {
599            any_task,
600            result_type: PhantomData,
601        }
602    }
603
604    pub fn detach(self) {
605        match self {
606            Task::Local { any_task, .. } => any_task.detach(),
607            Task::Send { any_task, .. } => any_task.detach(),
608        }
609    }
610}
611
612impl<T: Send> Task<T> {
613    fn send(any_task: AnyTask) -> Self {
614        Self::Send {
615            any_task,
616            result_type: PhantomData,
617        }
618    }
619}
620
621impl<T: fmt::Debug> fmt::Debug for Task<T> {
622    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
623        match self {
624            Task::Local { any_task, .. } => any_task.fmt(f),
625            Task::Send { any_task, .. } => any_task.fmt(f),
626        }
627    }
628}
629
630impl<T: 'static> Future for Task<T> {
631    type Output = T;
632
633    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
634        match unsafe { self.get_unchecked_mut() } {
635            Task::Local { any_task, .. } => {
636                any_task.poll(cx).map(|value| *value.downcast().unwrap())
637            }
638            Task::Send { any_task, .. } => {
639                any_task.poll(cx).map(|value| *value.downcast().unwrap())
640            }
641        }
642    }
643}
644
645fn any_future<T, F>(future: F) -> AnyFuture
646where
647    T: 'static + Send,
648    F: Future<Output = T> + Send + 'static,
649{
650    async { Box::new(future.await) as Box<dyn Any + Send> }.boxed()
651}
652
653fn any_local_future<T, F>(future: F) -> AnyLocalFuture
654where
655    T: 'static,
656    F: Future<Output = T> + 'static,
657{
658    async { Box::new(future.await) as Box<dyn Any> }.boxed_local()
659}