executor.rs

  1use anyhow::{anyhow, Result};
  2use async_task::Runnable;
  3pub use async_task::Task;
  4use backtrace::{Backtrace, BacktraceFmt, BytesOrWideString};
  5use parking_lot::Mutex;
  6use rand::prelude::*;
  7use smol::{channel, prelude::*, Executor};
  8use std::{
  9    fmt::{self, Debug},
 10    marker::PhantomData,
 11    mem,
 12    pin::Pin,
 13    rc::Rc,
 14    sync::{
 15        atomic::{AtomicBool, Ordering::SeqCst},
 16        Arc,
 17    },
 18    task::{Context, Poll},
 19    thread,
 20    time::Duration,
 21};
 22use waker_fn::waker_fn;
 23
 24use crate::{platform, util};
 25
/// Executor handle whose `spawn` accepts futures without a `Send` bound.
///
/// Each variant selects a different backend; see the `impl Foreground` block
/// for which operations each backend supports.
pub enum Foreground {
    /// Schedules runnables through the platform dispatcher's
    /// `run_on_main_thread`. Constructed via `Foreground::platform`, which
    /// rejects construction when `is_main_thread()` is false.
    Platform {
        dispatcher: Arc<dyn platform::Dispatcher>,
        // `Rc<()>` is neither `Send` nor `Sync`, so this marker keeps the
        // variant from being shared with or moved to another thread.
        _not_send_or_sync: PhantomData<Rc<()>>,
    },
    /// Backed by a `smol::LocalExecutor`.
    Test(smol::LocalExecutor<'static>),
    /// Backed by the seeded `Deterministic` executor, shared with the
    /// matching `Background::Deterministic` (see `deterministic`).
    Deterministic(Arc<Deterministic>),
}
 34
/// Executor handle whose `spawn` requires `Send` futures.
pub enum Background {
    /// Backed by the seeded `Deterministic` executor.
    Deterministic(Arc<Deterministic>),
    /// Backed by a `smol::Executor` driven by the worker threads that
    /// `Background::new` starts.
    Production {
        executor: Arc<smol::Executor<'static>>,
        // Sending half of the channel whose receiver every worker thread
        // waits on. Nothing in this file ever sends on it; it is only held.
        // NOTE(review): presumably dropping it closes the channel and lets
        // the workers exit — confirm against smol::channel semantics.
        _stop: channel::Sender<()>,
    },
}
 42
/// Mutable state of the `Deterministic` executor, guarded by a mutex.
struct DeterministicState {
    /// Random number generator used to pick what runs next.
    rng: StdRng,
    /// Seed `rng` was created from; `Foreground::reset` re-seeds from it.
    seed: u64,
    /// Runnables ready to run, each paired with the backtrace captured when
    /// its task was spawned. `block_on` picks from this queue at random.
    scheduled: Vec<(Runnable, Backtrace)>,
    /// First scheduling of tasks created by `spawn_from_foreground`.
    /// `block_on` always takes the element at index 0 from this queue.
    spawned_from_foreground: Vec<(Runnable, Backtrace)>,
    /// When set, `block_on` panics instead of parking the thread.
    forbid_parking: bool,
}
 50
/// Executor that runs tasks on the thread calling `block_on`, choosing the
/// next runnable with a seeded random number generator.
pub struct Deterministic {
    /// Queues and RNG; cloned into every task's schedule callback.
    state: Arc<Mutex<DeterministicState>>,
    /// Parker used by `block_on` to sleep when nothing is runnable; schedule
    /// callbacks and the `block_on` waker hold unparkers obtained from it.
    parker: Mutex<parking::Parker>,
}
 55
 56impl Deterministic {
 57    fn new(seed: u64) -> Self {
 58        Self {
 59            state: Arc::new(Mutex::new(DeterministicState {
 60                rng: StdRng::seed_from_u64(seed),
 61                seed,
 62                scheduled: Default::default(),
 63                spawned_from_foreground: Default::default(),
 64                forbid_parking: false,
 65            })),
 66            parker: Default::default(),
 67        }
 68    }
 69
 70    pub fn spawn_from_foreground<F, T>(&self, future: F) -> Task<T>
 71    where
 72        T: 'static,
 73        F: Future<Output = T> + 'static,
 74    {
 75        let backtrace = Backtrace::new_unresolved();
 76        let scheduled_once = AtomicBool::new(false);
 77        let state = self.state.clone();
 78        let unparker = self.parker.lock().unparker();
 79        let (runnable, task) = async_task::spawn_local(future, move |runnable| {
 80            let mut state = state.lock();
 81            let backtrace = backtrace.clone();
 82            if scheduled_once.fetch_or(true, SeqCst) {
 83                state.scheduled.push((runnable, backtrace));
 84            } else {
 85                state.spawned_from_foreground.push((runnable, backtrace));
 86            }
 87            unparker.unpark();
 88        });
 89        runnable.schedule();
 90        task
 91    }
 92
 93    pub fn spawn<F, T>(&self, future: F) -> Task<T>
 94    where
 95        T: 'static + Send,
 96        F: 'static + Send + Future<Output = T>,
 97    {
 98        let backtrace = Backtrace::new_unresolved();
 99        let state = self.state.clone();
100        let unparker = self.parker.lock().unparker();
101        let (runnable, task) = async_task::spawn(future, move |runnable| {
102            let mut state = state.lock();
103            state.scheduled.push((runnable, backtrace.clone()));
104            unparker.unpark();
105        });
106        runnable.schedule();
107        task
108    }
109
    /// Drives `future` to completion via `block_on` with a tick budget of
    /// `usize::MAX` and returns its output.
    ///
    /// # Panics
    ///
    /// Panics if `block_on` returns `None`, i.e. the tick budget ran out.
    pub fn run<F, T>(&self, future: F) -> T
    where
        T: 'static,
        F: Future<Output = T> + 'static,
    {
        self.block_on(usize::MAX, future).unwrap()
    }
117
118    pub fn block_on<F, T>(&self, max_ticks: usize, future: F) -> Option<T>
119    where
120        T: 'static,
121        F: Future<Output = T>,
122    {
123        smol::pin!(future);
124
125        let unparker = self.parker.lock().unparker();
126        let waker = waker_fn(move || {
127            unparker.unpark();
128        });
129
130        let mut cx = Context::from_waker(&waker);
131        let mut trace = Trace::default();
132        for _ in 0..max_ticks {
133            let mut state = self.state.lock();
134            let runnable_count = state.scheduled.len() + state.spawned_from_foreground.len();
135            let ix = state.rng.gen_range(0..=runnable_count);
136            if ix < state.scheduled.len() {
137                let (_, backtrace) = &state.scheduled[ix];
138                trace.record(&state, backtrace.clone());
139                let runnable = state.scheduled.remove(ix).0;
140                drop(state);
141                runnable.run();
142            } else if ix < runnable_count {
143                let (_, backtrace) = &state.spawned_from_foreground[0];
144                trace.record(&state, backtrace.clone());
145                let runnable = state.spawned_from_foreground.remove(0).0;
146                drop(state);
147                runnable.run();
148            } else {
149                drop(state);
150                if let Poll::Ready(result) = future.as_mut().poll(&mut cx) {
151                    return Some(result);
152                }
153                let state = &mut *self.state.lock();
154                if state.scheduled.is_empty() && state.spawned_from_foreground.is_empty() {
155                    if state.forbid_parking {
156                        panic!("deterministic executor parked after a call to forbid_parking");
157                    }
158                    drop(state);
159                    self.parker.lock().park();
160                }
161
162                continue;
163            }
164        }
165
166        None
167    }
168}
169
/// Record of what `Deterministic::block_on` executed, kept for diagnostics.
///
/// The three vectors grow in lockstep: `record` pushes exactly one entry onto
/// each, so index `i` in all of them describes the same executed runnable.
#[derive(Default)]
struct Trace {
    /// Backtrace of the runnable that was chosen to run.
    executed: Vec<Backtrace>,
    /// Backtraces of everything in the `scheduled` queue at that moment.
    scheduled: Vec<Vec<Backtrace>>,
    /// Backtraces of everything in the `spawned_from_foreground` queue at
    /// that moment.
    spawned_from_foreground: Vec<Vec<Backtrace>>,
}
176
177impl Trace {
178    fn record(&mut self, state: &DeterministicState, executed: Backtrace) {
179        self.scheduled.push(
180            state
181                .scheduled
182                .iter()
183                .map(|(_, backtrace)| backtrace.clone())
184                .collect(),
185        );
186        self.spawned_from_foreground.push(
187            state
188                .spawned_from_foreground
189                .iter()
190                .map(|(_, backtrace)| backtrace.clone())
191                .collect(),
192        );
193        self.executed.push(executed);
194    }
195
196    fn resolve(&mut self) {
197        for backtrace in &mut self.executed {
198            backtrace.resolve();
199        }
200
201        for backtraces in &mut self.scheduled {
202            for backtrace in backtraces {
203                backtrace.resolve();
204            }
205        }
206
207        for backtraces in &mut self.spawned_from_foreground {
208            for backtrace in backtraces {
209                backtrace.resolve();
210            }
211        }
212    }
213}
214
215impl Debug for Trace {
216    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
217        struct FirstCwdFrameInBacktrace<'a>(&'a Backtrace);
218
219        impl<'a> Debug for FirstCwdFrameInBacktrace<'a> {
220            fn fmt(&self, f: &mut fmt::Formatter<'_>) -> std::fmt::Result {
221                let cwd = std::env::current_dir().unwrap();
222                let mut print_path = |fmt: &mut fmt::Formatter<'_>, path: BytesOrWideString<'_>| {
223                    fmt::Display::fmt(&path, fmt)
224                };
225                let mut fmt = BacktraceFmt::new(f, backtrace::PrintFmt::Full, &mut print_path);
226                for frame in self.0.frames() {
227                    let mut formatted_frame = fmt.frame();
228                    if frame
229                        .symbols()
230                        .iter()
231                        .any(|s| s.filename().map_or(false, |f| f.starts_with(&cwd)))
232                    {
233                        formatted_frame.backtrace_frame(frame)?;
234                        break;
235                    }
236                }
237                fmt.finish()
238            }
239        }
240
241        for ((backtrace, scheduled), spawned_from_foreground) in self
242            .executed
243            .iter()
244            .zip(&self.scheduled)
245            .zip(&self.spawned_from_foreground)
246        {
247            writeln!(f, "Scheduled")?;
248            for backtrace in scheduled {
249                writeln!(f, "- {:?}", FirstCwdFrameInBacktrace(backtrace))?;
250            }
251            if scheduled.is_empty() {
252                writeln!(f, "None")?;
253            }
254            writeln!(f, "==========")?;
255
256            writeln!(f, "Spawned from foreground")?;
257            for backtrace in spawned_from_foreground {
258                writeln!(f, "- {:?}", FirstCwdFrameInBacktrace(backtrace))?;
259            }
260            if spawned_from_foreground.is_empty() {
261                writeln!(f, "None")?;
262            }
263            writeln!(f, "==========")?;
264
265            writeln!(f, "Run: {:?}", FirstCwdFrameInBacktrace(backtrace))?;
266            writeln!(f, "+++++++++++++++++++")?;
267        }
268
269        Ok(())
270    }
271}
272
273impl Drop for Trace {
274    fn drop(&mut self) {
275        let trace_on_panic = if let Ok(trace_on_panic) = std::env::var("EXECUTOR_TRACE_ON_PANIC") {
276            trace_on_panic == "1" || trace_on_panic == "true"
277        } else {
278            false
279        };
280        let trace_always = if let Ok(trace_always) = std::env::var("EXECUTOR_TRACE_ALWAYS") {
281            trace_always == "1" || trace_always == "true"
282        } else {
283            false
284        };
285
286        if trace_always || (trace_on_panic && thread::panicking()) {
287            self.resolve();
288            dbg!(self);
289        }
290    }
291}
292
293impl Foreground {
294    pub fn platform(dispatcher: Arc<dyn platform::Dispatcher>) -> Result<Self> {
295        if dispatcher.is_main_thread() {
296            Ok(Self::Platform {
297                dispatcher,
298                _not_send_or_sync: PhantomData,
299            })
300        } else {
301            Err(anyhow!("must be constructed on main thread"))
302        }
303    }
304
    /// Creates a foreground executor backed by a new `smol::LocalExecutor`.
    pub fn test() -> Self {
        Self::Test(smol::LocalExecutor::new())
    }
308
309    pub fn spawn<T: 'static>(&self, future: impl Future<Output = T> + 'static) -> Task<T> {
310        match self {
311            Self::Platform { dispatcher, .. } => {
312                let dispatcher = dispatcher.clone();
313                let schedule = move |runnable: Runnable| dispatcher.run_on_main_thread(runnable);
314                let (runnable, task) = async_task::spawn_local(future, schedule);
315                runnable.schedule();
316                task
317            }
318            Self::Test(executor) => executor.spawn(future),
319            Self::Deterministic(executor) => executor.spawn_from_foreground(future),
320        }
321    }
322
323    pub fn run<T: 'static>(&self, future: impl 'static + Future<Output = T>) -> T {
324        match self {
325            Self::Platform { .. } => panic!("you can't call run on a platform foreground executor"),
326            Self::Test(executor) => smol::block_on(executor.run(future)),
327            Self::Deterministic(executor) => executor.run(future),
328        }
329    }
330
331    pub fn reset(&self) {
332        match self {
333            Self::Platform { .. } => panic!("can't call this method on a platform executor"),
334            Self::Test(_) => panic!("can't call this method on a test executor"),
335            Self::Deterministic(executor) => {
336                let state = &mut *executor.state.lock();
337                state.rng = StdRng::seed_from_u64(state.seed);
338            }
339        }
340    }
341
342    pub fn forbid_parking(&self) {
343        match self {
344            Self::Platform { .. } => panic!("can't call this method on a platform executor"),
345            Self::Test(_) => panic!("can't call this method on a test executor"),
346            Self::Deterministic(executor) => {
347                executor.state.lock().forbid_parking = true;
348            }
349        }
350    }
351}
352
353impl Background {
354    pub fn new() -> Self {
355        let executor = Arc::new(Executor::new());
356        let stop = channel::unbounded::<()>();
357
358        for i in 0..2 * num_cpus::get() {
359            let executor = executor.clone();
360            let stop = stop.1.clone();
361            thread::Builder::new()
362                .name(format!("background-executor-{}", i))
363                .spawn(move || smol::block_on(executor.run(stop.recv())))
364                .unwrap();
365        }
366
367        Self::Production {
368            executor,
369            _stop: stop.0,
370        }
371    }
372
    /// Returns `num_cpus::get()`, regardless of which variant `self` is.
    pub fn num_cpus(&self) -> usize {
        num_cpus::get()
    }
376
377    pub fn spawn<T, F>(&self, future: F) -> Task<T>
378    where
379        T: 'static + Send,
380        F: Send + Future<Output = T> + 'static,
381    {
382        match self {
383            Self::Production { executor, .. } => executor.spawn(future),
384            Self::Deterministic(executor) => executor.spawn(future),
385        }
386    }
387
388    pub fn block_on<F, T>(&self, timeout: Duration, future: F) -> Option<T>
389    where
390        T: 'static,
391        F: Future<Output = T>,
392    {
393        match self {
394            Self::Production { .. } => {
395                smol::block_on(async move { util::timeout(timeout, future).await.ok() })
396            }
397            Self::Deterministic(executor) => {
398                let max_ticks = executor.state.lock().rng.gen_range(1..=1000);
399                executor.block_on(max_ticks, future)
400            }
401        }
402    }
403
404    pub async fn scoped<'scope, F>(&self, scheduler: F)
405    where
406        F: FnOnce(&mut Scope<'scope>),
407    {
408        let mut scope = Scope {
409            futures: Default::default(),
410            _phantom: PhantomData,
411        };
412        (scheduler)(&mut scope);
413        let spawned = scope
414            .futures
415            .into_iter()
416            .map(|f| self.spawn(f))
417            .collect::<Vec<_>>();
418        for task in spawned {
419            task.await;
420        }
421    }
422}
423
/// Collects the futures registered during a `Background::scoped` call.
pub struct Scope<'a> {
    /// Registered futures, stored with their lifetime bound changed to
    /// `'static` by `Scope::spawn`.
    futures: Vec<Pin<Box<dyn Future<Output = ()> + Send + 'static>>>,
    // Ties the otherwise unused lifetime `'a` to the struct.
    _phantom: PhantomData<&'a ()>,
}
428
429impl<'a> Scope<'a> {
430    pub fn spawn<F>(&mut self, f: F)
431    where
432        F: Future<Output = ()> + Send + 'a,
433    {
434        let f = unsafe {
435            mem::transmute::<
436                Pin<Box<dyn Future<Output = ()> + Send + 'a>>,
437                Pin<Box<dyn Future<Output = ()> + Send + 'static>>,
438            >(Box::pin(f))
439        };
440        self.futures.push(f);
441    }
442}
443
444pub fn deterministic(seed: u64) -> (Rc<Foreground>, Arc<Background>) {
445    let executor = Arc::new(Deterministic::new(seed));
446    (
447        Rc::new(Foreground::Deterministic(executor.clone())),
448        Arc::new(Background::Deterministic(executor)),
449    )
450}