executor.rs

  1use anyhow::{anyhow, Result};
  2use async_task::Runnable;
  3use backtrace::{Backtrace, BacktraceFmt, BytesOrWideString};
  4use parking_lot::Mutex;
  5use postage::{barrier, prelude::Stream as _};
  6use rand::prelude::*;
  7use smol::{channel, prelude::*, Executor, Timer};
  8use std::{
  9    any::Any,
 10    fmt::{self, Debug},
 11    marker::PhantomData,
 12    mem,
 13    ops::RangeInclusive,
 14    pin::Pin,
 15    rc::Rc,
 16    sync::{
 17        atomic::{AtomicBool, Ordering::SeqCst},
 18        Arc,
 19    },
 20    task::{Context, Poll},
 21    thread,
 22    time::{Duration, Instant},
 23};
 24use waker_fn::waker_fn;
 25
 26use crate::{
 27    platform::{self, Dispatcher},
 28    util,
 29};
 30
 31pub enum Foreground {
 32    Platform {
 33        dispatcher: Arc<dyn platform::Dispatcher>,
 34        _not_send_or_sync: PhantomData<Rc<()>>,
 35    },
 36    Test(smol::LocalExecutor<'static>),
 37    Deterministic(Arc<Deterministic>),
 38}
 39
 40pub enum Background {
 41    Deterministic {
 42        executor: Arc<Deterministic>,
 43    },
 44    Production {
 45        executor: Arc<smol::Executor<'static>>,
 46        _stop: channel::Sender<()>,
 47    },
 48}
 49
 50type AnyLocalFuture = Pin<Box<dyn 'static + Future<Output = Box<dyn Any + 'static>>>>;
 51type AnyFuture = Pin<Box<dyn 'static + Send + Future<Output = Box<dyn Any + Send + 'static>>>>;
 52type AnyTask = async_task::Task<Box<dyn Any + Send + 'static>>;
 53type AnyLocalTask = async_task::Task<Box<dyn Any + 'static>>;
 54
 55#[must_use]
 56pub enum Task<T> {
 57    Ready(Option<T>),
 58    Local {
 59        any_task: AnyLocalTask,
 60        result_type: PhantomData<T>,
 61    },
 62    Send {
 63        any_task: AnyTask,
 64        result_type: PhantomData<T>,
 65    },
 66}
 67
 68unsafe impl<T: Send> Send for Task<T> {}
 69
 70struct DeterministicState {
 71    rng: StdRng,
 72    seed: u64,
 73    scheduled_from_foreground: Vec<(Runnable, Backtrace)>,
 74    scheduled_from_background: Vec<(Runnable, Backtrace)>,
 75    spawned_from_foreground: Vec<(Runnable, Backtrace)>,
 76    forbid_parking: bool,
 77    block_on_ticks: RangeInclusive<usize>,
 78    now: Instant,
 79    pending_timers: Vec<(Instant, barrier::Sender)>,
 80}
 81
 82pub struct Deterministic {
 83    state: Arc<Mutex<DeterministicState>>,
 84    parker: Mutex<parking::Parker>,
 85}
 86
 87impl Deterministic {
 88    fn new(seed: u64) -> Self {
 89        Self {
 90            state: Arc::new(Mutex::new(DeterministicState {
 91                rng: StdRng::seed_from_u64(seed),
 92                seed,
 93                scheduled_from_foreground: Default::default(),
 94                scheduled_from_background: Default::default(),
 95                spawned_from_foreground: Default::default(),
 96                forbid_parking: false,
 97                block_on_ticks: 0..=1000,
 98                now: Instant::now(),
 99                pending_timers: Default::default(),
100            })),
101            parker: Default::default(),
102        }
103    }
104
105    fn spawn_from_foreground(&self, future: AnyLocalFuture) -> AnyLocalTask {
106        let backtrace = Backtrace::new_unresolved();
107        let scheduled_once = AtomicBool::new(false);
108        let state = self.state.clone();
109        let unparker = self.parker.lock().unparker();
110        let (runnable, task) = async_task::spawn_local(future, move |runnable| {
111            let mut state = state.lock();
112            let backtrace = backtrace.clone();
113            if scheduled_once.fetch_or(true, SeqCst) {
114                state.scheduled_from_foreground.push((runnable, backtrace));
115            } else {
116                state.spawned_from_foreground.push((runnable, backtrace));
117            }
118            unparker.unpark();
119        });
120        runnable.schedule();
121        task
122    }
123
124    fn spawn(&self, future: AnyFuture) -> AnyTask {
125        let backtrace = Backtrace::new_unresolved();
126        let state = self.state.clone();
127        let unparker = self.parker.lock().unparker();
128        let (runnable, task) = async_task::spawn(future, move |runnable| {
129            let mut state = state.lock();
130            state
131                .scheduled_from_background
132                .push((runnable, backtrace.clone()));
133            unparker.unpark();
134        });
135        runnable.schedule();
136        task
137    }
138
139    fn run(&self, mut future: AnyLocalFuture) -> Box<dyn Any> {
140        let woken = Arc::new(AtomicBool::new(false));
141        loop {
142            if let Some(result) = self.run_internal(woken.clone(), &mut future) {
143                return result;
144            }
145
146            if !woken.load(SeqCst) && self.state.lock().forbid_parking {
147                panic!("deterministic executor parked after a call to forbid_parking");
148            }
149
150            woken.store(false, SeqCst);
151            self.parker.lock().park();
152        }
153    }
154
155    fn run_until_parked(&self) {
156        let woken = Arc::new(AtomicBool::new(false));
157        let mut future = any_local_future(std::future::pending::<()>());
158        self.run_internal(woken, &mut future);
159    }
160
161    fn run_internal(
162        &self,
163        woken: Arc<AtomicBool>,
164        future: &mut AnyLocalFuture,
165    ) -> Option<Box<dyn Any>> {
166        let unparker = self.parker.lock().unparker();
167        let waker = waker_fn(move || {
168            woken.store(true, SeqCst);
169            unparker.unpark();
170        });
171
172        let mut cx = Context::from_waker(&waker);
173        let mut trace = Trace::default();
174        loop {
175            let mut state = self.state.lock();
176            let runnable_count = state.scheduled_from_foreground.len()
177                + state.scheduled_from_background.len()
178                + state.spawned_from_foreground.len();
179
180            let ix = state.rng.gen_range(0..=runnable_count);
181            if ix < state.scheduled_from_foreground.len() {
182                let (_, backtrace) = &state.scheduled_from_foreground[ix];
183                trace.record(&state, backtrace.clone());
184                let runnable = state.scheduled_from_foreground.remove(ix).0;
185                drop(state);
186                runnable.run();
187            } else if ix - state.scheduled_from_foreground.len()
188                < state.scheduled_from_background.len()
189            {
190                let ix = ix - state.scheduled_from_foreground.len();
191                let (_, backtrace) = &state.scheduled_from_background[ix];
192                trace.record(&state, backtrace.clone());
193                let runnable = state.scheduled_from_background.remove(ix).0;
194                drop(state);
195                runnable.run();
196            } else if ix < runnable_count {
197                let (_, backtrace) = &state.spawned_from_foreground[0];
198                trace.record(&state, backtrace.clone());
199                let runnable = state.spawned_from_foreground.remove(0).0;
200                drop(state);
201                runnable.run();
202            } else {
203                drop(state);
204                if let Poll::Ready(result) = future.poll(&mut cx) {
205                    return Some(result);
206                }
207
208                let state = self.state.lock();
209                if state.would_park() {
210                    return None;
211                }
212            }
213        }
214    }
215
216    fn block_on(&self, future: &mut AnyLocalFuture) -> Option<Box<dyn Any>> {
217        let unparker = self.parker.lock().unparker();
218        let waker = waker_fn(move || {
219            unparker.unpark();
220        });
221        let max_ticks = {
222            let mut state = self.state.lock();
223            let range = state.block_on_ticks.clone();
224            state.rng.gen_range(range)
225        };
226
227        let mut cx = Context::from_waker(&waker);
228        let mut trace = Trace::default();
229        for _ in 0..max_ticks {
230            let mut state = self.state.lock();
231            let runnable_count = state.scheduled_from_background.len();
232            let ix = state.rng.gen_range(0..=runnable_count);
233            if ix < state.scheduled_from_background.len() {
234                let (_, backtrace) = &state.scheduled_from_background[ix];
235                trace.record(&state, backtrace.clone());
236                let runnable = state.scheduled_from_background.remove(ix).0;
237                drop(state);
238                runnable.run();
239            } else {
240                drop(state);
241                if let Poll::Ready(result) = future.as_mut().poll(&mut cx) {
242                    return Some(result);
243                }
244                let state = self.state.lock();
245                if state.scheduled_from_background.is_empty() {
246                    if state.forbid_parking {
247                        panic!("deterministic executor parked after a call to forbid_parking");
248                    }
249                    drop(state);
250                    self.parker.lock().park();
251                }
252
253                continue;
254            }
255        }
256
257        None
258    }
259}
260
261impl DeterministicState {
262    fn would_park(&self) -> bool {
263        self.forbid_parking
264            && self.scheduled_from_foreground.is_empty()
265            && self.scheduled_from_background.is_empty()
266            && self.spawned_from_foreground.is_empty()
267    }
268}
269
270#[derive(Default)]
271struct Trace {
272    executed: Vec<Backtrace>,
273    scheduled: Vec<Vec<Backtrace>>,
274    spawned_from_foreground: Vec<Vec<Backtrace>>,
275}
276
277impl Trace {
278    fn record(&mut self, state: &DeterministicState, executed: Backtrace) {
279        self.scheduled.push(
280            state
281                .scheduled_from_foreground
282                .iter()
283                .map(|(_, backtrace)| backtrace.clone())
284                .collect(),
285        );
286        self.spawned_from_foreground.push(
287            state
288                .spawned_from_foreground
289                .iter()
290                .map(|(_, backtrace)| backtrace.clone())
291                .collect(),
292        );
293        self.executed.push(executed);
294    }
295
296    fn resolve(&mut self) {
297        for backtrace in &mut self.executed {
298            backtrace.resolve();
299        }
300
301        for backtraces in &mut self.scheduled {
302            for backtrace in backtraces {
303                backtrace.resolve();
304            }
305        }
306
307        for backtraces in &mut self.spawned_from_foreground {
308            for backtrace in backtraces {
309                backtrace.resolve();
310            }
311        }
312    }
313}
314
315impl Debug for Trace {
316    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
317        struct FirstCwdFrameInBacktrace<'a>(&'a Backtrace);
318
319        impl<'a> Debug for FirstCwdFrameInBacktrace<'a> {
320            fn fmt(&self, f: &mut fmt::Formatter<'_>) -> std::fmt::Result {
321                let cwd = std::env::current_dir().unwrap();
322                let mut print_path = |fmt: &mut fmt::Formatter<'_>, path: BytesOrWideString<'_>| {
323                    fmt::Display::fmt(&path, fmt)
324                };
325                let mut fmt = BacktraceFmt::new(f, backtrace::PrintFmt::Full, &mut print_path);
326                for frame in self.0.frames() {
327                    let mut formatted_frame = fmt.frame();
328                    if frame
329                        .symbols()
330                        .iter()
331                        .any(|s| s.filename().map_or(false, |f| f.starts_with(&cwd)))
332                    {
333                        formatted_frame.backtrace_frame(frame)?;
334                        break;
335                    }
336                }
337                fmt.finish()
338            }
339        }
340
341        for ((backtrace, scheduled), spawned_from_foreground) in self
342            .executed
343            .iter()
344            .zip(&self.scheduled)
345            .zip(&self.spawned_from_foreground)
346        {
347            writeln!(f, "Scheduled")?;
348            for backtrace in scheduled {
349                writeln!(f, "- {:?}", FirstCwdFrameInBacktrace(backtrace))?;
350            }
351            if scheduled.is_empty() {
352                writeln!(f, "None")?;
353            }
354            writeln!(f, "==========")?;
355
356            writeln!(f, "Spawned from foreground")?;
357            for backtrace in spawned_from_foreground {
358                writeln!(f, "- {:?}", FirstCwdFrameInBacktrace(backtrace))?;
359            }
360            if spawned_from_foreground.is_empty() {
361                writeln!(f, "None")?;
362            }
363            writeln!(f, "==========")?;
364
365            writeln!(f, "Run: {:?}", FirstCwdFrameInBacktrace(backtrace))?;
366            writeln!(f, "+++++++++++++++++++")?;
367        }
368
369        Ok(())
370    }
371}
372
373impl Drop for Trace {
374    fn drop(&mut self) {
375        let trace_on_panic = if let Ok(trace_on_panic) = std::env::var("EXECUTOR_TRACE_ON_PANIC") {
376            trace_on_panic == "1" || trace_on_panic == "true"
377        } else {
378            false
379        };
380        let trace_always = if let Ok(trace_always) = std::env::var("EXECUTOR_TRACE_ALWAYS") {
381            trace_always == "1" || trace_always == "true"
382        } else {
383            false
384        };
385
386        if trace_always || (trace_on_panic && thread::panicking()) {
387            self.resolve();
388            dbg!(self);
389        }
390    }
391}
392
393impl Foreground {
394    pub fn platform(dispatcher: Arc<dyn platform::Dispatcher>) -> Result<Self> {
395        if dispatcher.is_main_thread() {
396            Ok(Self::Platform {
397                dispatcher,
398                _not_send_or_sync: PhantomData,
399            })
400        } else {
401            Err(anyhow!("must be constructed on main thread"))
402        }
403    }
404
405    pub fn test() -> Self {
406        Self::Test(smol::LocalExecutor::new())
407    }
408
409    pub fn spawn<T: 'static>(&self, future: impl Future<Output = T> + 'static) -> Task<T> {
410        let future = any_local_future(future);
411        let any_task = match self {
412            Self::Deterministic(executor) => executor.spawn_from_foreground(future),
413            Self::Platform { dispatcher, .. } => {
414                fn spawn_inner(
415                    future: AnyLocalFuture,
416                    dispatcher: &Arc<dyn Dispatcher>,
417                ) -> AnyLocalTask {
418                    let dispatcher = dispatcher.clone();
419                    let schedule =
420                        move |runnable: Runnable| dispatcher.run_on_main_thread(runnable);
421                    let (runnable, task) = async_task::spawn_local(future, schedule);
422                    runnable.schedule();
423                    task
424                }
425                spawn_inner(future, dispatcher)
426            }
427            Self::Test(executor) => executor.spawn(future),
428        };
429        Task::local(any_task)
430    }
431
432    pub fn run<T: 'static>(&self, future: impl 'static + Future<Output = T>) -> T {
433        let future = any_local_future(future);
434        let any_value = match self {
435            Self::Deterministic(executor) => executor.run(future),
436            Self::Platform { .. } => panic!("you can't call run on a platform foreground executor"),
437            Self::Test(executor) => smol::block_on(executor.run(future)),
438        };
439        *any_value.downcast().unwrap()
440    }
441
442    pub fn parking_forbidden(&self) -> bool {
443        match self {
444            Self::Deterministic(executor) => executor.state.lock().forbid_parking,
445            _ => panic!("this method can only be called on a deterministic executor"),
446        }
447    }
448
449    pub fn would_park(&self) -> bool {
450        match self {
451            Self::Deterministic(executor) => executor.state.lock().would_park(),
452            _ => panic!("this method can only be called on a deterministic executor"),
453        }
454    }
455
456    pub fn forbid_parking(&self) {
457        match self {
458            Self::Deterministic(executor) => {
459                let mut state = executor.state.lock();
460                state.forbid_parking = true;
461                state.rng = StdRng::seed_from_u64(state.seed);
462            }
463            _ => panic!("this method can only be called on a deterministic executor"),
464        }
465    }
466
467    pub async fn timer(&self, duration: Duration) {
468        match self {
469            Self::Deterministic(executor) => {
470                let (tx, mut rx) = barrier::channel();
471                {
472                    let mut state = executor.state.lock();
473                    let wakeup_at = state.now + duration;
474                    state.pending_timers.push((wakeup_at, tx));
475                }
476                rx.recv().await;
477            }
478            _ => {
479                Timer::after(duration).await;
480            }
481        }
482    }
483
484    pub fn advance_clock(&self, duration: Duration) {
485        match self {
486            Self::Deterministic(executor) => {
487                executor.run_until_parked();
488
489                let mut state = executor.state.lock();
490                state.now += duration;
491                let now = state.now;
492                let mut pending_timers = mem::take(&mut state.pending_timers);
493                drop(state);
494
495                pending_timers.retain(|(wakeup, _)| *wakeup > now);
496                executor.state.lock().pending_timers.extend(pending_timers);
497            }
498            _ => panic!("this method can only be called on a deterministic executor"),
499        }
500    }
501
502    pub fn set_block_on_ticks(&self, range: RangeInclusive<usize>) {
503        match self {
504            Self::Deterministic(executor) => executor.state.lock().block_on_ticks = range,
505            _ => panic!("this method can only be called on a deterministic executor"),
506        }
507    }
508}
509
510impl Background {
511    pub fn new() -> Self {
512        let executor = Arc::new(Executor::new());
513        let stop = channel::unbounded::<()>();
514
515        for i in 0..2 * num_cpus::get() {
516            let executor = executor.clone();
517            let stop = stop.1.clone();
518            thread::Builder::new()
519                .name(format!("background-executor-{}", i))
520                .spawn(move || smol::block_on(executor.run(stop.recv())))
521                .unwrap();
522        }
523
524        Self::Production {
525            executor,
526            _stop: stop.0,
527        }
528    }
529
530    pub fn num_cpus(&self) -> usize {
531        num_cpus::get()
532    }
533
534    pub fn spawn<T, F>(&self, future: F) -> Task<T>
535    where
536        T: 'static + Send,
537        F: Send + Future<Output = T> + 'static,
538    {
539        let future = any_future(future);
540        let any_task = match self {
541            Self::Production { executor, .. } => executor.spawn(future),
542            Self::Deterministic { executor, .. } => executor.spawn(future),
543        };
544        Task::send(any_task)
545    }
546
547    pub fn block_with_timeout<F, T>(
548        &self,
549        timeout: Duration,
550        future: F,
551    ) -> Result<T, impl Future<Output = T>>
552    where
553        T: 'static,
554        F: 'static + Unpin + Future<Output = T>,
555    {
556        let mut future = any_local_future(future);
557        if !timeout.is_zero() {
558            let output = match self {
559                Self::Production { .. } => smol::block_on(util::timeout(timeout, &mut future)).ok(),
560                Self::Deterministic { executor, .. } => executor.block_on(&mut future),
561            };
562            if let Some(output) = output {
563                return Ok(*output.downcast().unwrap());
564            }
565        }
566        Err(async { *future.await.downcast().unwrap() })
567    }
568
569    pub async fn scoped<'scope, F>(&self, scheduler: F)
570    where
571        F: FnOnce(&mut Scope<'scope>),
572    {
573        let mut scope = Scope {
574            futures: Default::default(),
575            _phantom: PhantomData,
576        };
577        (scheduler)(&mut scope);
578        let spawned = scope
579            .futures
580            .into_iter()
581            .map(|f| self.spawn(f))
582            .collect::<Vec<_>>();
583        for task in spawned {
584            task.await;
585        }
586    }
587}
588
589pub struct Scope<'a> {
590    futures: Vec<Pin<Box<dyn Future<Output = ()> + Send + 'static>>>,
591    _phantom: PhantomData<&'a ()>,
592}
593
594impl<'a> Scope<'a> {
595    pub fn spawn<F>(&mut self, f: F)
596    where
597        F: Future<Output = ()> + Send + 'a,
598    {
599        let f = unsafe {
600            mem::transmute::<
601                Pin<Box<dyn Future<Output = ()> + Send + 'a>>,
602                Pin<Box<dyn Future<Output = ()> + Send + 'static>>,
603            >(Box::pin(f))
604        };
605        self.futures.push(f);
606    }
607}
608
609pub fn deterministic(seed: u64) -> (Rc<Foreground>, Arc<Background>) {
610    let executor = Arc::new(Deterministic::new(seed));
611    (
612        Rc::new(Foreground::Deterministic(executor.clone())),
613        Arc::new(Background::Deterministic { executor }),
614    )
615}
616
617impl<T> Task<T> {
618    pub fn ready(value: T) -> Self {
619        Self::Ready(Some(value))
620    }
621
622    fn local(any_task: AnyLocalTask) -> Self {
623        Self::Local {
624            any_task,
625            result_type: PhantomData,
626        }
627    }
628
629    pub fn detach(self) {
630        match self {
631            Task::Ready(_) => {}
632            Task::Local { any_task, .. } => any_task.detach(),
633            Task::Send { any_task, .. } => any_task.detach(),
634        }
635    }
636}
637
638impl<T: Send> Task<T> {
639    fn send(any_task: AnyTask) -> Self {
640        Self::Send {
641            any_task,
642            result_type: PhantomData,
643        }
644    }
645}
646
647impl<T: fmt::Debug> fmt::Debug for Task<T> {
648    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
649        match self {
650            Task::Ready(value) => value.fmt(f),
651            Task::Local { any_task, .. } => any_task.fmt(f),
652            Task::Send { any_task, .. } => any_task.fmt(f),
653        }
654    }
655}
656
657impl<T: 'static> Future for Task<T> {
658    type Output = T;
659
660    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
661        match unsafe { self.get_unchecked_mut() } {
662            Task::Ready(value) => Poll::Ready(value.take().unwrap()),
663            Task::Local { any_task, .. } => {
664                any_task.poll(cx).map(|value| *value.downcast().unwrap())
665            }
666            Task::Send { any_task, .. } => {
667                any_task.poll(cx).map(|value| *value.downcast().unwrap())
668            }
669        }
670    }
671}
672
673fn any_future<T, F>(future: F) -> AnyFuture
674where
675    T: 'static + Send,
676    F: Future<Output = T> + Send + 'static,
677{
678    async { Box::new(future.await) as Box<dyn Any + Send> }.boxed()
679}
680
681fn any_local_future<T, F>(future: F) -> AnyLocalFuture
682where
683    T: 'static,
684    F: Future<Output = T> + 'static,
685{
686    async { Box::new(future.await) as Box<dyn Any> }.boxed_local()
687}