executor.rs

  1use anyhow::{anyhow, Result};
  2use async_task::Runnable;
  3pub use async_task::Task;
  4use backtrace::{Backtrace, BacktraceFmt, BytesOrWideString};
  5use parking_lot::Mutex;
  6use postage::{barrier, prelude::Stream as _};
  7use rand::prelude::*;
  8use smol::{channel, prelude::*, Executor, Timer};
  9use std::{
 10    fmt::{self, Debug},
 11    marker::PhantomData,
 12    mem,
 13    ops::RangeInclusive,
 14    pin::Pin,
 15    rc::Rc,
 16    sync::{
 17        atomic::{AtomicBool, Ordering::SeqCst},
 18        Arc,
 19    },
 20    task::{Context, Poll},
 21    thread,
 22    time::{Duration, Instant},
 23};
 24use waker_fn::waker_fn;
 25
 26use crate::{platform, util};
 27
 28pub enum Foreground {
 29    Platform {
 30        dispatcher: Arc<dyn platform::Dispatcher>,
 31        _not_send_or_sync: PhantomData<Rc<()>>,
 32    },
 33    Test(smol::LocalExecutor<'static>),
 34    Deterministic(Arc<Deterministic>),
 35}
 36
 37pub enum Background {
 38    Deterministic(Arc<Deterministic>),
 39    Production {
 40        executor: Arc<smol::Executor<'static>>,
 41        _stop: channel::Sender<()>,
 42    },
 43}
 44
 45struct DeterministicState {
 46    rng: StdRng,
 47    seed: u64,
 48    scheduled_from_foreground: Vec<(Runnable, Backtrace)>,
 49    scheduled_from_background: Vec<(Runnable, Backtrace)>,
 50    spawned_from_foreground: Vec<(Runnable, Backtrace)>,
 51    forbid_parking: bool,
 52    block_on_ticks: RangeInclusive<usize>,
 53    now: Instant,
 54    pending_sleeps: Vec<(Instant, barrier::Sender)>,
 55}
 56
 57pub struct Deterministic {
 58    state: Arc<Mutex<DeterministicState>>,
 59    parker: Mutex<parking::Parker>,
 60}
 61
 62impl Deterministic {
 63    fn new(seed: u64) -> Self {
 64        Self {
 65            state: Arc::new(Mutex::new(DeterministicState {
 66                rng: StdRng::seed_from_u64(seed),
 67                seed,
 68                scheduled_from_foreground: Default::default(),
 69                scheduled_from_background: Default::default(),
 70                spawned_from_foreground: Default::default(),
 71                forbid_parking: false,
 72                block_on_ticks: 0..=1000,
 73                now: Instant::now(),
 74                pending_sleeps: Default::default(),
 75            })),
 76            parker: Default::default(),
 77        }
 78    }
 79
 80    pub fn spawn_from_foreground<F, T>(&self, future: F) -> Task<T>
 81    where
 82        T: 'static,
 83        F: Future<Output = T> + 'static,
 84    {
 85        let backtrace = Backtrace::new_unresolved();
 86        let scheduled_once = AtomicBool::new(false);
 87        let state = self.state.clone();
 88        let unparker = self.parker.lock().unparker();
 89        let (runnable, task) = async_task::spawn_local(future, move |runnable| {
 90            let mut state = state.lock();
 91            let backtrace = backtrace.clone();
 92            if scheduled_once.fetch_or(true, SeqCst) {
 93                state.scheduled_from_foreground.push((runnable, backtrace));
 94            } else {
 95                state.spawned_from_foreground.push((runnable, backtrace));
 96            }
 97            unparker.unpark();
 98        });
 99        runnable.schedule();
100        task
101    }
102
103    pub fn spawn<F, T>(&self, future: F) -> Task<T>
104    where
105        T: 'static + Send,
106        F: 'static + Send + Future<Output = T>,
107    {
108        let backtrace = Backtrace::new_unresolved();
109        let state = self.state.clone();
110        let unparker = self.parker.lock().unparker();
111        let (runnable, task) = async_task::spawn(future, move |runnable| {
112            let mut state = state.lock();
113            state
114                .scheduled_from_background
115                .push((runnable, backtrace.clone()));
116            unparker.unpark();
117        });
118        runnable.schedule();
119        task
120    }
121
122    pub fn run<F, T>(&self, future: F) -> T
123    where
124        T: 'static,
125        F: Future<Output = T> + 'static,
126    {
127        smol::pin!(future);
128
129        let unparker = self.parker.lock().unparker();
130        let woken = Arc::new(AtomicBool::new(false));
131        let waker = {
132            let woken = woken.clone();
133            waker_fn(move || {
134                woken.store(true, SeqCst);
135                unparker.unpark();
136            })
137        };
138
139        let mut cx = Context::from_waker(&waker);
140        let mut trace = Trace::default();
141        loop {
142            let mut state = self.state.lock();
143            let runnable_count = state.scheduled_from_foreground.len()
144                + state.scheduled_from_background.len()
145                + state.spawned_from_foreground.len();
146
147            let ix = state.rng.gen_range(0..=runnable_count);
148            if ix < state.scheduled_from_foreground.len() {
149                let (_, backtrace) = &state.scheduled_from_foreground[ix];
150                trace.record(&state, backtrace.clone());
151                let runnable = state.scheduled_from_foreground.remove(ix).0;
152                drop(state);
153                runnable.run();
154            } else if ix - state.scheduled_from_foreground.len()
155                < state.scheduled_from_background.len()
156            {
157                let ix = ix - state.scheduled_from_foreground.len();
158                let (_, backtrace) = &state.scheduled_from_background[ix];
159                trace.record(&state, backtrace.clone());
160                let runnable = state.scheduled_from_background.remove(ix).0;
161                drop(state);
162                runnable.run();
163            } else if ix < runnable_count {
164                let (_, backtrace) = &state.spawned_from_foreground[0];
165                trace.record(&state, backtrace.clone());
166                let runnable = state.spawned_from_foreground.remove(0).0;
167                drop(state);
168                runnable.run();
169            } else {
170                drop(state);
171                if let Poll::Ready(result) = future.as_mut().poll(&mut cx) {
172                    return result;
173                }
174                let state = self.state.lock();
175                if state.scheduled_from_foreground.is_empty()
176                    && state.scheduled_from_background.is_empty()
177                    && state.spawned_from_foreground.is_empty()
178                {
179                    if state.forbid_parking && !woken.load(SeqCst) {
180                        panic!("deterministic executor parked after a call to forbid_parking");
181                    }
182                    drop(state);
183                    woken.store(false, SeqCst);
184                    self.parker.lock().park();
185                }
186
187                continue;
188            }
189        }
190    }
191
192    pub fn block_on<F, T>(&self, future: F) -> Option<T>
193    where
194        T: 'static,
195        F: Future<Output = T>,
196    {
197        smol::pin!(future);
198
199        let unparker = self.parker.lock().unparker();
200        let waker = waker_fn(move || {
201            unparker.unpark();
202        });
203        let max_ticks = {
204            let mut state = self.state.lock();
205            let range = state.block_on_ticks.clone();
206            state.rng.gen_range(range)
207        };
208
209        let mut cx = Context::from_waker(&waker);
210        let mut trace = Trace::default();
211        for _ in 0..max_ticks {
212            let mut state = self.state.lock();
213            let runnable_count = state.scheduled_from_background.len();
214            let ix = state.rng.gen_range(0..=runnable_count);
215            if ix < state.scheduled_from_background.len() {
216                let (_, backtrace) = &state.scheduled_from_background[ix];
217                trace.record(&state, backtrace.clone());
218                let runnable = state.scheduled_from_background.remove(ix).0;
219                drop(state);
220                runnable.run();
221            } else {
222                drop(state);
223                if let Poll::Ready(result) = future.as_mut().poll(&mut cx) {
224                    return Some(result);
225                }
226                let state = self.state.lock();
227                if state.scheduled_from_background.is_empty() {
228                    if state.forbid_parking {
229                        panic!("deterministic executor parked after a call to forbid_parking");
230                    }
231                    drop(state);
232                    self.parker.lock().park();
233                }
234
235                continue;
236            }
237        }
238
239        None
240    }
241}
242
243#[derive(Default)]
244struct Trace {
245    executed: Vec<Backtrace>,
246    scheduled: Vec<Vec<Backtrace>>,
247    spawned_from_foreground: Vec<Vec<Backtrace>>,
248}
249
250impl Trace {
251    fn record(&mut self, state: &DeterministicState, executed: Backtrace) {
252        self.scheduled.push(
253            state
254                .scheduled_from_foreground
255                .iter()
256                .map(|(_, backtrace)| backtrace.clone())
257                .collect(),
258        );
259        self.spawned_from_foreground.push(
260            state
261                .spawned_from_foreground
262                .iter()
263                .map(|(_, backtrace)| backtrace.clone())
264                .collect(),
265        );
266        self.executed.push(executed);
267    }
268
269    fn resolve(&mut self) {
270        for backtrace in &mut self.executed {
271            backtrace.resolve();
272        }
273
274        for backtraces in &mut self.scheduled {
275            for backtrace in backtraces {
276                backtrace.resolve();
277            }
278        }
279
280        for backtraces in &mut self.spawned_from_foreground {
281            for backtrace in backtraces {
282                backtrace.resolve();
283            }
284        }
285    }
286}
287
288impl Debug for Trace {
289    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
290        struct FirstCwdFrameInBacktrace<'a>(&'a Backtrace);
291
292        impl<'a> Debug for FirstCwdFrameInBacktrace<'a> {
293            fn fmt(&self, f: &mut fmt::Formatter<'_>) -> std::fmt::Result {
294                let cwd = std::env::current_dir().unwrap();
295                let mut print_path = |fmt: &mut fmt::Formatter<'_>, path: BytesOrWideString<'_>| {
296                    fmt::Display::fmt(&path, fmt)
297                };
298                let mut fmt = BacktraceFmt::new(f, backtrace::PrintFmt::Full, &mut print_path);
299                for frame in self.0.frames() {
300                    let mut formatted_frame = fmt.frame();
301                    if frame
302                        .symbols()
303                        .iter()
304                        .any(|s| s.filename().map_or(false, |f| f.starts_with(&cwd)))
305                    {
306                        formatted_frame.backtrace_frame(frame)?;
307                        break;
308                    }
309                }
310                fmt.finish()
311            }
312        }
313
314        for ((backtrace, scheduled), spawned_from_foreground) in self
315            .executed
316            .iter()
317            .zip(&self.scheduled)
318            .zip(&self.spawned_from_foreground)
319        {
320            writeln!(f, "Scheduled")?;
321            for backtrace in scheduled {
322                writeln!(f, "- {:?}", FirstCwdFrameInBacktrace(backtrace))?;
323            }
324            if scheduled.is_empty() {
325                writeln!(f, "None")?;
326            }
327            writeln!(f, "==========")?;
328
329            writeln!(f, "Spawned from foreground")?;
330            for backtrace in spawned_from_foreground {
331                writeln!(f, "- {:?}", FirstCwdFrameInBacktrace(backtrace))?;
332            }
333            if spawned_from_foreground.is_empty() {
334                writeln!(f, "None")?;
335            }
336            writeln!(f, "==========")?;
337
338            writeln!(f, "Run: {:?}", FirstCwdFrameInBacktrace(backtrace))?;
339            writeln!(f, "+++++++++++++++++++")?;
340        }
341
342        Ok(())
343    }
344}
345
346impl Drop for Trace {
347    fn drop(&mut self) {
348        let trace_on_panic = if let Ok(trace_on_panic) = std::env::var("EXECUTOR_TRACE_ON_PANIC") {
349            trace_on_panic == "1" || trace_on_panic == "true"
350        } else {
351            false
352        };
353        let trace_always = if let Ok(trace_always) = std::env::var("EXECUTOR_TRACE_ALWAYS") {
354            trace_always == "1" || trace_always == "true"
355        } else {
356            false
357        };
358
359        if trace_always || (trace_on_panic && thread::panicking()) {
360            self.resolve();
361            dbg!(self);
362        }
363    }
364}
365
366impl Foreground {
367    pub fn platform(dispatcher: Arc<dyn platform::Dispatcher>) -> Result<Self> {
368        if dispatcher.is_main_thread() {
369            Ok(Self::Platform {
370                dispatcher,
371                _not_send_or_sync: PhantomData,
372            })
373        } else {
374            Err(anyhow!("must be constructed on main thread"))
375        }
376    }
377
378    pub fn test() -> Self {
379        Self::Test(smol::LocalExecutor::new())
380    }
381
382    pub fn spawn<T: 'static>(&self, future: impl Future<Output = T> + 'static) -> Task<T> {
383        match self {
384            Self::Platform { dispatcher, .. } => {
385                let dispatcher = dispatcher.clone();
386                let schedule = move |runnable: Runnable| dispatcher.run_on_main_thread(runnable);
387                let (runnable, task) = async_task::spawn_local(future, schedule);
388                runnable.schedule();
389                task
390            }
391            Self::Test(executor) => executor.spawn(future),
392            Self::Deterministic(executor) => executor.spawn_from_foreground(future),
393        }
394    }
395
396    pub fn run<T: 'static>(&self, future: impl 'static + Future<Output = T>) -> T {
397        match self {
398            Self::Platform { .. } => panic!("you can't call run on a platform foreground executor"),
399            Self::Test(executor) => smol::block_on(executor.run(future)),
400            Self::Deterministic(executor) => executor.run(future),
401        }
402    }
403
404    pub fn forbid_parking(&self) {
405        match self {
406            Self::Deterministic(executor) => {
407                let mut state = executor.state.lock();
408                state.forbid_parking = true;
409                state.rng = StdRng::seed_from_u64(state.seed);
410            }
411            _ => panic!("this method can only be called on a deterministic executor"),
412        }
413    }
414
415    pub async fn sleep(&self, duration: Duration) {
416        match self {
417            Self::Deterministic(executor) => {
418                let (tx, mut rx) = barrier::channel();
419                {
420                    let mut state = executor.state.lock();
421                    let wakeup_at = state.now + duration;
422                    state.pending_sleeps.push((wakeup_at, tx));
423                }
424                rx.recv().await;
425            }
426            _ => {
427                Timer::after(duration).await;
428            }
429        }
430    }
431
432    pub fn advance_clock(&self, duration: Duration) {
433        match self {
434            Self::Deterministic(executor) => {
435                let mut state = executor.state.lock();
436                state.now += duration;
437                let now = state.now;
438                state.pending_sleeps.retain(|(wakeup, _)| *wakeup > now);
439            }
440            _ => panic!("this method can only be called on a deterministic executor"),
441        }
442    }
443
444    pub fn set_block_on_ticks(&self, range: RangeInclusive<usize>) {
445        match self {
446            Self::Deterministic(executor) => executor.state.lock().block_on_ticks = range,
447            _ => panic!("this method can only be called on a deterministic executor"),
448        }
449    }
450}
451
452impl Background {
453    pub fn new() -> Self {
454        let executor = Arc::new(Executor::new());
455        let stop = channel::unbounded::<()>();
456
457        for i in 0..2 * num_cpus::get() {
458            let executor = executor.clone();
459            let stop = stop.1.clone();
460            thread::Builder::new()
461                .name(format!("background-executor-{}", i))
462                .spawn(move || smol::block_on(executor.run(stop.recv())))
463                .unwrap();
464        }
465
466        Self::Production {
467            executor,
468            _stop: stop.0,
469        }
470    }
471
472    pub fn num_cpus(&self) -> usize {
473        num_cpus::get()
474    }
475
476    pub fn spawn<T, F>(&self, future: F) -> Task<T>
477    where
478        T: 'static + Send,
479        F: Send + Future<Output = T> + 'static,
480    {
481        match self {
482            Self::Production { executor, .. } => executor.spawn(future),
483            Self::Deterministic(executor) => executor.spawn(future),
484        }
485    }
486
487    pub fn block_with_timeout<F, T>(&self, timeout: Duration, mut future: F) -> Result<T, F>
488    where
489        T: 'static,
490        F: 'static + Unpin + Future<Output = T>,
491    {
492        if !timeout.is_zero() {
493            let output = match self {
494                Self::Production { .. } => {
495                    smol::block_on(util::timeout(timeout, Pin::new(&mut future))).ok()
496                }
497                Self::Deterministic(executor) => executor.block_on(Pin::new(&mut future)),
498            };
499            if let Some(output) = output {
500                return Ok(output);
501            }
502        }
503        Err(future)
504    }
505
506    pub async fn scoped<'scope, F>(&self, scheduler: F)
507    where
508        F: FnOnce(&mut Scope<'scope>),
509    {
510        let mut scope = Scope {
511            futures: Default::default(),
512            _phantom: PhantomData,
513        };
514        (scheduler)(&mut scope);
515        let spawned = scope
516            .futures
517            .into_iter()
518            .map(|f| self.spawn(f))
519            .collect::<Vec<_>>();
520        for task in spawned {
521            task.await;
522        }
523    }
524}
525
526pub struct Scope<'a> {
527    futures: Vec<Pin<Box<dyn Future<Output = ()> + Send + 'static>>>,
528    _phantom: PhantomData<&'a ()>,
529}
530
531impl<'a> Scope<'a> {
532    pub fn spawn<F>(&mut self, f: F)
533    where
534        F: Future<Output = ()> + Send + 'a,
535    {
536        let f = unsafe {
537            mem::transmute::<
538                Pin<Box<dyn Future<Output = ()> + Send + 'a>>,
539                Pin<Box<dyn Future<Output = ()> + Send + 'static>>,
540            >(Box::pin(f))
541        };
542        self.futures.push(f);
543    }
544}
545
546pub fn deterministic(seed: u64) -> (Rc<Foreground>, Arc<Background>) {
547    let executor = Arc::new(Deterministic::new(seed));
548    (
549        Rc::new(Foreground::Deterministic(executor.clone())),
550        Arc::new(Background::Deterministic(executor)),
551    )
552}