executor.rs

  1use anyhow::{anyhow, Result};
  2use async_task::Runnable;
  3pub use async_task::Task;
  4use backtrace::{Backtrace, BacktraceFmt, BytesOrWideString};
  5use parking_lot::Mutex;
  6use postage::{barrier, prelude::Stream as _};
  7use rand::prelude::*;
  8use smol::{channel, prelude::*, Executor, Timer};
  9use std::{
 10    fmt::{self, Debug},
 11    marker::PhantomData,
 12    mem,
 13    ops::RangeInclusive,
 14    pin::Pin,
 15    rc::Rc,
 16    sync::{
 17        atomic::{AtomicBool, Ordering::SeqCst},
 18        Arc,
 19    },
 20    task::{Context, Poll},
 21    thread,
 22    time::{Duration, Instant},
 23};
 24use waker_fn::waker_fn;
 25
 26use crate::{platform, util};
 27
/// Executor for futures that are spawned with `async_task::spawn_local` or run on a
/// local executor, i.e. futures that are not required to be `Send`.
pub enum Foreground {
    /// Schedules runnables through the platform dispatcher's `run_on_main_thread`.
    Platform {
        dispatcher: Arc<dyn platform::Dispatcher>,
        // `Rc` is neither `Send` nor `Sync`, so this marker opts the variant (and
        // therefore the enum) out of both auto traits.
        _not_send_or_sync: PhantomData<Rc<()>>,
    },
    /// Backed by a `smol::LocalExecutor`; created by `Foreground::test`.
    Test(smol::LocalExecutor<'static>),
    /// Backed by the seeded `Deterministic` scheduler; created by `deterministic`.
    Deterministic(Arc<Deterministic>),
}
 36
/// Executor for `Send` futures.
pub enum Background {
    /// Backed by the seeded `Deterministic` scheduler; created by `deterministic`.
    Deterministic(Arc<Deterministic>),
    /// Backed by a `smol::Executor` that is driven by the worker threads started in
    /// `Background::new`.
    Production {
        executor: Arc<smol::Executor<'static>>,
        // Sending half of the channel whose receiving half every worker thread waits on.
        // NOTE(review): presumably dropping it closes the channel and lets the workers
        // exit — confirm against the `smol::channel` documentation.
        _stop: channel::Sender<()>,
    },
}
 44
/// Mutable state of the deterministic scheduler, kept behind a mutex in `Deterministic`.
struct DeterministicState {
    // Source of every scheduling decision; seeded from `seed`.
    rng: StdRng,
    // Kept so that `Foreground::forbid_parking` can re-seed `rng`.
    seed: u64,
    // Foreground runnables that were scheduled again after their first scheduling,
    // paired with the backtrace captured when their task was spawned.
    scheduled_from_foreground: Vec<(Runnable, Backtrace)>,
    // Runnables of tasks spawned through `Deterministic::spawn`.
    scheduled_from_background: Vec<(Runnable, Backtrace)>,
    // First scheduling of each foreground task; `run_internal` only ever takes the
    // element at index 0 from this queue.
    spawned_from_foreground: Vec<(Runnable, Backtrace)>,
    // When set, `run` and `block_on` panic instead of parking the thread.
    forbid_parking: bool,
    // Range from which `block_on` draws its tick budget.
    block_on_ticks: RangeInclusive<usize>,
    // Clock used for timers; only moved forward by `Foreground::advance_clock`.
    now: Instant,
    // Wake-up time and barrier sender for every timer that has not fired yet.
    pending_timers: Vec<(Instant, barrier::Sender)>,
}
 56
/// Scheduler whose choice of the next runnable is driven by a seeded random number
/// generator. It is shared by the `Deterministic` variants of `Foreground` and
/// `Background`.
pub struct Deterministic {
    // Shared with the schedule callbacks handed to `async_task`.
    state: Arc<Mutex<DeterministicState>>,
    // Used by `run` and `block_on` to park the thread; wakers and schedule callbacks
    // hold unparkers obtained from it.
    parker: Mutex<parking::Parker>,
}
 61
 62impl Deterministic {
 63    fn new(seed: u64) -> Self {
 64        Self {
 65            state: Arc::new(Mutex::new(DeterministicState {
 66                rng: StdRng::seed_from_u64(seed),
 67                seed,
 68                scheduled_from_foreground: Default::default(),
 69                scheduled_from_background: Default::default(),
 70                spawned_from_foreground: Default::default(),
 71                forbid_parking: false,
 72                block_on_ticks: 0..=1000,
 73                now: Instant::now(),
 74                pending_timers: Default::default(),
 75            })),
 76            parker: Default::default(),
 77        }
 78    }
 79
 80    fn spawn_from_foreground<T>(&self, future: Pin<Box<dyn Future<Output = T>>>) -> Task<T>
 81    where
 82        T: 'static,
 83    {
 84        let backtrace = Backtrace::new_unresolved();
 85        let scheduled_once = AtomicBool::new(false);
 86        let state = self.state.clone();
 87        let unparker = self.parker.lock().unparker();
 88        let (runnable, task) = async_task::spawn_local(future, move |runnable| {
 89            let mut state = state.lock();
 90            let backtrace = backtrace.clone();
 91            if scheduled_once.fetch_or(true, SeqCst) {
 92                state.scheduled_from_foreground.push((runnable, backtrace));
 93            } else {
 94                state.spawned_from_foreground.push((runnable, backtrace));
 95            }
 96            unparker.unpark();
 97        });
 98        runnable.schedule();
 99        task
100    }
101
102    fn spawn<T>(&self, future: Pin<Box<dyn Send + Future<Output = T>>>) -> Task<T>
103    where
104        T: 'static + Send,
105    {
106        let backtrace = Backtrace::new_unresolved();
107        let state = self.state.clone();
108        let unparker = self.parker.lock().unparker();
109        let (runnable, task) = async_task::spawn(future, move |runnable| {
110            let mut state = state.lock();
111            state
112                .scheduled_from_background
113                .push((runnable, backtrace.clone()));
114            unparker.unpark();
115        });
116        runnable.schedule();
117        task
118    }
119
120    fn run<T>(&self, mut future: Pin<Box<dyn Future<Output = T>>>) -> T
121    where
122        T: 'static,
123    {
124        let woken = Arc::new(AtomicBool::new(false));
125        loop {
126            if let Some(result) = self.run_internal(woken.clone(), &mut future) {
127                return result;
128            }
129
130            if !woken.load(SeqCst) && self.state.lock().forbid_parking {
131                panic!("deterministic executor parked after a call to forbid_parking");
132            }
133
134            woken.store(false, SeqCst);
135            self.parker.lock().park();
136        }
137    }
138
139    fn run_until_parked(&self) {
140        let woken = Arc::new(AtomicBool::new(false));
141        let mut future = std::future::pending::<()>().boxed_local();
142        self.run_internal(woken, &mut future);
143    }
144
    /// Runs scheduled runnables and polls `future`, in an order chosen by the seeded
    /// random number generator.
    ///
    /// Returns `Some(output)` as soon as `future` completes. Returns `None` when
    /// `future` was polled, is still pending, and all three runnable queues are empty.
    ///
    /// `woken` is set to `true` whenever the waker handed to `future` is invoked; the
    /// same waker also unparks the scheduler thread.
    fn run_internal<T>(
        &self,
        woken: Arc<AtomicBool>,
        future: &mut Pin<Box<dyn Future<Output = T>>>,
    ) -> Option<T>
    where
        T: 'static,
    {
        let unparker = self.parker.lock().unparker();
        let waker = waker_fn(move || {
            woken.store(true, SeqCst);
            unparker.unpark();
        });

        let mut cx = Context::from_waker(&waker);
        // Records every runnable that gets executed; what happens to the record is
        // decided in `Trace`'s `Drop` impl when this function returns or unwinds.
        let mut trace = Trace::default();
        loop {
            let mut state = self.state.lock();
            let runnable_count = state.scheduled_from_foreground.len()
                + state.scheduled_from_background.len()
                + state.spawned_from_foreground.len();

            // The range is inclusive: the one extra value (`ix == runnable_count`)
            // selects "poll the main future" instead of a queued runnable.
            let ix = state.rng.gen_range(0..=runnable_count);
            if ix < state.scheduled_from_foreground.len() {
                let (_, backtrace) = &state.scheduled_from_foreground[ix];
                trace.record(&state, backtrace.clone());
                let runnable = state.scheduled_from_foreground.remove(ix).0;
                // Release the lock before running: the schedule callbacks lock the
                // state themselves.
                drop(state);
                runnable.run();
            } else if ix - state.scheduled_from_foreground.len()
                < state.scheduled_from_background.len()
            {
                // Translate the global index into an index into the background queue.
                let ix = ix - state.scheduled_from_foreground.len();
                let (_, backtrace) = &state.scheduled_from_background[ix];
                trace.record(&state, backtrace.clone());
                let runnable = state.scheduled_from_background.remove(ix).0;
                drop(state);
                runnable.run();
            } else if ix < runnable_count {
                // Freshly spawned foreground tasks are always taken from the front of
                // their queue, regardless of which index in this range was drawn.
                let (_, backtrace) = &state.spawned_from_foreground[0];
                trace.record(&state, backtrace.clone());
                let runnable = state.spawned_from_foreground.remove(0).0;
                drop(state);
                runnable.run();
            } else {
                drop(state);
                if let Poll::Ready(result) = future.poll(&mut cx) {
                    return Some(result);
                }

                // The future is still pending: give up only if nothing is left to run.
                let state = self.state.lock();
                if state.scheduled_from_foreground.is_empty()
                    && state.scheduled_from_background.is_empty()
                    && state.spawned_from_foreground.is_empty()
                {
                    return None;
                }
            }
        }
    }
205
    /// Polls `future` on the current thread for a limited number of ticks, running
    /// background-scheduled runnables in between.
    ///
    /// The tick budget is drawn from the `block_on_ticks` range using the seeded random
    /// number generator. Each tick either runs one runnable from
    /// `scheduled_from_background` or polls `future` once; foreground queues are not
    /// touched.
    ///
    /// Returns `Some(output)` if `future` completes within the budget and `None`
    /// otherwise.
    ///
    /// # Panics
    ///
    /// Panics if `future` is pending, no background runnable is queued, and parking has
    /// been forbidden.
    pub fn block_on<F, T>(&self, future: F) -> Option<T>
    where
        T: 'static,
        F: Future<Output = T>,
    {
        smol::pin!(future);

        let unparker = self.parker.lock().unparker();
        let waker = waker_fn(move || {
            unparker.unpark();
        });
        let max_ticks = {
            let mut state = self.state.lock();
            let range = state.block_on_ticks.clone();
            state.rng.gen_range(range)
        };

        let mut cx = Context::from_waker(&waker);
        // Records every runnable that gets executed; what happens to the record is
        // decided in `Trace`'s `Drop` impl when this function returns or unwinds.
        let mut trace = Trace::default();
        for _ in 0..max_ticks {
            let mut state = self.state.lock();
            let runnable_count = state.scheduled_from_background.len();
            // Inclusive range: `ix == runnable_count` means "poll the future".
            let ix = state.rng.gen_range(0..=runnable_count);
            if ix < state.scheduled_from_background.len() {
                let (_, backtrace) = &state.scheduled_from_background[ix];
                trace.record(&state, backtrace.clone());
                let runnable = state.scheduled_from_background.remove(ix).0;
                // Release the lock before running: the schedule callbacks lock the
                // state themselves.
                drop(state);
                runnable.run();
            } else {
                drop(state);
                if let Poll::Ready(result) = future.as_mut().poll(&mut cx) {
                    return Some(result);
                }
                let state = self.state.lock();
                if state.scheduled_from_background.is_empty() {
                    if state.forbid_parking {
                        panic!("deterministic executor parked after a call to forbid_parking");
                    }
                    // Nothing to run: wait until a waker or schedule callback unparks
                    // this thread. The state lock is released first.
                    drop(state);
                    self.parker.lock().park();
                }

                continue;
            }
        }

        None
    }
255}
256
/// Log of the deterministic scheduler's decisions. The three vectors are pushed to
/// together by `record`, so they always have the same length.
#[derive(Default)]
struct Trace {
    // Spawn-site backtrace of each runnable that was executed, in execution order.
    executed: Vec<Backtrace>,
    // For each executed runnable: backtraces of `scheduled_from_foreground` at the
    // moment it was picked.
    scheduled: Vec<Vec<Backtrace>>,
    // For each executed runnable: backtraces of `spawned_from_foreground` at the
    // moment it was picked.
    spawned_from_foreground: Vec<Vec<Backtrace>>,
}
263
264impl Trace {
265    fn record(&mut self, state: &DeterministicState, executed: Backtrace) {
266        self.scheduled.push(
267            state
268                .scheduled_from_foreground
269                .iter()
270                .map(|(_, backtrace)| backtrace.clone())
271                .collect(),
272        );
273        self.spawned_from_foreground.push(
274            state
275                .spawned_from_foreground
276                .iter()
277                .map(|(_, backtrace)| backtrace.clone())
278                .collect(),
279        );
280        self.executed.push(executed);
281    }
282
283    fn resolve(&mut self) {
284        for backtrace in &mut self.executed {
285            backtrace.resolve();
286        }
287
288        for backtraces in &mut self.scheduled {
289            for backtrace in backtraces {
290                backtrace.resolve();
291            }
292        }
293
294        for backtraces in &mut self.spawned_from_foreground {
295            for backtrace in backtraces {
296                backtrace.resolve();
297            }
298        }
299    }
300}
301
302impl Debug for Trace {
303    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
304        struct FirstCwdFrameInBacktrace<'a>(&'a Backtrace);
305
306        impl<'a> Debug for FirstCwdFrameInBacktrace<'a> {
307            fn fmt(&self, f: &mut fmt::Formatter<'_>) -> std::fmt::Result {
308                let cwd = std::env::current_dir().unwrap();
309                let mut print_path = |fmt: &mut fmt::Formatter<'_>, path: BytesOrWideString<'_>| {
310                    fmt::Display::fmt(&path, fmt)
311                };
312                let mut fmt = BacktraceFmt::new(f, backtrace::PrintFmt::Full, &mut print_path);
313                for frame in self.0.frames() {
314                    let mut formatted_frame = fmt.frame();
315                    if frame
316                        .symbols()
317                        .iter()
318                        .any(|s| s.filename().map_or(false, |f| f.starts_with(&cwd)))
319                    {
320                        formatted_frame.backtrace_frame(frame)?;
321                        break;
322                    }
323                }
324                fmt.finish()
325            }
326        }
327
328        for ((backtrace, scheduled), spawned_from_foreground) in self
329            .executed
330            .iter()
331            .zip(&self.scheduled)
332            .zip(&self.spawned_from_foreground)
333        {
334            writeln!(f, "Scheduled")?;
335            for backtrace in scheduled {
336                writeln!(f, "- {:?}", FirstCwdFrameInBacktrace(backtrace))?;
337            }
338            if scheduled.is_empty() {
339                writeln!(f, "None")?;
340            }
341            writeln!(f, "==========")?;
342
343            writeln!(f, "Spawned from foreground")?;
344            for backtrace in spawned_from_foreground {
345                writeln!(f, "- {:?}", FirstCwdFrameInBacktrace(backtrace))?;
346            }
347            if spawned_from_foreground.is_empty() {
348                writeln!(f, "None")?;
349            }
350            writeln!(f, "==========")?;
351
352            writeln!(f, "Run: {:?}", FirstCwdFrameInBacktrace(backtrace))?;
353            writeln!(f, "+++++++++++++++++++")?;
354        }
355
356        Ok(())
357    }
358}
359
360impl Drop for Trace {
361    fn drop(&mut self) {
362        let trace_on_panic = if let Ok(trace_on_panic) = std::env::var("EXECUTOR_TRACE_ON_PANIC") {
363            trace_on_panic == "1" || trace_on_panic == "true"
364        } else {
365            false
366        };
367        let trace_always = if let Ok(trace_always) = std::env::var("EXECUTOR_TRACE_ALWAYS") {
368            trace_always == "1" || trace_always == "true"
369        } else {
370            false
371        };
372
373        if trace_always || (trace_on_panic && thread::panicking()) {
374            self.resolve();
375            dbg!(self);
376        }
377    }
378}
379
380impl Foreground {
381    pub fn platform(dispatcher: Arc<dyn platform::Dispatcher>) -> Result<Self> {
382        if dispatcher.is_main_thread() {
383            Ok(Self::Platform {
384                dispatcher,
385                _not_send_or_sync: PhantomData,
386            })
387        } else {
388            Err(anyhow!("must be constructed on main thread"))
389        }
390    }
391
392    pub fn test() -> Self {
393        Self::Test(smol::LocalExecutor::new())
394    }
395
396    pub fn spawn<T: 'static>(&self, future: impl Future<Output = T> + 'static) -> Task<T> {
397        let future = future.boxed_local();
398        match self {
399            Self::Platform { dispatcher, .. } => {
400                let dispatcher = dispatcher.clone();
401                let schedule = move |runnable: Runnable| dispatcher.run_on_main_thread(runnable);
402                let (runnable, task) = async_task::spawn_local(future, schedule);
403                runnable.schedule();
404                task
405            }
406            Self::Test(executor) => executor.spawn(future),
407            Self::Deterministic(executor) => executor.spawn_from_foreground(future),
408        }
409    }
410
411    pub fn run<T: 'static>(&self, future: impl 'static + Future<Output = T>) -> T {
412        let future = future.boxed_local();
413        match self {
414            Self::Platform { .. } => panic!("you can't call run on a platform foreground executor"),
415            Self::Test(executor) => smol::block_on(executor.run(future)),
416            Self::Deterministic(executor) => executor.run(future),
417        }
418    }
419
420    pub fn forbid_parking(&self) {
421        match self {
422            Self::Deterministic(executor) => {
423                let mut state = executor.state.lock();
424                state.forbid_parking = true;
425                state.rng = StdRng::seed_from_u64(state.seed);
426            }
427            _ => panic!("this method can only be called on a deterministic executor"),
428        }
429    }
430
431    pub async fn timer(&self, duration: Duration) {
432        match self {
433            Self::Deterministic(executor) => {
434                let (tx, mut rx) = barrier::channel();
435                {
436                    let mut state = executor.state.lock();
437                    let wakeup_at = state.now + duration;
438                    state.pending_timers.push((wakeup_at, tx));
439                }
440                rx.recv().await;
441            }
442            _ => {
443                Timer::after(duration).await;
444            }
445        }
446    }
447
448    pub fn advance_clock(&self, duration: Duration) {
449        match self {
450            Self::Deterministic(executor) => {
451                executor.run_until_parked();
452
453                let mut state = executor.state.lock();
454                state.now += duration;
455                let now = state.now;
456                let mut pending_timers = mem::take(&mut state.pending_timers);
457                drop(state);
458
459                pending_timers.retain(|(wakeup, _)| *wakeup > now);
460                executor.state.lock().pending_timers.extend(pending_timers);
461            }
462            _ => panic!("this method can only be called on a deterministic executor"),
463        }
464    }
465
466    pub fn set_block_on_ticks(&self, range: RangeInclusive<usize>) {
467        match self {
468            Self::Deterministic(executor) => executor.state.lock().block_on_ticks = range,
469            _ => panic!("this method can only be called on a deterministic executor"),
470        }
471    }
472}
473
474impl Background {
475    pub fn new() -> Self {
476        let executor = Arc::new(Executor::new());
477        let stop = channel::unbounded::<()>();
478
479        for i in 0..2 * num_cpus::get() {
480            let executor = executor.clone();
481            let stop = stop.1.clone();
482            thread::Builder::new()
483                .name(format!("background-executor-{}", i))
484                .spawn(move || smol::block_on(executor.run(stop.recv())))
485                .unwrap();
486        }
487
488        Self::Production {
489            executor,
490            _stop: stop.0,
491        }
492    }
493
    /// Returns the value reported by `num_cpus::get()`.
    ///
    /// This is not the number of worker threads: `Background::new` spawns twice as
    /// many. The result does not depend on which variant `self` is.
    pub fn num_cpus(&self) -> usize {
        num_cpus::get()
    }
497
498    pub fn spawn<T, F>(&self, future: F) -> Task<T>
499    where
500        T: 'static + Send,
501        F: Send + Future<Output = T> + 'static,
502    {
503        let future = future.boxed();
504        match self {
505            Self::Production { executor, .. } => executor.spawn(future),
506            Self::Deterministic(executor) => executor.spawn(future),
507        }
508    }
509
510    pub fn block_with_timeout<F, T>(
511        &self,
512        timeout: Duration,
513        future: F,
514    ) -> Result<T, Pin<Box<dyn Future<Output = T>>>>
515    where
516        T: 'static,
517        F: 'static + Unpin + Future<Output = T>,
518    {
519        let mut future = future.boxed_local();
520        if !timeout.is_zero() {
521            let output = match self {
522                Self::Production { .. } => smol::block_on(util::timeout(timeout, &mut future)).ok(),
523                Self::Deterministic(executor) => executor.block_on(&mut future),
524            };
525            if let Some(output) = output {
526                return Ok(output);
527            }
528        }
529        Err(future)
530    }
531
532    pub async fn scoped<'scope, F>(&self, scheduler: F)
533    where
534        F: FnOnce(&mut Scope<'scope>),
535    {
536        let mut scope = Scope {
537            futures: Default::default(),
538            _phantom: PhantomData,
539        };
540        (scheduler)(&mut scope);
541        let spawned = scope
542            .futures
543            .into_iter()
544            .map(|f| self.spawn(f))
545            .collect::<Vec<_>>();
546        for task in spawned {
547            task.await;
548        }
549    }
550}
551
/// Collects the futures registered inside a `Background::scoped` call.
pub struct Scope<'a> {
    // Stored as `'static` trait objects; `Scope::spawn` erases the real `'a` bound.
    futures: Vec<Pin<Box<dyn Future<Output = ()> + Send + 'static>>>,
    // Ties the scope to the lifetime `'a` that registered futures may borrow for.
    _phantom: PhantomData<&'a ()>,
}
556
557impl<'a> Scope<'a> {
558    pub fn spawn<F>(&mut self, f: F)
559    where
560        F: Future<Output = ()> + Send + 'a,
561    {
562        let f = unsafe {
563            mem::transmute::<
564                Pin<Box<dyn Future<Output = ()> + Send + 'a>>,
565                Pin<Box<dyn Future<Output = ()> + Send + 'static>>,
566            >(Box::pin(f))
567        };
568        self.futures.push(f);
569    }
570}
571
572pub fn deterministic(seed: u64) -> (Rc<Foreground>, Arc<Background>) {
573    let executor = Arc::new(Deterministic::new(seed));
574    (
575        Rc::new(Foreground::Deterministic(executor.clone())),
576        Arc::new(Background::Deterministic(executor)),
577    )
578}