executor.rs

  1use anyhow::{anyhow, Result};
  2use async_task::Runnable;
  3pub use async_task::Task;
  4use backtrace::{Backtrace, BacktraceFmt, BytesOrWideString};
  5use futures::task::noop_waker;
  6use parking_lot::Mutex;
  7use rand::prelude::*;
  8use smol::{channel, prelude::*, Executor};
  9use std::{
 10    fmt::{self, Debug},
 11    marker::PhantomData,
 12    mem,
 13    pin::Pin,
 14    rc::Rc,
 15    sync::{
 16        atomic::{AtomicBool, Ordering::SeqCst},
 17        Arc,
 18    },
 19    task::{Context, Poll},
 20    thread,
 21    time::Duration,
 22};
 23
 24use crate::{platform, util};
 25
 26pub enum Foreground {
 27    Platform {
 28        dispatcher: Arc<dyn platform::Dispatcher>,
 29        _not_send_or_sync: PhantomData<Rc<()>>,
 30    },
 31    Test(smol::LocalExecutor<'static>),
 32    Deterministic(Arc<Deterministic>),
 33}
 34
 35pub enum Background {
 36    Deterministic(Arc<Deterministic>),
 37    Production {
 38        executor: Arc<smol::Executor<'static>>,
 39        _stop: channel::Sender<()>,
 40    },
 41}
 42
 43struct DeterministicState {
 44    rng: StdRng,
 45    seed: u64,
 46    scheduled: Vec<(Runnable, Backtrace)>,
 47    spawned_from_foreground: Vec<(Runnable, Backtrace)>,
 48}
 49
 50pub struct Deterministic(Arc<Mutex<DeterministicState>>);
 51
 52impl Deterministic {
 53    fn new(seed: u64) -> Self {
 54        Self(Arc::new(Mutex::new(DeterministicState {
 55            rng: StdRng::seed_from_u64(seed),
 56            seed,
 57            scheduled: Default::default(),
 58            spawned_from_foreground: Default::default(),
 59        })))
 60    }
 61
 62    pub fn spawn_from_foreground<F, T>(&self, future: F) -> Task<T>
 63    where
 64        T: 'static,
 65        F: Future<Output = T> + 'static,
 66    {
 67        let backtrace = Backtrace::new_unresolved();
 68        let scheduled_once = AtomicBool::new(false);
 69        let state = self.0.clone();
 70        let (runnable, task) = async_task::spawn_local(future, move |runnable| {
 71            let mut state = state.lock();
 72            let backtrace = backtrace.clone();
 73            if scheduled_once.fetch_or(true, SeqCst) {
 74                state.scheduled.push((runnable, backtrace));
 75            } else {
 76                state.spawned_from_foreground.push((runnable, backtrace));
 77            }
 78        });
 79        runnable.schedule();
 80        task
 81    }
 82
 83    pub fn spawn<F, T>(&self, future: F) -> Task<T>
 84    where
 85        T: 'static + Send,
 86        F: 'static + Send + Future<Output = T>,
 87    {
 88        let backtrace = Backtrace::new_unresolved();
 89        let state = self.0.clone();
 90        let (runnable, task) = async_task::spawn(future, move |runnable| {
 91            state.lock().scheduled.push((runnable, backtrace.clone()));
 92        });
 93        runnable.schedule();
 94        task
 95    }
 96
 97    pub fn run<F, T>(&self, future: F) -> T
 98    where
 99        T: 'static,
100        F: Future<Output = T> + 'static,
101    {
102        self.block_on(usize::MAX, future).unwrap()
103    }
104
105    pub fn block_on<F, T>(&self, max_ticks: usize, future: F) -> Option<T>
106    where
107        T: 'static,
108        F: Future<Output = T>,
109    {
110        smol::pin!(future);
111
112        let waker = noop_waker();
113        let mut cx = Context::from_waker(&waker);
114        let mut trace = Trace::default();
115        for _ in 0..max_ticks {
116            let runnable = {
117                let state = &mut *self.0.lock();
118                let runnable_count = state.scheduled.len() + state.spawned_from_foreground.len();
119                let ix = state.rng.gen_range(0..=runnable_count);
120                if ix < state.scheduled.len() {
121                    let (_, backtrace) = &state.scheduled[ix];
122                    trace.record(&state, backtrace.clone());
123                    state.scheduled.remove(ix).0
124                } else if ix < runnable_count {
125                    let (_, backtrace) = &state.spawned_from_foreground[0];
126                    trace.record(&state, backtrace.clone());
127                    state.spawned_from_foreground.remove(0).0
128                } else {
129                    if let Poll::Ready(result) = future.as_mut().poll(&mut cx) {
130                        return Some(result);
131                    }
132
133                    if state.scheduled.is_empty() && state.spawned_from_foreground.is_empty() {
134                        panic!("detected non-determinism in deterministic executor");
135                    } else {
136                        continue;
137                    }
138                }
139            };
140
141            runnable.run();
142        }
143
144        None
145    }
146}
147
148#[derive(Default)]
149struct Trace {
150    executed: Vec<Backtrace>,
151    scheduled: Vec<Vec<Backtrace>>,
152    spawned_from_foreground: Vec<Vec<Backtrace>>,
153}
154
155impl Trace {
156    fn record(&mut self, state: &DeterministicState, executed: Backtrace) {
157        self.scheduled.push(
158            state
159                .scheduled
160                .iter()
161                .map(|(_, backtrace)| backtrace.clone())
162                .collect(),
163        );
164        self.spawned_from_foreground.push(
165            state
166                .spawned_from_foreground
167                .iter()
168                .map(|(_, backtrace)| backtrace.clone())
169                .collect(),
170        );
171        self.executed.push(executed);
172    }
173
174    fn resolve(&mut self) {
175        for backtrace in &mut self.executed {
176            backtrace.resolve();
177        }
178
179        for backtraces in &mut self.scheduled {
180            for backtrace in backtraces {
181                backtrace.resolve();
182            }
183        }
184
185        for backtraces in &mut self.spawned_from_foreground {
186            for backtrace in backtraces {
187                backtrace.resolve();
188            }
189        }
190    }
191}
192
193impl Debug for Trace {
194    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
195        struct FirstCwdFrameInBacktrace<'a>(&'a Backtrace);
196
197        impl<'a> Debug for FirstCwdFrameInBacktrace<'a> {
198            fn fmt(&self, f: &mut fmt::Formatter<'_>) -> std::fmt::Result {
199                let cwd = std::env::current_dir().unwrap();
200                let mut print_path = |fmt: &mut fmt::Formatter<'_>, path: BytesOrWideString<'_>| {
201                    fmt::Display::fmt(&path, fmt)
202                };
203                let mut fmt = BacktraceFmt::new(f, backtrace::PrintFmt::Full, &mut print_path);
204                for frame in self.0.frames() {
205                    let mut formatted_frame = fmt.frame();
206                    if frame
207                        .symbols()
208                        .iter()
209                        .any(|s| s.filename().map_or(false, |f| f.starts_with(&cwd)))
210                    {
211                        formatted_frame.backtrace_frame(frame)?;
212                        break;
213                    }
214                }
215                fmt.finish()
216            }
217        }
218
219        for ((backtrace, scheduled), spawned_from_foreground) in self
220            .executed
221            .iter()
222            .zip(&self.scheduled)
223            .zip(&self.spawned_from_foreground)
224        {
225            writeln!(f, "Scheduled")?;
226            for backtrace in scheduled {
227                writeln!(f, "- {:?}", FirstCwdFrameInBacktrace(backtrace))?;
228            }
229            if scheduled.is_empty() {
230                writeln!(f, "None")?;
231            }
232            writeln!(f, "==========")?;
233
234            writeln!(f, "Spawned from foreground")?;
235            for backtrace in spawned_from_foreground {
236                writeln!(f, "- {:?}", FirstCwdFrameInBacktrace(backtrace))?;
237            }
238            if spawned_from_foreground.is_empty() {
239                writeln!(f, "None")?;
240            }
241            writeln!(f, "==========")?;
242
243            writeln!(f, "Run: {:?}", FirstCwdFrameInBacktrace(backtrace))?;
244            writeln!(f, "+++++++++++++++++++")?;
245        }
246
247        Ok(())
248    }
249}
250
251impl Drop for Trace {
252    fn drop(&mut self) {
253        let trace_on_panic = if let Ok(trace_on_panic) = std::env::var("EXECUTOR_TRACE_ON_PANIC") {
254            trace_on_panic == "1" || trace_on_panic == "true"
255        } else {
256            false
257        };
258        let trace_always = if let Ok(trace_always) = std::env::var("EXECUTOR_TRACE_ALWAYS") {
259            trace_always == "1" || trace_always == "true"
260        } else {
261            false
262        };
263
264        if trace_always || (trace_on_panic && thread::panicking()) {
265            self.resolve();
266            dbg!(self);
267        }
268    }
269}
270
271impl Foreground {
272    pub fn platform(dispatcher: Arc<dyn platform::Dispatcher>) -> Result<Self> {
273        if dispatcher.is_main_thread() {
274            Ok(Self::Platform {
275                dispatcher,
276                _not_send_or_sync: PhantomData,
277            })
278        } else {
279            Err(anyhow!("must be constructed on main thread"))
280        }
281    }
282
283    pub fn test() -> Self {
284        Self::Test(smol::LocalExecutor::new())
285    }
286
287    pub fn spawn<T: 'static>(&self, future: impl Future<Output = T> + 'static) -> Task<T> {
288        match self {
289            Self::Platform { dispatcher, .. } => {
290                let dispatcher = dispatcher.clone();
291                let schedule = move |runnable: Runnable| dispatcher.run_on_main_thread(runnable);
292                let (runnable, task) = async_task::spawn_local(future, schedule);
293                runnable.schedule();
294                task
295            }
296            Self::Test(executor) => executor.spawn(future),
297            Self::Deterministic(executor) => executor.spawn_from_foreground(future),
298        }
299    }
300
301    pub fn run<T: 'static>(&self, future: impl 'static + Future<Output = T>) -> T {
302        match self {
303            Self::Platform { .. } => panic!("you can't call run on a platform foreground executor"),
304            Self::Test(executor) => smol::block_on(executor.run(future)),
305            Self::Deterministic(executor) => executor.run(future),
306        }
307    }
308
309    pub fn reset(&self) {
310        match self {
311            Self::Platform { .. } => panic!("can't call this method on a platform executor"),
312            Self::Test(_) => panic!("can't call this method on a test executor"),
313            Self::Deterministic(executor) => {
314                let state = &mut *executor.0.lock();
315                state.rng = StdRng::seed_from_u64(state.seed);
316            }
317        }
318    }
319}
320
321impl Background {
322    pub fn new() -> Self {
323        let executor = Arc::new(Executor::new());
324        let stop = channel::unbounded::<()>();
325
326        for i in 0..2 * num_cpus::get() {
327            let executor = executor.clone();
328            let stop = stop.1.clone();
329            thread::Builder::new()
330                .name(format!("background-executor-{}", i))
331                .spawn(move || smol::block_on(executor.run(stop.recv())))
332                .unwrap();
333        }
334
335        Self::Production {
336            executor,
337            _stop: stop.0,
338        }
339    }
340
341    pub fn num_cpus(&self) -> usize {
342        num_cpus::get()
343    }
344
345    pub fn spawn<T, F>(&self, future: F) -> Task<T>
346    where
347        T: 'static + Send,
348        F: Send + Future<Output = T> + 'static,
349    {
350        match self {
351            Self::Production { executor, .. } => executor.spawn(future),
352            Self::Deterministic(executor) => executor.spawn(future),
353        }
354    }
355
356    pub fn block_on<F, T>(&self, timeout: Duration, future: F) -> Option<T>
357    where
358        T: 'static,
359        F: Future<Output = T>,
360    {
361        match self {
362            Self::Production { .. } => {
363                smol::block_on(async move { util::timeout(timeout, future).await.ok() })
364            }
365            Self::Deterministic(executor) => {
366                let max_ticks = executor.0.lock().rng.gen_range(1..=1000);
367                executor.block_on(max_ticks, future)
368            }
369        }
370    }
371
372    pub async fn scoped<'scope, F>(&self, scheduler: F)
373    where
374        F: FnOnce(&mut Scope<'scope>),
375    {
376        let mut scope = Scope {
377            futures: Default::default(),
378            _phantom: PhantomData,
379        };
380        (scheduler)(&mut scope);
381        let spawned = scope
382            .futures
383            .into_iter()
384            .map(|f| self.spawn(f))
385            .collect::<Vec<_>>();
386        for task in spawned {
387            task.await;
388        }
389    }
390}
391
392pub struct Scope<'a> {
393    futures: Vec<Pin<Box<dyn Future<Output = ()> + Send + 'static>>>,
394    _phantom: PhantomData<&'a ()>,
395}
396
397impl<'a> Scope<'a> {
398    pub fn spawn<F>(&mut self, f: F)
399    where
400        F: Future<Output = ()> + Send + 'a,
401    {
402        let f = unsafe {
403            mem::transmute::<
404                Pin<Box<dyn Future<Output = ()> + Send + 'a>>,
405                Pin<Box<dyn Future<Output = ()> + Send + 'static>>,
406            >(Box::pin(f))
407        };
408        self.futures.push(f);
409    }
410}
411
412pub fn deterministic(seed: u64) -> (Rc<Foreground>, Arc<Background>) {
413    let executor = Arc::new(Deterministic::new(seed));
414    (
415        Rc::new(Foreground::Deterministic(executor.clone())),
416        Arc::new(Background::Deterministic(executor)),
417    )
418}