executor.rs

  1use anyhow::{anyhow, Result};
  2use async_task::Runnable;
  3pub use async_task::Task;
  4use backtrace::{Backtrace, BacktraceFmt, BytesOrWideString};
  5use parking_lot::Mutex;
  6use rand::prelude::*;
  7use smol::{channel, prelude::*, Executor};
  8use std::{
  9    fmt::{self, Debug},
 10    marker::PhantomData,
 11    mem,
 12    ops::RangeInclusive,
 13    pin::Pin,
 14    rc::Rc,
 15    sync::{
 16        atomic::{AtomicBool, Ordering::SeqCst},
 17        Arc,
 18    },
 19    task::{Context, Poll},
 20    thread,
 21    time::Duration,
 22};
 23use waker_fn::waker_fn;
 24
 25use crate::{platform, util};
 26
 27pub enum Foreground {
 28    Platform {
 29        dispatcher: Arc<dyn platform::Dispatcher>,
 30        _not_send_or_sync: PhantomData<Rc<()>>,
 31    },
 32    Test(smol::LocalExecutor<'static>),
 33    Deterministic(Arc<Deterministic>),
 34}
 35
 36pub enum Background {
 37    Deterministic(Arc<Deterministic>),
 38    Production {
 39        executor: Arc<smol::Executor<'static>>,
 40        _stop: channel::Sender<()>,
 41    },
 42}
 43
 44struct DeterministicState {
 45    rng: StdRng,
 46    seed: u64,
 47    scheduled: Vec<(Runnable, Backtrace)>,
 48    spawned_from_foreground: Vec<(Runnable, Backtrace)>,
 49    forbid_parking: bool,
 50    block_on_ticks: RangeInclusive<usize>,
 51}
 52
 53pub struct Deterministic {
 54    state: Arc<Mutex<DeterministicState>>,
 55    parker: Mutex<parking::Parker>,
 56}
 57
 58impl Deterministic {
 59    fn new(seed: u64) -> Self {
 60        Self {
 61            state: Arc::new(Mutex::new(DeterministicState {
 62                rng: StdRng::seed_from_u64(seed),
 63                seed,
 64                scheduled: Default::default(),
 65                spawned_from_foreground: Default::default(),
 66                forbid_parking: false,
 67                block_on_ticks: 0..=1000,
 68            })),
 69            parker: Default::default(),
 70        }
 71    }
 72
 73    pub fn spawn_from_foreground<F, T>(&self, future: F) -> Task<T>
 74    where
 75        T: 'static,
 76        F: Future<Output = T> + 'static,
 77    {
 78        let backtrace = Backtrace::new_unresolved();
 79        let scheduled_once = AtomicBool::new(false);
 80        let state = self.state.clone();
 81        let unparker = self.parker.lock().unparker();
 82        let (runnable, task) = async_task::spawn_local(future, move |runnable| {
 83            let mut state = state.lock();
 84            let backtrace = backtrace.clone();
 85            if scheduled_once.fetch_or(true, SeqCst) {
 86                state.scheduled.push((runnable, backtrace));
 87            } else {
 88                state.spawned_from_foreground.push((runnable, backtrace));
 89            }
 90            unparker.unpark();
 91        });
 92        runnable.schedule();
 93        task
 94    }
 95
 96    pub fn spawn<F, T>(&self, future: F) -> Task<T>
 97    where
 98        T: 'static + Send,
 99        F: 'static + Send + Future<Output = T>,
100    {
101        let backtrace = Backtrace::new_unresolved();
102        let state = self.state.clone();
103        let unparker = self.parker.lock().unparker();
104        let (runnable, task) = async_task::spawn(future, move |runnable| {
105            let mut state = state.lock();
106            state.scheduled.push((runnable, backtrace.clone()));
107            unparker.unpark();
108        });
109        runnable.schedule();
110        task
111    }
112
113    pub fn run<F, T>(&self, future: F) -> T
114    where
115        T: 'static,
116        F: Future<Output = T> + 'static,
117    {
118        self.block_on(usize::MAX, future).unwrap()
119    }
120
121    pub fn block_on<F, T>(&self, max_ticks: usize, future: F) -> Option<T>
122    where
123        T: 'static,
124        F: Future<Output = T>,
125    {
126        smol::pin!(future);
127
128        let unparker = self.parker.lock().unparker();
129        let waker = waker_fn(move || {
130            unparker.unpark();
131        });
132
133        let mut cx = Context::from_waker(&waker);
134        let mut trace = Trace::default();
135        for _ in 0..max_ticks {
136            let mut state = self.state.lock();
137            let runnable_count = state.scheduled.len() + state.spawned_from_foreground.len();
138            let ix = state.rng.gen_range(0..=runnable_count);
139            if ix < state.scheduled.len() {
140                let (_, backtrace) = &state.scheduled[ix];
141                trace.record(&state, backtrace.clone());
142                let runnable = state.scheduled.remove(ix).0;
143                drop(state);
144                runnable.run();
145            } else if ix < runnable_count {
146                let (_, backtrace) = &state.spawned_from_foreground[0];
147                trace.record(&state, backtrace.clone());
148                let runnable = state.spawned_from_foreground.remove(0).0;
149                drop(state);
150                runnable.run();
151            } else {
152                drop(state);
153                if let Poll::Ready(result) = future.as_mut().poll(&mut cx) {
154                    return Some(result);
155                }
156                let state = self.state.lock();
157                if state.scheduled.is_empty() && state.spawned_from_foreground.is_empty() {
158                    if state.forbid_parking {
159                        panic!("deterministic executor parked after a call to forbid_parking");
160                    }
161                    drop(state);
162                    self.parker.lock().park();
163                }
164
165                continue;
166            }
167        }
168
169        None
170    }
171}
172
173#[derive(Default)]
174struct Trace {
175    executed: Vec<Backtrace>,
176    scheduled: Vec<Vec<Backtrace>>,
177    spawned_from_foreground: Vec<Vec<Backtrace>>,
178}
179
180impl Trace {
181    fn record(&mut self, state: &DeterministicState, executed: Backtrace) {
182        self.scheduled.push(
183            state
184                .scheduled
185                .iter()
186                .map(|(_, backtrace)| backtrace.clone())
187                .collect(),
188        );
189        self.spawned_from_foreground.push(
190            state
191                .spawned_from_foreground
192                .iter()
193                .map(|(_, backtrace)| backtrace.clone())
194                .collect(),
195        );
196        self.executed.push(executed);
197    }
198
199    fn resolve(&mut self) {
200        for backtrace in &mut self.executed {
201            backtrace.resolve();
202        }
203
204        for backtraces in &mut self.scheduled {
205            for backtrace in backtraces {
206                backtrace.resolve();
207            }
208        }
209
210        for backtraces in &mut self.spawned_from_foreground {
211            for backtrace in backtraces {
212                backtrace.resolve();
213            }
214        }
215    }
216}
217
218impl Debug for Trace {
219    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
220        struct FirstCwdFrameInBacktrace<'a>(&'a Backtrace);
221
222        impl<'a> Debug for FirstCwdFrameInBacktrace<'a> {
223            fn fmt(&self, f: &mut fmt::Formatter<'_>) -> std::fmt::Result {
224                let cwd = std::env::current_dir().unwrap();
225                let mut print_path = |fmt: &mut fmt::Formatter<'_>, path: BytesOrWideString<'_>| {
226                    fmt::Display::fmt(&path, fmt)
227                };
228                let mut fmt = BacktraceFmt::new(f, backtrace::PrintFmt::Full, &mut print_path);
229                for frame in self.0.frames() {
230                    let mut formatted_frame = fmt.frame();
231                    if frame
232                        .symbols()
233                        .iter()
234                        .any(|s| s.filename().map_or(false, |f| f.starts_with(&cwd)))
235                    {
236                        formatted_frame.backtrace_frame(frame)?;
237                        break;
238                    }
239                }
240                fmt.finish()
241            }
242        }
243
244        for ((backtrace, scheduled), spawned_from_foreground) in self
245            .executed
246            .iter()
247            .zip(&self.scheduled)
248            .zip(&self.spawned_from_foreground)
249        {
250            writeln!(f, "Scheduled")?;
251            for backtrace in scheduled {
252                writeln!(f, "- {:?}", FirstCwdFrameInBacktrace(backtrace))?;
253            }
254            if scheduled.is_empty() {
255                writeln!(f, "None")?;
256            }
257            writeln!(f, "==========")?;
258
259            writeln!(f, "Spawned from foreground")?;
260            for backtrace in spawned_from_foreground {
261                writeln!(f, "- {:?}", FirstCwdFrameInBacktrace(backtrace))?;
262            }
263            if spawned_from_foreground.is_empty() {
264                writeln!(f, "None")?;
265            }
266            writeln!(f, "==========")?;
267
268            writeln!(f, "Run: {:?}", FirstCwdFrameInBacktrace(backtrace))?;
269            writeln!(f, "+++++++++++++++++++")?;
270        }
271
272        Ok(())
273    }
274}
275
276impl Drop for Trace {
277    fn drop(&mut self) {
278        let trace_on_panic = if let Ok(trace_on_panic) = std::env::var("EXECUTOR_TRACE_ON_PANIC") {
279            trace_on_panic == "1" || trace_on_panic == "true"
280        } else {
281            false
282        };
283        let trace_always = if let Ok(trace_always) = std::env::var("EXECUTOR_TRACE_ALWAYS") {
284            trace_always == "1" || trace_always == "true"
285        } else {
286            false
287        };
288
289        if trace_always || (trace_on_panic && thread::panicking()) {
290            self.resolve();
291            dbg!(self);
292        }
293    }
294}
295
296impl Foreground {
297    pub fn platform(dispatcher: Arc<dyn platform::Dispatcher>) -> Result<Self> {
298        if dispatcher.is_main_thread() {
299            Ok(Self::Platform {
300                dispatcher,
301                _not_send_or_sync: PhantomData,
302            })
303        } else {
304            Err(anyhow!("must be constructed on main thread"))
305        }
306    }
307
308    pub fn test() -> Self {
309        Self::Test(smol::LocalExecutor::new())
310    }
311
312    pub fn spawn<T: 'static>(&self, future: impl Future<Output = T> + 'static) -> Task<T> {
313        match self {
314            Self::Platform { dispatcher, .. } => {
315                let dispatcher = dispatcher.clone();
316                let schedule = move |runnable: Runnable| dispatcher.run_on_main_thread(runnable);
317                let (runnable, task) = async_task::spawn_local(future, schedule);
318                runnable.schedule();
319                task
320            }
321            Self::Test(executor) => executor.spawn(future),
322            Self::Deterministic(executor) => executor.spawn_from_foreground(future),
323        }
324    }
325
326    pub fn run<T: 'static>(&self, future: impl 'static + Future<Output = T>) -> T {
327        match self {
328            Self::Platform { .. } => panic!("you can't call run on a platform foreground executor"),
329            Self::Test(executor) => smol::block_on(executor.run(future)),
330            Self::Deterministic(executor) => executor.run(future),
331        }
332    }
333
334    pub fn forbid_parking(&self) {
335        match self {
336            Self::Deterministic(executor) => {
337                let mut state = executor.state.lock();
338                state.forbid_parking = true;
339                state.rng = StdRng::seed_from_u64(state.seed);
340            }
341            _ => panic!("this method can only be called on a deterministic executor"),
342        }
343    }
344
345    pub fn set_block_on_ticks(&self, range: RangeInclusive<usize>) {
346        match self {
347            Self::Deterministic(executor) => executor.state.lock().block_on_ticks = range,
348            _ => panic!("this method can only be called on a deterministic executor"),
349        }
350    }
351}
352
353impl Background {
354    pub fn new() -> Self {
355        let executor = Arc::new(Executor::new());
356        let stop = channel::unbounded::<()>();
357
358        for i in 0..2 * num_cpus::get() {
359            let executor = executor.clone();
360            let stop = stop.1.clone();
361            thread::Builder::new()
362                .name(format!("background-executor-{}", i))
363                .spawn(move || smol::block_on(executor.run(stop.recv())))
364                .unwrap();
365        }
366
367        Self::Production {
368            executor,
369            _stop: stop.0,
370        }
371    }
372
373    pub fn num_cpus(&self) -> usize {
374        num_cpus::get()
375    }
376
377    pub fn spawn<T, F>(&self, future: F) -> Task<T>
378    where
379        T: 'static + Send,
380        F: Send + Future<Output = T> + 'static,
381    {
382        match self {
383            Self::Production { executor, .. } => executor.spawn(future),
384            Self::Deterministic(executor) => executor.spawn(future),
385        }
386    }
387
388    pub fn block_on<F, T>(&self, timeout: Duration, future: F) -> Option<T>
389    where
390        T: 'static,
391        F: Future<Output = T>,
392    {
393        match self {
394            Self::Production { .. } => {
395                smol::block_on(async move { util::timeout(timeout, future).await.ok() })
396            }
397            Self::Deterministic(executor) => {
398                let max_ticks = {
399                    let mut state = executor.state.lock();
400                    let range = state.block_on_ticks.clone();
401                    state.rng.gen_range(range)
402                };
403                executor.block_on(max_ticks, future)
404            }
405        }
406    }
407
408    pub async fn scoped<'scope, F>(&self, scheduler: F)
409    where
410        F: FnOnce(&mut Scope<'scope>),
411    {
412        let mut scope = Scope {
413            futures: Default::default(),
414            _phantom: PhantomData,
415        };
416        (scheduler)(&mut scope);
417        let spawned = scope
418            .futures
419            .into_iter()
420            .map(|f| self.spawn(f))
421            .collect::<Vec<_>>();
422        for task in spawned {
423            task.await;
424        }
425    }
426}
427
428pub struct Scope<'a> {
429    futures: Vec<Pin<Box<dyn Future<Output = ()> + Send + 'static>>>,
430    _phantom: PhantomData<&'a ()>,
431}
432
433impl<'a> Scope<'a> {
434    pub fn spawn<F>(&mut self, f: F)
435    where
436        F: Future<Output = ()> + Send + 'a,
437    {
438        let f = unsafe {
439            mem::transmute::<
440                Pin<Box<dyn Future<Output = ()> + Send + 'a>>,
441                Pin<Box<dyn Future<Output = ()> + Send + 'static>>,
442            >(Box::pin(f))
443        };
444        self.futures.push(f);
445    }
446}
447
448pub fn deterministic(seed: u64) -> (Rc<Foreground>, Arc<Background>) {
449    let executor = Arc::new(Deterministic::new(seed));
450    (
451        Rc::new(Foreground::Deterministic(executor.clone())),
452        Arc::new(Background::Deterministic(executor)),
453    )
454}