executor.rs

  1use anyhow::{anyhow, Result};
  2use async_task::Runnable;
  3pub use async_task::Task;
  4use backtrace::{Backtrace, BacktraceFmt, BytesOrWideString};
  5use parking_lot::Mutex;
  6use rand::prelude::*;
  7use smol::{channel, prelude::*, Executor};
  8use std::{
  9    fmt::{self, Debug},
 10    marker::PhantomData,
 11    mem,
 12    pin::Pin,
 13    rc::Rc,
 14    sync::{
 15        atomic::{AtomicBool, Ordering::SeqCst},
 16        Arc,
 17    },
 18    task::{Context, Poll},
 19    thread,
 20    time::Duration,
 21};
 22use waker_fn::waker_fn;
 23
 24use crate::{platform, util};
 25
 26pub enum Foreground {
 27    Platform {
 28        dispatcher: Arc<dyn platform::Dispatcher>,
 29        _not_send_or_sync: PhantomData<Rc<()>>,
 30    },
 31    Test(smol::LocalExecutor<'static>),
 32    Deterministic(Arc<Deterministic>),
 33}
 34
 35pub enum Background {
 36    Deterministic(Arc<Deterministic>),
 37    Production {
 38        executor: Arc<smol::Executor<'static>>,
 39        _stop: channel::Sender<()>,
 40    },
 41}
 42
 43struct DeterministicState {
 44    rng: StdRng,
 45    seed: u64,
 46    scheduled: Vec<(Runnable, Backtrace)>,
 47    spawned_from_foreground: Vec<(Runnable, Backtrace)>,
 48    forbid_parking: bool,
 49}
 50
 51pub struct Deterministic {
 52    state: Arc<Mutex<DeterministicState>>,
 53    parker: Mutex<parking::Parker>,
 54}
 55
 56impl Deterministic {
 57    fn new(seed: u64) -> Self {
 58        Self {
 59            state: Arc::new(Mutex::new(DeterministicState {
 60                rng: StdRng::seed_from_u64(seed),
 61                seed,
 62                scheduled: Default::default(),
 63                spawned_from_foreground: Default::default(),
 64                forbid_parking: false,
 65            })),
 66            parker: Default::default(),
 67        }
 68    }
 69
 70    pub fn spawn_from_foreground<F, T>(&self, future: F) -> Task<T>
 71    where
 72        T: 'static,
 73        F: Future<Output = T> + 'static,
 74    {
 75        let backtrace = Backtrace::new_unresolved();
 76        let scheduled_once = AtomicBool::new(false);
 77        let state = self.state.clone();
 78        let unparker = self.parker.lock().unparker();
 79        let (runnable, task) = async_task::spawn_local(future, move |runnable| {
 80            let mut state = state.lock();
 81            let backtrace = backtrace.clone();
 82            if scheduled_once.fetch_or(true, SeqCst) {
 83                state.scheduled.push((runnable, backtrace));
 84            } else {
 85                state.spawned_from_foreground.push((runnable, backtrace));
 86            }
 87            unparker.unpark();
 88        });
 89        runnable.schedule();
 90        task
 91    }
 92
 93    pub fn spawn<F, T>(&self, future: F) -> Task<T>
 94    where
 95        T: 'static + Send,
 96        F: 'static + Send + Future<Output = T>,
 97    {
 98        let backtrace = Backtrace::new_unresolved();
 99        let state = self.state.clone();
100        let unparker = self.parker.lock().unparker();
101        let (runnable, task) = async_task::spawn(future, move |runnable| {
102            let mut state = state.lock();
103            state.scheduled.push((runnable, backtrace.clone()));
104            unparker.unpark();
105        });
106        runnable.schedule();
107        task
108    }
109
110    pub fn run<F, T>(&self, future: F) -> T
111    where
112        T: 'static,
113        F: Future<Output = T> + 'static,
114    {
115        self.block_on(usize::MAX, future).unwrap()
116    }
117
118    pub fn block_on<F, T>(&self, max_ticks: usize, future: F) -> Option<T>
119    where
120        T: 'static,
121        F: Future<Output = T>,
122    {
123        smol::pin!(future);
124
125        let unparker = self.parker.lock().unparker();
126        let waker = waker_fn(move || {
127            unparker.unpark();
128        });
129
130        let mut cx = Context::from_waker(&waker);
131        let mut trace = Trace::default();
132        for _ in 0..max_ticks {
133            let mut state = self.state.lock();
134            let runnable_count = state.scheduled.len() + state.spawned_from_foreground.len();
135            let ix = state.rng.gen_range(0..=runnable_count);
136            if ix < state.scheduled.len() {
137                let (_, backtrace) = &state.scheduled[ix];
138                trace.record(&state, backtrace.clone());
139                let runnable = state.scheduled.remove(ix).0;
140                drop(state);
141                runnable.run();
142            } else if ix < runnable_count {
143                let (_, backtrace) = &state.spawned_from_foreground[0];
144                trace.record(&state, backtrace.clone());
145                let runnable = state.spawned_from_foreground.remove(0).0;
146                drop(state);
147                runnable.run();
148            } else {
149                drop(state);
150                if let Poll::Ready(result) = future.as_mut().poll(&mut cx) {
151                    return Some(result);
152                }
153                let state = &mut *self.state.lock();
154                if state.scheduled.is_empty() && state.spawned_from_foreground.is_empty() {
155                    if state.forbid_parking {
156                        panic!("deterministic executor parked after a call to forbid_parking");
157                    }
158                    drop(state);
159                    self.parker.lock().park();
160                }
161
162                continue;
163            }
164        }
165
166        None
167    }
168}
169
170#[derive(Default)]
171struct Trace {
172    executed: Vec<Backtrace>,
173    scheduled: Vec<Vec<Backtrace>>,
174    spawned_from_foreground: Vec<Vec<Backtrace>>,
175}
176
177impl Trace {
178    fn record(&mut self, state: &DeterministicState, executed: Backtrace) {
179        self.scheduled.push(
180            state
181                .scheduled
182                .iter()
183                .map(|(_, backtrace)| backtrace.clone())
184                .collect(),
185        );
186        self.spawned_from_foreground.push(
187            state
188                .spawned_from_foreground
189                .iter()
190                .map(|(_, backtrace)| backtrace.clone())
191                .collect(),
192        );
193        self.executed.push(executed);
194    }
195
196    fn resolve(&mut self) {
197        for backtrace in &mut self.executed {
198            backtrace.resolve();
199        }
200
201        for backtraces in &mut self.scheduled {
202            for backtrace in backtraces {
203                backtrace.resolve();
204            }
205        }
206
207        for backtraces in &mut self.spawned_from_foreground {
208            for backtrace in backtraces {
209                backtrace.resolve();
210            }
211        }
212    }
213}
214
215impl Debug for Trace {
216    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
217        struct FirstCwdFrameInBacktrace<'a>(&'a Backtrace);
218
219        impl<'a> Debug for FirstCwdFrameInBacktrace<'a> {
220            fn fmt(&self, f: &mut fmt::Formatter<'_>) -> std::fmt::Result {
221                let cwd = std::env::current_dir().unwrap();
222                let mut print_path = |fmt: &mut fmt::Formatter<'_>, path: BytesOrWideString<'_>| {
223                    fmt::Display::fmt(&path, fmt)
224                };
225                let mut fmt = BacktraceFmt::new(f, backtrace::PrintFmt::Full, &mut print_path);
226                for frame in self.0.frames() {
227                    let mut formatted_frame = fmt.frame();
228                    if frame
229                        .symbols()
230                        .iter()
231                        .any(|s| s.filename().map_or(false, |f| f.starts_with(&cwd)))
232                    {
233                        formatted_frame.backtrace_frame(frame)?;
234                        break;
235                    }
236                }
237                fmt.finish()
238            }
239        }
240
241        for ((backtrace, scheduled), spawned_from_foreground) in self
242            .executed
243            .iter()
244            .zip(&self.scheduled)
245            .zip(&self.spawned_from_foreground)
246        {
247            writeln!(f, "Scheduled")?;
248            for backtrace in scheduled {
249                writeln!(f, "- {:?}", FirstCwdFrameInBacktrace(backtrace))?;
250            }
251            if scheduled.is_empty() {
252                writeln!(f, "None")?;
253            }
254            writeln!(f, "==========")?;
255
256            writeln!(f, "Spawned from foreground")?;
257            for backtrace in spawned_from_foreground {
258                writeln!(f, "- {:?}", FirstCwdFrameInBacktrace(backtrace))?;
259            }
260            if spawned_from_foreground.is_empty() {
261                writeln!(f, "None")?;
262            }
263            writeln!(f, "==========")?;
264
265            writeln!(f, "Run: {:?}", FirstCwdFrameInBacktrace(backtrace))?;
266            writeln!(f, "+++++++++++++++++++")?;
267        }
268
269        Ok(())
270    }
271}
272
273impl Drop for Trace {
274    fn drop(&mut self) {
275        let trace_on_panic = if let Ok(trace_on_panic) = std::env::var("EXECUTOR_TRACE_ON_PANIC") {
276            trace_on_panic == "1" || trace_on_panic == "true"
277        } else {
278            false
279        };
280        let trace_always = if let Ok(trace_always) = std::env::var("EXECUTOR_TRACE_ALWAYS") {
281            trace_always == "1" || trace_always == "true"
282        } else {
283            false
284        };
285
286        if trace_always || (trace_on_panic && thread::panicking()) {
287            self.resolve();
288            dbg!(self);
289        }
290    }
291}
292
293impl Foreground {
294    pub fn platform(dispatcher: Arc<dyn platform::Dispatcher>) -> Result<Self> {
295        if dispatcher.is_main_thread() {
296            Ok(Self::Platform {
297                dispatcher,
298                _not_send_or_sync: PhantomData,
299            })
300        } else {
301            Err(anyhow!("must be constructed on main thread"))
302        }
303    }
304
305    pub fn test() -> Self {
306        Self::Test(smol::LocalExecutor::new())
307    }
308
309    pub fn spawn<T: 'static>(&self, future: impl Future<Output = T> + 'static) -> Task<T> {
310        match self {
311            Self::Platform { dispatcher, .. } => {
312                let dispatcher = dispatcher.clone();
313                let schedule = move |runnable: Runnable| dispatcher.run_on_main_thread(runnable);
314                let (runnable, task) = async_task::spawn_local(future, schedule);
315                runnable.schedule();
316                task
317            }
318            Self::Test(executor) => executor.spawn(future),
319            Self::Deterministic(executor) => executor.spawn_from_foreground(future),
320        }
321    }
322
323    pub fn run<T: 'static>(&self, future: impl 'static + Future<Output = T>) -> T {
324        match self {
325            Self::Platform { .. } => panic!("you can't call run on a platform foreground executor"),
326            Self::Test(executor) => smol::block_on(executor.run(future)),
327            Self::Deterministic(executor) => executor.run(future),
328        }
329    }
330
331    pub fn forbid_parking(&self) {
332        match self {
333            Self::Platform { .. } => panic!("can't call this method on a platform executor"),
334            Self::Test(_) => panic!("can't call this method on a test executor"),
335            Self::Deterministic(executor) => {
336                let mut state = executor.state.lock();
337                state.forbid_parking = true;
338                state.rng = StdRng::seed_from_u64(state.seed);
339            }
340        }
341    }
342}
343
344impl Background {
345    pub fn new() -> Self {
346        let executor = Arc::new(Executor::new());
347        let stop = channel::unbounded::<()>();
348
349        for i in 0..2 * num_cpus::get() {
350            let executor = executor.clone();
351            let stop = stop.1.clone();
352            thread::Builder::new()
353                .name(format!("background-executor-{}", i))
354                .spawn(move || smol::block_on(executor.run(stop.recv())))
355                .unwrap();
356        }
357
358        Self::Production {
359            executor,
360            _stop: stop.0,
361        }
362    }
363
364    pub fn num_cpus(&self) -> usize {
365        num_cpus::get()
366    }
367
368    pub fn spawn<T, F>(&self, future: F) -> Task<T>
369    where
370        T: 'static + Send,
371        F: Send + Future<Output = T> + 'static,
372    {
373        match self {
374            Self::Production { executor, .. } => executor.spawn(future),
375            Self::Deterministic(executor) => executor.spawn(future),
376        }
377    }
378
379    pub fn block_on<F, T>(&self, timeout: Duration, future: F) -> Option<T>
380    where
381        T: 'static,
382        F: Future<Output = T>,
383    {
384        match self {
385            Self::Production { .. } => {
386                smol::block_on(async move { util::timeout(timeout, future).await.ok() })
387            }
388            Self::Deterministic(executor) => {
389                let max_ticks = executor.state.lock().rng.gen_range(1..=1000);
390                executor.block_on(max_ticks, future)
391            }
392        }
393    }
394
395    pub async fn scoped<'scope, F>(&self, scheduler: F)
396    where
397        F: FnOnce(&mut Scope<'scope>),
398    {
399        let mut scope = Scope {
400            futures: Default::default(),
401            _phantom: PhantomData,
402        };
403        (scheduler)(&mut scope);
404        let spawned = scope
405            .futures
406            .into_iter()
407            .map(|f| self.spawn(f))
408            .collect::<Vec<_>>();
409        for task in spawned {
410            task.await;
411        }
412    }
413}
414
415pub struct Scope<'a> {
416    futures: Vec<Pin<Box<dyn Future<Output = ()> + Send + 'static>>>,
417    _phantom: PhantomData<&'a ()>,
418}
419
420impl<'a> Scope<'a> {
421    pub fn spawn<F>(&mut self, f: F)
422    where
423        F: Future<Output = ()> + Send + 'a,
424    {
425        let f = unsafe {
426            mem::transmute::<
427                Pin<Box<dyn Future<Output = ()> + Send + 'a>>,
428                Pin<Box<dyn Future<Output = ()> + Send + 'static>>,
429            >(Box::pin(f))
430        };
431        self.futures.push(f);
432    }
433}
434
435pub fn deterministic(seed: u64) -> (Rc<Foreground>, Arc<Background>) {
436    let executor = Arc::new(Deterministic::new(seed));
437    (
438        Rc::new(Foreground::Deterministic(executor.clone())),
439        Arc::new(Background::Deterministic(executor)),
440    )
441}