executor.rs

  1use anyhow::{anyhow, Result};
  2use async_task::Runnable;
  3pub use async_task::Task;
  4use backtrace::{Backtrace, BacktraceFmt, BytesOrWideString};
  5use parking_lot::Mutex;
  6use rand::prelude::*;
  7use smol::{channel, prelude::*, Executor};
  8use std::{
  9    fmt::{self, Debug},
 10    marker::PhantomData,
 11    mem,
 12    ops::RangeInclusive,
 13    pin::Pin,
 14    rc::Rc,
 15    sync::{
 16        atomic::{AtomicBool, Ordering::SeqCst},
 17        Arc,
 18    },
 19    task::{Context, Poll},
 20    thread,
 21    time::Duration,
 22};
 23use waker_fn::waker_fn;
 24
 25use crate::{platform, util};
 26
 27pub enum Foreground {
 28    Platform {
 29        dispatcher: Arc<dyn platform::Dispatcher>,
 30        _not_send_or_sync: PhantomData<Rc<()>>,
 31    },
 32    Test(smol::LocalExecutor<'static>),
 33    Deterministic(Arc<Deterministic>),
 34}
 35
 36pub enum Background {
 37    Deterministic(Arc<Deterministic>),
 38    Production {
 39        executor: Arc<smol::Executor<'static>>,
 40        _stop: channel::Sender<()>,
 41    },
 42}
 43
 44struct DeterministicState {
 45    rng: StdRng,
 46    seed: u64,
 47    scheduled_from_foreground: Vec<(Runnable, Backtrace)>,
 48    scheduled_from_background: Vec<(Runnable, Backtrace)>,
 49    spawned_from_foreground: Vec<(Runnable, Backtrace)>,
 50    forbid_parking: bool,
 51    block_on_ticks: RangeInclusive<usize>,
 52}
 53
 54pub struct Deterministic {
 55    state: Arc<Mutex<DeterministicState>>,
 56    parker: Mutex<parking::Parker>,
 57}
 58
 59impl Deterministic {
 60    fn new(seed: u64) -> Self {
 61        Self {
 62            state: Arc::new(Mutex::new(DeterministicState {
 63                rng: StdRng::seed_from_u64(seed),
 64                seed,
 65                scheduled_from_foreground: Default::default(),
 66                scheduled_from_background: Default::default(),
 67                spawned_from_foreground: Default::default(),
 68                forbid_parking: false,
 69                block_on_ticks: 0..=1000,
 70            })),
 71            parker: Default::default(),
 72        }
 73    }
 74
 75    pub fn spawn_from_foreground<F, T>(&self, future: F) -> Task<T>
 76    where
 77        T: 'static,
 78        F: Future<Output = T> + 'static,
 79    {
 80        let backtrace = Backtrace::new_unresolved();
 81        let scheduled_once = AtomicBool::new(false);
 82        let state = self.state.clone();
 83        let unparker = self.parker.lock().unparker();
 84        let (runnable, task) = async_task::spawn_local(future, move |runnable| {
 85            let mut state = state.lock();
 86            let backtrace = backtrace.clone();
 87            if scheduled_once.fetch_or(true, SeqCst) {
 88                state.scheduled_from_foreground.push((runnable, backtrace));
 89            } else {
 90                state.spawned_from_foreground.push((runnable, backtrace));
 91            }
 92            unparker.unpark();
 93        });
 94        runnable.schedule();
 95        task
 96    }
 97
 98    pub fn spawn<F, T>(&self, future: F) -> Task<T>
 99    where
100        T: 'static + Send,
101        F: 'static + Send + Future<Output = T>,
102    {
103        let backtrace = Backtrace::new_unresolved();
104        let state = self.state.clone();
105        let unparker = self.parker.lock().unparker();
106        let (runnable, task) = async_task::spawn(future, move |runnable| {
107            let mut state = state.lock();
108            state
109                .scheduled_from_background
110                .push((runnable, backtrace.clone()));
111            unparker.unpark();
112        });
113        runnable.schedule();
114        task
115    }
116
117    pub fn run<F, T>(&self, future: F) -> T
118    where
119        T: 'static,
120        F: Future<Output = T> + 'static,
121    {
122        smol::pin!(future);
123
124        let unparker = self.parker.lock().unparker();
125        let waker = waker_fn(move || {
126            unparker.unpark();
127        });
128
129        let mut cx = Context::from_waker(&waker);
130        let mut trace = Trace::default();
131        loop {
132            let mut state = self.state.lock();
133            let runnable_count = state.scheduled_from_foreground.len()
134                + state.scheduled_from_background.len()
135                + state.spawned_from_foreground.len();
136
137            let ix = state.rng.gen_range(0..=runnable_count);
138            if ix < state.scheduled_from_foreground.len() {
139                let (_, backtrace) = &state.scheduled_from_foreground[ix];
140                trace.record(&state, backtrace.clone());
141                let runnable = state.scheduled_from_foreground.remove(ix).0;
142                drop(state);
143                runnable.run();
144            } else if ix - state.scheduled_from_foreground.len()
145                < state.scheduled_from_background.len()
146            {
147                let ix = ix - state.scheduled_from_foreground.len();
148                let (_, backtrace) = &state.scheduled_from_background[ix];
149                trace.record(&state, backtrace.clone());
150                let runnable = state.scheduled_from_background.remove(ix).0;
151                drop(state);
152                runnable.run();
153            } else if ix < runnable_count {
154                let (_, backtrace) = &state.spawned_from_foreground[0];
155                trace.record(&state, backtrace.clone());
156                let runnable = state.spawned_from_foreground.remove(0).0;
157                drop(state);
158                runnable.run();
159            } else {
160                drop(state);
161                if let Poll::Ready(result) = future.as_mut().poll(&mut cx) {
162                    return result;
163                }
164                let state = self.state.lock();
165                if state.scheduled_from_foreground.is_empty()
166                    && state.scheduled_from_background.is_empty()
167                    && state.spawned_from_foreground.is_empty()
168                {
169                    if state.forbid_parking {
170                        panic!("deterministic executor parked after a call to forbid_parking");
171                    }
172                    drop(state);
173                    self.parker.lock().park();
174                }
175
176                continue;
177            }
178        }
179    }
180
181    pub fn block_on<F, T>(&self, future: F) -> Option<T>
182    where
183        T: 'static,
184        F: Future<Output = T>,
185    {
186        smol::pin!(future);
187
188        let unparker = self.parker.lock().unparker();
189        let waker = waker_fn(move || {
190            unparker.unpark();
191        });
192        let max_ticks = {
193            let mut state = self.state.lock();
194            let range = state.block_on_ticks.clone();
195            state.rng.gen_range(range)
196        };
197
198        let mut cx = Context::from_waker(&waker);
199        let mut trace = Trace::default();
200        for _ in 0..max_ticks {
201            let mut state = self.state.lock();
202            let runnable_count = state.scheduled_from_background.len();
203            let ix = state.rng.gen_range(0..=runnable_count);
204            if ix < state.scheduled_from_background.len() {
205                let (_, backtrace) = &state.scheduled_from_background[ix];
206                trace.record(&state, backtrace.clone());
207                let runnable = state.scheduled_from_background.remove(ix).0;
208                drop(state);
209                runnable.run();
210            } else {
211                drop(state);
212                if let Poll::Ready(result) = future.as_mut().poll(&mut cx) {
213                    return Some(result);
214                }
215                let state = self.state.lock();
216                if state.scheduled_from_background.is_empty() {
217                    if state.forbid_parking {
218                        panic!("deterministic executor parked after a call to forbid_parking");
219                    }
220                    drop(state);
221                    self.parker.lock().park();
222                }
223
224                continue;
225            }
226        }
227
228        None
229    }
230}
231
232#[derive(Default)]
233struct Trace {
234    executed: Vec<Backtrace>,
235    scheduled: Vec<Vec<Backtrace>>,
236    spawned_from_foreground: Vec<Vec<Backtrace>>,
237}
238
239impl Trace {
240    fn record(&mut self, state: &DeterministicState, executed: Backtrace) {
241        self.scheduled.push(
242            state
243                .scheduled_from_foreground
244                .iter()
245                .map(|(_, backtrace)| backtrace.clone())
246                .collect(),
247        );
248        self.spawned_from_foreground.push(
249            state
250                .spawned_from_foreground
251                .iter()
252                .map(|(_, backtrace)| backtrace.clone())
253                .collect(),
254        );
255        self.executed.push(executed);
256    }
257
258    fn resolve(&mut self) {
259        for backtrace in &mut self.executed {
260            backtrace.resolve();
261        }
262
263        for backtraces in &mut self.scheduled {
264            for backtrace in backtraces {
265                backtrace.resolve();
266            }
267        }
268
269        for backtraces in &mut self.spawned_from_foreground {
270            for backtrace in backtraces {
271                backtrace.resolve();
272            }
273        }
274    }
275}
276
277impl Debug for Trace {
278    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
279        struct FirstCwdFrameInBacktrace<'a>(&'a Backtrace);
280
281        impl<'a> Debug for FirstCwdFrameInBacktrace<'a> {
282            fn fmt(&self, f: &mut fmt::Formatter<'_>) -> std::fmt::Result {
283                let cwd = std::env::current_dir().unwrap();
284                let mut print_path = |fmt: &mut fmt::Formatter<'_>, path: BytesOrWideString<'_>| {
285                    fmt::Display::fmt(&path, fmt)
286                };
287                let mut fmt = BacktraceFmt::new(f, backtrace::PrintFmt::Full, &mut print_path);
288                for frame in self.0.frames() {
289                    let mut formatted_frame = fmt.frame();
290                    if frame
291                        .symbols()
292                        .iter()
293                        .any(|s| s.filename().map_or(false, |f| f.starts_with(&cwd)))
294                    {
295                        formatted_frame.backtrace_frame(frame)?;
296                        break;
297                    }
298                }
299                fmt.finish()
300            }
301        }
302
303        for ((backtrace, scheduled), spawned_from_foreground) in self
304            .executed
305            .iter()
306            .zip(&self.scheduled)
307            .zip(&self.spawned_from_foreground)
308        {
309            writeln!(f, "Scheduled")?;
310            for backtrace in scheduled {
311                writeln!(f, "- {:?}", FirstCwdFrameInBacktrace(backtrace))?;
312            }
313            if scheduled.is_empty() {
314                writeln!(f, "None")?;
315            }
316            writeln!(f, "==========")?;
317
318            writeln!(f, "Spawned from foreground")?;
319            for backtrace in spawned_from_foreground {
320                writeln!(f, "- {:?}", FirstCwdFrameInBacktrace(backtrace))?;
321            }
322            if spawned_from_foreground.is_empty() {
323                writeln!(f, "None")?;
324            }
325            writeln!(f, "==========")?;
326
327            writeln!(f, "Run: {:?}", FirstCwdFrameInBacktrace(backtrace))?;
328            writeln!(f, "+++++++++++++++++++")?;
329        }
330
331        Ok(())
332    }
333}
334
335impl Drop for Trace {
336    fn drop(&mut self) {
337        let trace_on_panic = if let Ok(trace_on_panic) = std::env::var("EXECUTOR_TRACE_ON_PANIC") {
338            trace_on_panic == "1" || trace_on_panic == "true"
339        } else {
340            false
341        };
342        let trace_always = if let Ok(trace_always) = std::env::var("EXECUTOR_TRACE_ALWAYS") {
343            trace_always == "1" || trace_always == "true"
344        } else {
345            false
346        };
347
348        if trace_always || (trace_on_panic && thread::panicking()) {
349            self.resolve();
350            dbg!(self);
351        }
352    }
353}
354
355impl Foreground {
356    pub fn platform(dispatcher: Arc<dyn platform::Dispatcher>) -> Result<Self> {
357        if dispatcher.is_main_thread() {
358            Ok(Self::Platform {
359                dispatcher,
360                _not_send_or_sync: PhantomData,
361            })
362        } else {
363            Err(anyhow!("must be constructed on main thread"))
364        }
365    }
366
367    pub fn test() -> Self {
368        Self::Test(smol::LocalExecutor::new())
369    }
370
371    pub fn spawn<T: 'static>(&self, future: impl Future<Output = T> + 'static) -> Task<T> {
372        match self {
373            Self::Platform { dispatcher, .. } => {
374                let dispatcher = dispatcher.clone();
375                let schedule = move |runnable: Runnable| dispatcher.run_on_main_thread(runnable);
376                let (runnable, task) = async_task::spawn_local(future, schedule);
377                runnable.schedule();
378                task
379            }
380            Self::Test(executor) => executor.spawn(future),
381            Self::Deterministic(executor) => executor.spawn_from_foreground(future),
382        }
383    }
384
385    pub fn run<T: 'static>(&self, future: impl 'static + Future<Output = T>) -> T {
386        match self {
387            Self::Platform { .. } => panic!("you can't call run on a platform foreground executor"),
388            Self::Test(executor) => smol::block_on(executor.run(future)),
389            Self::Deterministic(executor) => executor.run(future),
390        }
391    }
392
393    pub fn forbid_parking(&self) {
394        match self {
395            Self::Deterministic(executor) => {
396                let mut state = executor.state.lock();
397                state.forbid_parking = true;
398                state.rng = StdRng::seed_from_u64(state.seed);
399            }
400            _ => panic!("this method can only be called on a deterministic executor"),
401        }
402    }
403
404    pub fn set_block_on_ticks(&self, range: RangeInclusive<usize>) {
405        match self {
406            Self::Deterministic(executor) => executor.state.lock().block_on_ticks = range,
407            _ => panic!("this method can only be called on a deterministic executor"),
408        }
409    }
410}
411
412impl Background {
413    pub fn new() -> Self {
414        let executor = Arc::new(Executor::new());
415        let stop = channel::unbounded::<()>();
416
417        for i in 0..2 * num_cpus::get() {
418            let executor = executor.clone();
419            let stop = stop.1.clone();
420            thread::Builder::new()
421                .name(format!("background-executor-{}", i))
422                .spawn(move || smol::block_on(executor.run(stop.recv())))
423                .unwrap();
424        }
425
426        Self::Production {
427            executor,
428            _stop: stop.0,
429        }
430    }
431
432    pub fn num_cpus(&self) -> usize {
433        num_cpus::get()
434    }
435
436    pub fn spawn<T, F>(&self, future: F) -> Task<T>
437    where
438        T: 'static + Send,
439        F: Send + Future<Output = T> + 'static,
440    {
441        match self {
442            Self::Production { executor, .. } => executor.spawn(future),
443            Self::Deterministic(executor) => executor.spawn(future),
444        }
445    }
446
447    pub fn block_with_timeout<F, T>(&self, timeout: Duration, mut future: F) -> Result<T, F>
448    where
449        T: 'static,
450        F: 'static + Unpin + Future<Output = T>,
451    {
452        let output = match self {
453            Self::Production { .. } => {
454                smol::block_on(util::timeout(timeout, Pin::new(&mut future))).ok()
455            }
456            Self::Deterministic(executor) => executor.block_on(Pin::new(&mut future)),
457        };
458
459        if let Some(output) = output {
460            Ok(output)
461        } else {
462            Err(future)
463        }
464    }
465
466    pub async fn scoped<'scope, F>(&self, scheduler: F)
467    where
468        F: FnOnce(&mut Scope<'scope>),
469    {
470        let mut scope = Scope {
471            futures: Default::default(),
472            _phantom: PhantomData,
473        };
474        (scheduler)(&mut scope);
475        let spawned = scope
476            .futures
477            .into_iter()
478            .map(|f| self.spawn(f))
479            .collect::<Vec<_>>();
480        for task in spawned {
481            task.await;
482        }
483    }
484}
485
/// Collector handed to the closure passed to `Background::scoped`.
pub struct Scope<'a> {
    /// Futures registered so far, stored with their lifetime erased to `'static`.
    futures: Vec<Pin<Box<dyn Future<Output = ()> + Send + 'static>>>,
    /// Ties the scope to the lifetime `'a` that registered futures may borrow for.
    _phantom: PhantomData<&'a ()>,
}
490
491impl<'a> Scope<'a> {
492    pub fn spawn<F>(&mut self, f: F)
493    where
494        F: Future<Output = ()> + Send + 'a,
495    {
496        let f = unsafe {
497            mem::transmute::<
498                Pin<Box<dyn Future<Output = ()> + Send + 'a>>,
499                Pin<Box<dyn Future<Output = ()> + Send + 'static>>,
500            >(Box::pin(f))
501        };
502        self.futures.push(f);
503    }
504}
505
506pub fn deterministic(seed: u64) -> (Rc<Foreground>, Arc<Background>) {
507    let executor = Arc::new(Deterministic::new(seed));
508    (
509        Rc::new(Foreground::Deterministic(executor.clone())),
510        Arc::new(Background::Deterministic(executor)),
511    )
512}