executor.rs

  1use anyhow::{anyhow, Result};
  2use async_task::Runnable;
  3pub use async_task::Task;
  4use backtrace::{Backtrace, BacktraceFmt, BytesOrWideString};
  5use parking_lot::Mutex;
  6use rand::prelude::*;
  7use smol::{channel, prelude::*, Executor};
  8use std::{
  9    fmt::{self, Debug},
 10    marker::PhantomData,
 11    mem,
 12    ops::RangeInclusive,
 13    pin::Pin,
 14    rc::Rc,
 15    sync::{
 16        atomic::{AtomicBool, Ordering::SeqCst},
 17        Arc,
 18    },
 19    task::{Context, Poll},
 20    thread,
 21    time::Duration,
 22};
 23use waker_fn::waker_fn;
 24
 25use crate::{platform, util};
 26
 27pub enum Foreground {
 28    Platform {
 29        dispatcher: Arc<dyn platform::Dispatcher>,
 30        _not_send_or_sync: PhantomData<Rc<()>>,
 31    },
 32    Test(smol::LocalExecutor<'static>),
 33    Deterministic(Arc<Deterministic>),
 34}
 35
 36pub enum Background {
 37    Deterministic(Arc<Deterministic>),
 38    Production {
 39        executor: Arc<smol::Executor<'static>>,
 40        _stop: channel::Sender<()>,
 41    },
 42}
 43
 44struct DeterministicState {
 45    rng: StdRng,
 46    seed: u64,
 47    scheduled_from_foreground: Vec<(Runnable, Backtrace)>,
 48    scheduled_from_background: Vec<(Runnable, Backtrace)>,
 49    spawned_from_foreground: Vec<(Runnable, Backtrace)>,
 50    forbid_parking: bool,
 51    block_on_ticks: RangeInclusive<usize>,
 52}
 53
 54pub struct Deterministic {
 55    state: Arc<Mutex<DeterministicState>>,
 56    parker: Mutex<parking::Parker>,
 57}
 58
 59impl Deterministic {
 60    fn new(seed: u64) -> Self {
 61        Self {
 62            state: Arc::new(Mutex::new(DeterministicState {
 63                rng: StdRng::seed_from_u64(seed),
 64                seed,
 65                scheduled_from_foreground: Default::default(),
 66                scheduled_from_background: Default::default(),
 67                spawned_from_foreground: Default::default(),
 68                forbid_parking: false,
 69                block_on_ticks: 0..=1000,
 70            })),
 71            parker: Default::default(),
 72        }
 73    }
 74
 75    pub fn spawn_from_foreground<F, T>(&self, future: F) -> Task<T>
 76    where
 77        T: 'static,
 78        F: Future<Output = T> + 'static,
 79    {
 80        let backtrace = Backtrace::new_unresolved();
 81        let scheduled_once = AtomicBool::new(false);
 82        let state = self.state.clone();
 83        let unparker = self.parker.lock().unparker();
 84        let (runnable, task) = async_task::spawn_local(future, move |runnable| {
 85            let mut state = state.lock();
 86            let backtrace = backtrace.clone();
 87            if scheduled_once.fetch_or(true, SeqCst) {
 88                state.scheduled_from_foreground.push((runnable, backtrace));
 89            } else {
 90                state.spawned_from_foreground.push((runnable, backtrace));
 91            }
 92            unparker.unpark();
 93        });
 94        runnable.schedule();
 95        task
 96    }
 97
 98    pub fn spawn<F, T>(&self, future: F) -> Task<T>
 99    where
100        T: 'static + Send,
101        F: 'static + Send + Future<Output = T>,
102    {
103        let backtrace = Backtrace::new_unresolved();
104        let state = self.state.clone();
105        let unparker = self.parker.lock().unparker();
106        let (runnable, task) = async_task::spawn(future, move |runnable| {
107            let mut state = state.lock();
108            state
109                .scheduled_from_background
110                .push((runnable, backtrace.clone()));
111            unparker.unpark();
112        });
113        runnable.schedule();
114        task
115    }
116
117    pub fn run<F, T>(&self, future: F) -> T
118    where
119        T: 'static,
120        F: Future<Output = T> + 'static,
121    {
122        smol::pin!(future);
123
124        let unparker = self.parker.lock().unparker();
125        let woken = Arc::new(AtomicBool::new(false));
126        let waker = {
127            let woken = woken.clone();
128            waker_fn(move || {
129                woken.store(true, SeqCst);
130                unparker.unpark();
131            })
132        };
133
134        let mut cx = Context::from_waker(&waker);
135        let mut trace = Trace::default();
136        loop {
137            let mut state = self.state.lock();
138            let runnable_count = state.scheduled_from_foreground.len()
139                + state.scheduled_from_background.len()
140                + state.spawned_from_foreground.len();
141
142            let ix = state.rng.gen_range(0..=runnable_count);
143            if ix < state.scheduled_from_foreground.len() {
144                let (_, backtrace) = &state.scheduled_from_foreground[ix];
145                trace.record(&state, backtrace.clone());
146                let runnable = state.scheduled_from_foreground.remove(ix).0;
147                drop(state);
148                runnable.run();
149            } else if ix - state.scheduled_from_foreground.len()
150                < state.scheduled_from_background.len()
151            {
152                let ix = ix - state.scheduled_from_foreground.len();
153                let (_, backtrace) = &state.scheduled_from_background[ix];
154                trace.record(&state, backtrace.clone());
155                let runnable = state.scheduled_from_background.remove(ix).0;
156                drop(state);
157                runnable.run();
158            } else if ix < runnable_count {
159                let (_, backtrace) = &state.spawned_from_foreground[0];
160                trace.record(&state, backtrace.clone());
161                let runnable = state.spawned_from_foreground.remove(0).0;
162                drop(state);
163                runnable.run();
164            } else {
165                drop(state);
166                if let Poll::Ready(result) = future.as_mut().poll(&mut cx) {
167                    return result;
168                }
169                let state = self.state.lock();
170                if state.scheduled_from_foreground.is_empty()
171                    && state.scheduled_from_background.is_empty()
172                    && state.spawned_from_foreground.is_empty()
173                {
174                    if state.forbid_parking && !woken.load(SeqCst) {
175                        panic!("deterministic executor parked after a call to forbid_parking");
176                    }
177                    drop(state);
178                    woken.store(false, SeqCst);
179                    self.parker.lock().park();
180                }
181
182                continue;
183            }
184        }
185    }
186
187    pub fn block_on<F, T>(&self, future: F) -> Option<T>
188    where
189        T: 'static,
190        F: Future<Output = T>,
191    {
192        smol::pin!(future);
193
194        let unparker = self.parker.lock().unparker();
195        let waker = waker_fn(move || {
196            unparker.unpark();
197        });
198        let max_ticks = {
199            let mut state = self.state.lock();
200            let range = state.block_on_ticks.clone();
201            state.rng.gen_range(range)
202        };
203
204        let mut cx = Context::from_waker(&waker);
205        let mut trace = Trace::default();
206        for _ in 0..max_ticks {
207            let mut state = self.state.lock();
208            let runnable_count = state.scheduled_from_background.len();
209            let ix = state.rng.gen_range(0..=runnable_count);
210            if ix < state.scheduled_from_background.len() {
211                let (_, backtrace) = &state.scheduled_from_background[ix];
212                trace.record(&state, backtrace.clone());
213                let runnable = state.scheduled_from_background.remove(ix).0;
214                drop(state);
215                runnable.run();
216            } else {
217                drop(state);
218                if let Poll::Ready(result) = future.as_mut().poll(&mut cx) {
219                    return Some(result);
220                }
221                let state = self.state.lock();
222                if state.scheduled_from_background.is_empty() {
223                    if state.forbid_parking {
224                        panic!("deterministic executor parked after a call to forbid_parking");
225                    }
226                    drop(state);
227                    self.parker.lock().park();
228                }
229
230                continue;
231            }
232        }
233
234        None
235    }
236}
237
238#[derive(Default)]
239struct Trace {
240    executed: Vec<Backtrace>,
241    scheduled: Vec<Vec<Backtrace>>,
242    spawned_from_foreground: Vec<Vec<Backtrace>>,
243}
244
245impl Trace {
246    fn record(&mut self, state: &DeterministicState, executed: Backtrace) {
247        self.scheduled.push(
248            state
249                .scheduled_from_foreground
250                .iter()
251                .map(|(_, backtrace)| backtrace.clone())
252                .collect(),
253        );
254        self.spawned_from_foreground.push(
255            state
256                .spawned_from_foreground
257                .iter()
258                .map(|(_, backtrace)| backtrace.clone())
259                .collect(),
260        );
261        self.executed.push(executed);
262    }
263
264    fn resolve(&mut self) {
265        for backtrace in &mut self.executed {
266            backtrace.resolve();
267        }
268
269        for backtraces in &mut self.scheduled {
270            for backtrace in backtraces {
271                backtrace.resolve();
272            }
273        }
274
275        for backtraces in &mut self.spawned_from_foreground {
276            for backtrace in backtraces {
277                backtrace.resolve();
278            }
279        }
280    }
281}
282
283impl Debug for Trace {
284    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
285        struct FirstCwdFrameInBacktrace<'a>(&'a Backtrace);
286
287        impl<'a> Debug for FirstCwdFrameInBacktrace<'a> {
288            fn fmt(&self, f: &mut fmt::Formatter<'_>) -> std::fmt::Result {
289                let cwd = std::env::current_dir().unwrap();
290                let mut print_path = |fmt: &mut fmt::Formatter<'_>, path: BytesOrWideString<'_>| {
291                    fmt::Display::fmt(&path, fmt)
292                };
293                let mut fmt = BacktraceFmt::new(f, backtrace::PrintFmt::Full, &mut print_path);
294                for frame in self.0.frames() {
295                    let mut formatted_frame = fmt.frame();
296                    if frame
297                        .symbols()
298                        .iter()
299                        .any(|s| s.filename().map_or(false, |f| f.starts_with(&cwd)))
300                    {
301                        formatted_frame.backtrace_frame(frame)?;
302                        break;
303                    }
304                }
305                fmt.finish()
306            }
307        }
308
309        for ((backtrace, scheduled), spawned_from_foreground) in self
310            .executed
311            .iter()
312            .zip(&self.scheduled)
313            .zip(&self.spawned_from_foreground)
314        {
315            writeln!(f, "Scheduled")?;
316            for backtrace in scheduled {
317                writeln!(f, "- {:?}", FirstCwdFrameInBacktrace(backtrace))?;
318            }
319            if scheduled.is_empty() {
320                writeln!(f, "None")?;
321            }
322            writeln!(f, "==========")?;
323
324            writeln!(f, "Spawned from foreground")?;
325            for backtrace in spawned_from_foreground {
326                writeln!(f, "- {:?}", FirstCwdFrameInBacktrace(backtrace))?;
327            }
328            if spawned_from_foreground.is_empty() {
329                writeln!(f, "None")?;
330            }
331            writeln!(f, "==========")?;
332
333            writeln!(f, "Run: {:?}", FirstCwdFrameInBacktrace(backtrace))?;
334            writeln!(f, "+++++++++++++++++++")?;
335        }
336
337        Ok(())
338    }
339}
340
341impl Drop for Trace {
342    fn drop(&mut self) {
343        let trace_on_panic = if let Ok(trace_on_panic) = std::env::var("EXECUTOR_TRACE_ON_PANIC") {
344            trace_on_panic == "1" || trace_on_panic == "true"
345        } else {
346            false
347        };
348        let trace_always = if let Ok(trace_always) = std::env::var("EXECUTOR_TRACE_ALWAYS") {
349            trace_always == "1" || trace_always == "true"
350        } else {
351            false
352        };
353
354        if trace_always || (trace_on_panic && thread::panicking()) {
355            self.resolve();
356            dbg!(self);
357        }
358    }
359}
360
361impl Foreground {
362    pub fn platform(dispatcher: Arc<dyn platform::Dispatcher>) -> Result<Self> {
363        if dispatcher.is_main_thread() {
364            Ok(Self::Platform {
365                dispatcher,
366                _not_send_or_sync: PhantomData,
367            })
368        } else {
369            Err(anyhow!("must be constructed on main thread"))
370        }
371    }
372
373    pub fn test() -> Self {
374        Self::Test(smol::LocalExecutor::new())
375    }
376
377    pub fn spawn<T: 'static>(&self, future: impl Future<Output = T> + 'static) -> Task<T> {
378        match self {
379            Self::Platform { dispatcher, .. } => {
380                let dispatcher = dispatcher.clone();
381                let schedule = move |runnable: Runnable| dispatcher.run_on_main_thread(runnable);
382                let (runnable, task) = async_task::spawn_local(future, schedule);
383                runnable.schedule();
384                task
385            }
386            Self::Test(executor) => executor.spawn(future),
387            Self::Deterministic(executor) => executor.spawn_from_foreground(future),
388        }
389    }
390
391    pub fn run<T: 'static>(&self, future: impl 'static + Future<Output = T>) -> T {
392        match self {
393            Self::Platform { .. } => panic!("you can't call run on a platform foreground executor"),
394            Self::Test(executor) => smol::block_on(executor.run(future)),
395            Self::Deterministic(executor) => executor.run(future),
396        }
397    }
398
399    pub fn forbid_parking(&self) {
400        match self {
401            Self::Deterministic(executor) => {
402                let mut state = executor.state.lock();
403                state.forbid_parking = true;
404                state.rng = StdRng::seed_from_u64(state.seed);
405            }
406            _ => panic!("this method can only be called on a deterministic executor"),
407        }
408    }
409
410    pub fn set_block_on_ticks(&self, range: RangeInclusive<usize>) {
411        match self {
412            Self::Deterministic(executor) => executor.state.lock().block_on_ticks = range,
413            _ => panic!("this method can only be called on a deterministic executor"),
414        }
415    }
416}
417
418impl Background {
419    pub fn new() -> Self {
420        let executor = Arc::new(Executor::new());
421        let stop = channel::unbounded::<()>();
422
423        for i in 0..2 * num_cpus::get() {
424            let executor = executor.clone();
425            let stop = stop.1.clone();
426            thread::Builder::new()
427                .name(format!("background-executor-{}", i))
428                .spawn(move || smol::block_on(executor.run(stop.recv())))
429                .unwrap();
430        }
431
432        Self::Production {
433            executor,
434            _stop: stop.0,
435        }
436    }
437
438    pub fn num_cpus(&self) -> usize {
439        num_cpus::get()
440    }
441
442    pub fn spawn<T, F>(&self, future: F) -> Task<T>
443    where
444        T: 'static + Send,
445        F: Send + Future<Output = T> + 'static,
446    {
447        match self {
448            Self::Production { executor, .. } => executor.spawn(future),
449            Self::Deterministic(executor) => executor.spawn(future),
450        }
451    }
452
453    pub fn block_with_timeout<F, T>(&self, timeout: Duration, mut future: F) -> Result<T, F>
454    where
455        T: 'static,
456        F: 'static + Unpin + Future<Output = T>,
457    {
458        if !timeout.is_zero() {
459            let output = match self {
460                Self::Production { .. } => {
461                    smol::block_on(util::timeout(timeout, Pin::new(&mut future))).ok()
462                }
463                Self::Deterministic(executor) => executor.block_on(Pin::new(&mut future)),
464            };
465            if let Some(output) = output {
466                return Ok(output);
467            }
468        }
469        Err(future)
470    }
471
472    pub async fn scoped<'scope, F>(&self, scheduler: F)
473    where
474        F: FnOnce(&mut Scope<'scope>),
475    {
476        let mut scope = Scope {
477            futures: Default::default(),
478            _phantom: PhantomData,
479        };
480        (scheduler)(&mut scope);
481        let spawned = scope
482            .futures
483            .into_iter()
484            .map(|f| self.spawn(f))
485            .collect::<Vec<_>>();
486        for task in spawned {
487            task.await;
488        }
489    }
490}
491
492pub struct Scope<'a> {
493    futures: Vec<Pin<Box<dyn Future<Output = ()> + Send + 'static>>>,
494    _phantom: PhantomData<&'a ()>,
495}
496
497impl<'a> Scope<'a> {
498    pub fn spawn<F>(&mut self, f: F)
499    where
500        F: Future<Output = ()> + Send + 'a,
501    {
502        let f = unsafe {
503            mem::transmute::<
504                Pin<Box<dyn Future<Output = ()> + Send + 'a>>,
505                Pin<Box<dyn Future<Output = ()> + Send + 'static>>,
506            >(Box::pin(f))
507        };
508        self.futures.push(f);
509    }
510}
511
512pub fn deterministic(seed: u64) -> (Rc<Foreground>, Arc<Background>) {
513    let executor = Arc::new(Deterministic::new(seed));
514    (
515        Rc::new(Foreground::Deterministic(executor.clone())),
516        Arc::new(Background::Deterministic(executor)),
517    )
518}