executor.rs

  1use anyhow::{anyhow, Result};
  2use async_task::Runnable;
  3pub use async_task::Task;
  4use backtrace::{Backtrace, BacktraceFmt, BytesOrWideString};
  5use parking_lot::Mutex;
  6use postage::{barrier, prelude::Stream as _};
  7use rand::prelude::*;
  8use smol::{channel, prelude::*, Executor, Timer};
  9use std::{
 10    fmt::{self, Debug},
 11    marker::PhantomData,
 12    mem,
 13    ops::RangeInclusive,
 14    pin::Pin,
 15    rc::Rc,
 16    sync::{
 17        atomic::{AtomicBool, Ordering::SeqCst},
 18        Arc,
 19    },
 20    task::{Context, Poll},
 21    thread,
 22    time::{Duration, Instant},
 23};
 24use waker_fn::waker_fn;
 25
 26use crate::{platform, util};
 27
/// Executor for futures that are not required to be `Send`.
///
/// The variant decides how spawned tasks get scheduled: through a platform
/// dispatcher, a `smol` local executor, or the seeded [`Deterministic`]
/// executor.
pub enum Foreground {
    /// Schedules runnables by handing them to `dispatcher.run_on_main_thread`.
    Platform {
        dispatcher: Arc<dyn platform::Dispatcher>,
        // `Rc` is neither `Send` nor `Sync`, so this marker keeps the variant
        // (and with it the whole enum) from being sent or shared across threads.
        _not_send_or_sync: PhantomData<Rc<()>>,
    },
    /// Backed by a `smol::LocalExecutor`.
    Test(smol::LocalExecutor<'static>),
    /// Backed by a seeded [`Deterministic`] executor.
    Deterministic(Arc<Deterministic>),
}
 36
/// Executor for `Send` futures.
pub enum Background {
    /// Runs tasks on a seeded [`Deterministic`] executor.
    Deterministic(Arc<Deterministic>),
    /// Runs tasks on a `smol::Executor` that is driven by worker threads.
    Production {
        executor: Arc<smol::Executor<'static>>,
        // Nothing is ever sent on this channel; the worker threads wait on the
        // receiving half. NOTE(review): presumably dropping this sender is what
        // lets the workers finish — confirm against `smol::channel` semantics.
        _stop: channel::Sender<()>,
    },
}
 44
/// Mutable state of a [`Deterministic`] executor, guarded by a single mutex.
struct DeterministicState {
    // Source of every scheduling decision (which runnable runs next, how many
    // ticks `block_on` gets).
    rng: StdRng,
    // Seed `rng` was created from; `Foreground::forbid_parking` uses it to
    // re-seed `rng`.
    seed: u64,
    // Foreground tasks that were scheduled again after their first scheduling,
    // each paired with the backtrace captured when the task was spawned.
    scheduled_from_foreground: Vec<(Runnable, Backtrace)>,
    // Tasks created through `Deterministic::spawn`, paired with their spawn backtrace.
    scheduled_from_background: Vec<(Runnable, Backtrace)>,
    // Foreground tasks on their first scheduling; `run_internal` always takes
    // the entry at index 0 from this queue.
    spawned_from_foreground: Vec<(Runnable, Backtrace)>,
    // When set, `run` and `block_on` panic instead of parking the thread.
    forbid_parking: bool,
    // Range from which `block_on` draws its maximum number of ticks.
    block_on_ticks: RangeInclusive<usize>,
    // Simulated current time; only moved forward by `Foreground::advance_clock`.
    now: Instant,
    // Outstanding `Foreground::sleep` calls: the simulated wake-up time and the
    // barrier sender whose drop releases the sleeper.
    pending_sleeps: Vec<(Instant, barrier::Sender)>,
}
 56
/// Executor whose scheduling choices are driven by a seeded RNG.
pub struct Deterministic {
    // Shared with the schedule callbacks of spawned tasks, which push their
    // runnables into the queues held here.
    state: Arc<Mutex<DeterministicState>>,
    // Used to block the running thread when there is nothing to do; unparkers
    // obtained from it are handed to wakers and schedule callbacks.
    parker: Mutex<parking::Parker>,
}
 61
 62impl Deterministic {
 63    fn new(seed: u64) -> Self {
 64        Self {
 65            state: Arc::new(Mutex::new(DeterministicState {
 66                rng: StdRng::seed_from_u64(seed),
 67                seed,
 68                scheduled_from_foreground: Default::default(),
 69                scheduled_from_background: Default::default(),
 70                spawned_from_foreground: Default::default(),
 71                forbid_parking: false,
 72                block_on_ticks: 0..=1000,
 73                now: Instant::now(),
 74                pending_sleeps: Default::default(),
 75            })),
 76            parker: Default::default(),
 77        }
 78    }
 79
 80    pub fn spawn_from_foreground<F, T>(&self, future: F) -> Task<T>
 81    where
 82        T: 'static,
 83        F: Future<Output = T> + 'static,
 84    {
 85        let backtrace = Backtrace::new_unresolved();
 86        let scheduled_once = AtomicBool::new(false);
 87        let state = self.state.clone();
 88        let unparker = self.parker.lock().unparker();
 89        let (runnable, task) = async_task::spawn_local(future, move |runnable| {
 90            let mut state = state.lock();
 91            let backtrace = backtrace.clone();
 92            if scheduled_once.fetch_or(true, SeqCst) {
 93                state.scheduled_from_foreground.push((runnable, backtrace));
 94            } else {
 95                state.spawned_from_foreground.push((runnable, backtrace));
 96            }
 97            unparker.unpark();
 98        });
 99        runnable.schedule();
100        task
101    }
102
103    pub fn spawn<F, T>(&self, future: F) -> Task<T>
104    where
105        T: 'static + Send,
106        F: 'static + Send + Future<Output = T>,
107    {
108        let backtrace = Backtrace::new_unresolved();
109        let state = self.state.clone();
110        let unparker = self.parker.lock().unparker();
111        let (runnable, task) = async_task::spawn(future, move |runnable| {
112            let mut state = state.lock();
113            state
114                .scheduled_from_background
115                .push((runnable, backtrace.clone()));
116            unparker.unpark();
117        });
118        runnable.schedule();
119        task
120    }
121
122    pub fn run<F, T>(&self, future: F) -> T
123    where
124        T: 'static,
125        F: Future<Output = T> + 'static,
126    {
127        let woken = Arc::new(AtomicBool::new(false));
128        let mut future = Box::pin(future);
129        loop {
130            if let Some(result) = self.run_internal(woken.clone(), &mut future) {
131                return result;
132            }
133
134            if !woken.load(SeqCst) && self.state.lock().forbid_parking {
135                panic!("deterministic executor parked after a call to forbid_parking");
136            }
137
138            woken.store(false, SeqCst);
139            self.parker.lock().park();
140        }
141    }
142
143    fn run_until_parked(&self) {
144        let woken = Arc::new(AtomicBool::new(false));
145        let future = std::future::pending::<()>();
146        smol::pin!(future);
147        self.run_internal(woken, future);
148    }
149
    /// Runs queued tasks and polls `future`, letting the seeded RNG pick what
    /// happens next on every iteration.
    ///
    /// Returns `Some(output)` as soon as `future` completes, or `None` when
    /// `future` is still pending and all three queues are empty.
    ///
    /// `woken` is set to `true` by the waker handed to `future`; that waker
    /// also unparks this executor's parker.
    pub fn run_internal<F, T>(&self, woken: Arc<AtomicBool>, mut future: F) -> Option<T>
    where
        T: 'static,
        F: Future<Output = T> + Unpin,
    {
        let unparker = self.parker.lock().unparker();
        let waker = waker_fn(move || {
            woken.store(true, SeqCst);
            unparker.unpark();
        });

        let mut cx = Context::from_waker(&waker);
        // Records every executed runnable; printed from `Trace`'s `Drop` impl
        // when the corresponding environment variables are set.
        let mut trace = Trace::default();
        loop {
            let mut state = self.state.lock();
            let runnable_count = state.scheduled_from_foreground.len()
                + state.scheduled_from_background.len()
                + state.spawned_from_foreground.len();

            // The inclusive upper bound adds one extra slot: `ix == runnable_count`
            // means "poll `future`" rather than "run a queued task".
            let ix = state.rng.gen_range(0..=runnable_count);
            if ix < state.scheduled_from_foreground.len() {
                let (_, backtrace) = &state.scheduled_from_foreground[ix];
                trace.record(&state, backtrace.clone());
                let runnable = state.scheduled_from_foreground.remove(ix).0;
                // Release the lock before running: the schedule callbacks of
                // spawned tasks lock `state` themselves.
                drop(state);
                runnable.run();
            } else if ix - state.scheduled_from_foreground.len()
                < state.scheduled_from_background.len()
            {
                // Re-base the index onto the background queue.
                let ix = ix - state.scheduled_from_foreground.len();
                let (_, backtrace) = &state.scheduled_from_background[ix];
                trace.record(&state, backtrace.clone());
                let runnable = state.scheduled_from_background.remove(ix).0;
                drop(state);
                runnable.run();
            } else if ix < runnable_count {
                // Here the index only selects the queue: the entry at the front
                // is always taken, so first runs happen in insertion order.
                let (_, backtrace) = &state.spawned_from_foreground[0];
                trace.record(&state, backtrace.clone());
                let runnable = state.spawned_from_foreground.remove(0).0;
                drop(state);
                runnable.run();
            } else {
                drop(state);
                if let Poll::Ready(result) = future.poll(&mut cx) {
                    return Some(result);
                }

                // `future` is pending; stop only if nothing is left to run.
                let state = self.state.lock();
                if state.scheduled_from_foreground.is_empty()
                    && state.scheduled_from_background.is_empty()
                    && state.spawned_from_foreground.is_empty()
                {
                    return None;
                }
            }
        }
    }
207
    /// Polls `future` for a bounded, randomly chosen number of ticks.
    ///
    /// The tick budget is drawn from `block_on_ticks` with the seeded RNG. On
    /// every tick the RNG chooses between running one task from
    /// `scheduled_from_background` and polling `future`. Returns `Some(output)`
    /// if `future` completes within the budget and `None` otherwise.
    ///
    /// # Panics
    ///
    /// Panics if `future` is pending, no background task is queued, and
    /// `forbid_parking` is set.
    pub fn block_on<F, T>(&self, future: F) -> Option<T>
    where
        T: 'static,
        F: Future<Output = T>,
    {
        smol::pin!(future);

        let unparker = self.parker.lock().unparker();
        let waker = waker_fn(move || {
            unparker.unpark();
        });
        let max_ticks = {
            let mut state = self.state.lock();
            let range = state.block_on_ticks.clone();
            state.rng.gen_range(range)
        };

        let mut cx = Context::from_waker(&waker);
        let mut trace = Trace::default();
        for _ in 0..max_ticks {
            let mut state = self.state.lock();
            // Only background tasks are considered here; the foreground queues
            // are left untouched.
            let runnable_count = state.scheduled_from_background.len();
            // `ix == runnable_count` is the extra slot that means "poll `future`".
            let ix = state.rng.gen_range(0..=runnable_count);
            if ix < state.scheduled_from_background.len() {
                let (_, backtrace) = &state.scheduled_from_background[ix];
                trace.record(&state, backtrace.clone());
                let runnable = state.scheduled_from_background.remove(ix).0;
                // Release the lock before running: the schedule callbacks of
                // spawned tasks lock `state` themselves.
                drop(state);
                runnable.run();
            } else {
                drop(state);
                if let Poll::Ready(result) = future.as_mut().poll(&mut cx) {
                    return Some(result);
                }
                let state = self.state.lock();
                if state.scheduled_from_background.is_empty() {
                    if state.forbid_parking {
                        panic!("deterministic executor parked after a call to forbid_parking");
                    }
                    // Nothing to run: block until the waker or a schedule
                    // callback unparks this thread.
                    drop(state);
                    self.parker.lock().park();
                }

                continue;
            }
        }

        None
    }
257}
258
/// Step-by-step log of what a run of the deterministic executor executed.
///
/// `record` pushes one entry onto each of the three vectors, so index `i` in
/// all of them describes the same step.
#[derive(Default)]
struct Trace {
    // Backtrace associated with the runnable that was executed at each step.
    executed: Vec<Backtrace>,
    // Snapshot of the backtraces in `scheduled_from_foreground` at each step.
    scheduled: Vec<Vec<Backtrace>>,
    // Snapshot of the backtraces in `spawned_from_foreground` at each step.
    spawned_from_foreground: Vec<Vec<Backtrace>>,
}
265
266impl Trace {
267    fn record(&mut self, state: &DeterministicState, executed: Backtrace) {
268        self.scheduled.push(
269            state
270                .scheduled_from_foreground
271                .iter()
272                .map(|(_, backtrace)| backtrace.clone())
273                .collect(),
274        );
275        self.spawned_from_foreground.push(
276            state
277                .spawned_from_foreground
278                .iter()
279                .map(|(_, backtrace)| backtrace.clone())
280                .collect(),
281        );
282        self.executed.push(executed);
283    }
284
285    fn resolve(&mut self) {
286        for backtrace in &mut self.executed {
287            backtrace.resolve();
288        }
289
290        for backtraces in &mut self.scheduled {
291            for backtrace in backtraces {
292                backtrace.resolve();
293            }
294        }
295
296        for backtraces in &mut self.spawned_from_foreground {
297            for backtrace in backtraces {
298                backtrace.resolve();
299            }
300        }
301    }
302}
303
304impl Debug for Trace {
305    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
306        struct FirstCwdFrameInBacktrace<'a>(&'a Backtrace);
307
308        impl<'a> Debug for FirstCwdFrameInBacktrace<'a> {
309            fn fmt(&self, f: &mut fmt::Formatter<'_>) -> std::fmt::Result {
310                let cwd = std::env::current_dir().unwrap();
311                let mut print_path = |fmt: &mut fmt::Formatter<'_>, path: BytesOrWideString<'_>| {
312                    fmt::Display::fmt(&path, fmt)
313                };
314                let mut fmt = BacktraceFmt::new(f, backtrace::PrintFmt::Full, &mut print_path);
315                for frame in self.0.frames() {
316                    let mut formatted_frame = fmt.frame();
317                    if frame
318                        .symbols()
319                        .iter()
320                        .any(|s| s.filename().map_or(false, |f| f.starts_with(&cwd)))
321                    {
322                        formatted_frame.backtrace_frame(frame)?;
323                        break;
324                    }
325                }
326                fmt.finish()
327            }
328        }
329
330        for ((backtrace, scheduled), spawned_from_foreground) in self
331            .executed
332            .iter()
333            .zip(&self.scheduled)
334            .zip(&self.spawned_from_foreground)
335        {
336            writeln!(f, "Scheduled")?;
337            for backtrace in scheduled {
338                writeln!(f, "- {:?}", FirstCwdFrameInBacktrace(backtrace))?;
339            }
340            if scheduled.is_empty() {
341                writeln!(f, "None")?;
342            }
343            writeln!(f, "==========")?;
344
345            writeln!(f, "Spawned from foreground")?;
346            for backtrace in spawned_from_foreground {
347                writeln!(f, "- {:?}", FirstCwdFrameInBacktrace(backtrace))?;
348            }
349            if spawned_from_foreground.is_empty() {
350                writeln!(f, "None")?;
351            }
352            writeln!(f, "==========")?;
353
354            writeln!(f, "Run: {:?}", FirstCwdFrameInBacktrace(backtrace))?;
355            writeln!(f, "+++++++++++++++++++")?;
356        }
357
358        Ok(())
359    }
360}
361
362impl Drop for Trace {
363    fn drop(&mut self) {
364        let trace_on_panic = if let Ok(trace_on_panic) = std::env::var("EXECUTOR_TRACE_ON_PANIC") {
365            trace_on_panic == "1" || trace_on_panic == "true"
366        } else {
367            false
368        };
369        let trace_always = if let Ok(trace_always) = std::env::var("EXECUTOR_TRACE_ALWAYS") {
370            trace_always == "1" || trace_always == "true"
371        } else {
372            false
373        };
374
375        if trace_always || (trace_on_panic && thread::panicking()) {
376            self.resolve();
377            dbg!(self);
378        }
379    }
380}
381
382impl Foreground {
383    pub fn platform(dispatcher: Arc<dyn platform::Dispatcher>) -> Result<Self> {
384        if dispatcher.is_main_thread() {
385            Ok(Self::Platform {
386                dispatcher,
387                _not_send_or_sync: PhantomData,
388            })
389        } else {
390            Err(anyhow!("must be constructed on main thread"))
391        }
392    }
393
    /// Creates a foreground executor backed by a fresh `smol::LocalExecutor`.
    pub fn test() -> Self {
        Self::Test(smol::LocalExecutor::new())
    }
397
398    pub fn spawn<T: 'static>(&self, future: impl Future<Output = T> + 'static) -> Task<T> {
399        match self {
400            Self::Platform { dispatcher, .. } => {
401                let dispatcher = dispatcher.clone();
402                let schedule = move |runnable: Runnable| dispatcher.run_on_main_thread(runnable);
403                let (runnable, task) = async_task::spawn_local(future, schedule);
404                runnable.schedule();
405                task
406            }
407            Self::Test(executor) => executor.spawn(future),
408            Self::Deterministic(executor) => executor.spawn_from_foreground(future),
409        }
410    }
411
412    pub fn run<T: 'static>(&self, future: impl 'static + Future<Output = T>) -> T {
413        match self {
414            Self::Platform { .. } => panic!("you can't call run on a platform foreground executor"),
415            Self::Test(executor) => smol::block_on(executor.run(future)),
416            Self::Deterministic(executor) => executor.run(future),
417        }
418    }
419
420    pub fn forbid_parking(&self) {
421        match self {
422            Self::Deterministic(executor) => {
423                let mut state = executor.state.lock();
424                state.forbid_parking = true;
425                state.rng = StdRng::seed_from_u64(state.seed);
426            }
427            _ => panic!("this method can only be called on a deterministic executor"),
428        }
429    }
430
431    pub async fn sleep(&self, duration: Duration) {
432        match self {
433            Self::Deterministic(executor) => {
434                let (tx, mut rx) = barrier::channel();
435                {
436                    let mut state = executor.state.lock();
437                    let wakeup_at = state.now + duration;
438                    state.pending_sleeps.push((wakeup_at, tx));
439                }
440                rx.recv().await;
441            }
442            _ => {
443                Timer::after(duration).await;
444            }
445        }
446    }
447
448    pub fn advance_clock(&self, duration: Duration) {
449        match self {
450            Self::Deterministic(executor) => {
451                executor.run_until_parked();
452
453                let mut state = executor.state.lock();
454                state.now += duration;
455                let now = state.now;
456                let mut pending_sleeps = mem::take(&mut state.pending_sleeps);
457                drop(state);
458
459                pending_sleeps.retain(|(wakeup, _)| *wakeup > now);
460                executor.state.lock().pending_sleeps.extend(pending_sleeps);
461            }
462            _ => panic!("this method can only be called on a deterministic executor"),
463        }
464    }
465
466    pub fn set_block_on_ticks(&self, range: RangeInclusive<usize>) {
467        match self {
468            Self::Deterministic(executor) => executor.state.lock().block_on_ticks = range,
469            _ => panic!("this method can only be called on a deterministic executor"),
470        }
471    }
472}
473
474impl Background {
475    pub fn new() -> Self {
476        let executor = Arc::new(Executor::new());
477        let stop = channel::unbounded::<()>();
478
479        for i in 0..2 * num_cpus::get() {
480            let executor = executor.clone();
481            let stop = stop.1.clone();
482            thread::Builder::new()
483                .name(format!("background-executor-{}", i))
484                .spawn(move || smol::block_on(executor.run(stop.recv())))
485                .unwrap();
486        }
487
488        Self::Production {
489            executor,
490            _stop: stop.0,
491        }
492    }
493
    /// Returns the value reported by `num_cpus::get()`; it does not depend on
    /// which variant `self` is.
    pub fn num_cpus(&self) -> usize {
        num_cpus::get()
    }
497
498    pub fn spawn<T, F>(&self, future: F) -> Task<T>
499    where
500        T: 'static + Send,
501        F: Send + Future<Output = T> + 'static,
502    {
503        match self {
504            Self::Production { executor, .. } => executor.spawn(future),
505            Self::Deterministic(executor) => executor.spawn(future),
506        }
507    }
508
509    pub fn block_with_timeout<F, T>(&self, timeout: Duration, mut future: F) -> Result<T, F>
510    where
511        T: 'static,
512        F: 'static + Unpin + Future<Output = T>,
513    {
514        if !timeout.is_zero() {
515            let output = match self {
516                Self::Production { .. } => {
517                    smol::block_on(util::timeout(timeout, Pin::new(&mut future))).ok()
518                }
519                Self::Deterministic(executor) => executor.block_on(Pin::new(&mut future)),
520            };
521            if let Some(output) = output {
522                return Ok(output);
523            }
524        }
525        Err(future)
526    }
527
528    pub async fn scoped<'scope, F>(&self, scheduler: F)
529    where
530        F: FnOnce(&mut Scope<'scope>),
531    {
532        let mut scope = Scope {
533            futures: Default::default(),
534            _phantom: PhantomData,
535        };
536        (scheduler)(&mut scope);
537        let spawned = scope
538            .futures
539            .into_iter()
540            .map(|f| self.spawn(f))
541            .collect::<Vec<_>>();
542        for task in spawned {
543            task.await;
544        }
545    }
546}
547
/// Collects futures that may borrow data living for `'a`.
pub struct Scope<'a> {
    // Registered futures, stored with their `'a` bound erased to `'static`.
    futures: Vec<Pin<Box<dyn Future<Output = ()> + Send + 'static>>>,
    // Ties the scope to `'a` without storing any borrowed data.
    _phantom: PhantomData<&'a ()>,
}
552
553impl<'a> Scope<'a> {
554    pub fn spawn<F>(&mut self, f: F)
555    where
556        F: Future<Output = ()> + Send + 'a,
557    {
558        let f = unsafe {
559            mem::transmute::<
560                Pin<Box<dyn Future<Output = ()> + Send + 'a>>,
561                Pin<Box<dyn Future<Output = ()> + Send + 'static>>,
562            >(Box::pin(f))
563        };
564        self.futures.push(f);
565    }
566}
567
568pub fn deterministic(seed: u64) -> (Rc<Foreground>, Arc<Background>) {
569    let executor = Arc::new(Deterministic::new(seed));
570    (
571        Rc::new(Foreground::Deterministic(executor.clone())),
572        Arc::new(Background::Deterministic(executor)),
573    )
574}