executor.rs

  1use anyhow::{anyhow, Result};
  2use async_task::Runnable;
  3pub use async_task::Task;
  4use backtrace::{Backtrace, BacktraceFmt, BytesOrWideString};
  5use parking_lot::Mutex;
  6use rand::prelude::*;
  7use smol::{channel, prelude::*, Executor};
  8use std::{
  9    fmt::{self, Debug},
 10    marker::PhantomData,
 11    mem,
 12    pin::Pin,
 13    rc::Rc,
 14    sync::{
 15        atomic::{AtomicBool, Ordering::SeqCst},
 16        mpsc::Sender,
 17        Arc,
 18    },
 19    thread,
 20};
 21
 22use crate::platform;
 23
 24pub enum Foreground {
 25    Platform {
 26        dispatcher: Arc<dyn platform::Dispatcher>,
 27        _not_send_or_sync: PhantomData<Rc<()>>,
 28    },
 29    Test(smol::LocalExecutor<'static>),
 30    Deterministic(Arc<Deterministic>),
 31}
 32
 33pub enum Background {
 34    Deterministic(Arc<Deterministic>),
 35    Production {
 36        executor: Arc<smol::Executor<'static>>,
 37        _stop: channel::Sender<()>,
 38    },
 39}
 40
 41struct DeterministicState {
 42    rng: StdRng,
 43    seed: u64,
 44    scheduled: Vec<(Runnable, Backtrace)>,
 45    spawned_from_foreground: Vec<(Runnable, Backtrace)>,
 46    waker: Option<Sender<()>>,
 47}
 48
 49pub struct Deterministic(Arc<Mutex<DeterministicState>>);
 50
 51impl Deterministic {
 52    fn new(seed: u64) -> Self {
 53        Self(Arc::new(Mutex::new(DeterministicState {
 54            rng: StdRng::seed_from_u64(seed),
 55            seed,
 56            scheduled: Default::default(),
 57            spawned_from_foreground: Default::default(),
 58            waker: None,
 59        })))
 60    }
 61
 62    pub fn spawn_from_foreground<F, T>(&self, future: F) -> Task<T>
 63    where
 64        T: 'static,
 65        F: Future<Output = T> + 'static,
 66    {
 67        let backtrace = Backtrace::new_unresolved();
 68        let scheduled_once = AtomicBool::new(false);
 69        let state = self.0.clone();
 70        let (runnable, task) = async_task::spawn_local(future, move |runnable| {
 71            let mut state = state.lock();
 72            let backtrace = backtrace.clone();
 73            if scheduled_once.fetch_or(true, SeqCst) {
 74                state.scheduled.push((runnable, backtrace));
 75            } else {
 76                state.spawned_from_foreground.push((runnable, backtrace));
 77            }
 78            if let Some(waker) = state.waker.as_ref() {
 79                waker.send(()).ok();
 80            }
 81        });
 82        runnable.schedule();
 83        task
 84    }
 85
 86    pub fn spawn<F, T>(&self, future: F) -> Task<T>
 87    where
 88        T: 'static + Send,
 89        F: 'static + Send + Future<Output = T>,
 90    {
 91        let backtrace = Backtrace::new_unresolved();
 92        let state = self.0.clone();
 93        let (runnable, task) = async_task::spawn(future, move |runnable| {
 94            let mut state = state.lock();
 95            state.scheduled.push((runnable, backtrace.clone()));
 96            if let Some(waker) = state.waker.as_ref() {
 97                waker.send(()).ok();
 98            }
 99        });
100        runnable.schedule();
101        task
102    }
103
104    pub fn run<F, T>(&self, future: F) -> T
105    where
106        T: 'static,
107        F: Future<Output = T> + 'static,
108    {
109        let (wake_tx, wake_rx) = std::sync::mpsc::channel();
110        let state = self.0.clone();
111        state.lock().waker = Some(wake_tx);
112
113        let (output_tx, output_rx) = std::sync::mpsc::channel();
114        self.spawn_from_foreground(async move {
115            let output = future.await;
116            output_tx.send(output).unwrap();
117        })
118        .detach();
119
120        let mut trace = Trace::default();
121        loop {
122            if let Ok(value) = output_rx.try_recv() {
123                state.lock().waker = None;
124                return value;
125            }
126
127            wake_rx.recv().unwrap();
128            let runnable = {
129                let state = &mut *state.lock();
130                let ix = state
131                    .rng
132                    .gen_range(0..state.scheduled.len() + state.spawned_from_foreground.len());
133                if ix < state.scheduled.len() {
134                    let (_, backtrace) = &state.scheduled[ix];
135                    trace.record(&state, backtrace.clone());
136                    state.scheduled.remove(ix).0
137                } else {
138                    let (_, backtrace) = &state.spawned_from_foreground[0];
139                    trace.record(&state, backtrace.clone());
140                    state.spawned_from_foreground.remove(0).0
141                }
142            };
143
144            runnable.run();
145        }
146    }
147}
148
149#[derive(Default)]
150struct Trace {
151    executed: Vec<Backtrace>,
152    scheduled: Vec<Vec<Backtrace>>,
153    spawned_from_foreground: Vec<Vec<Backtrace>>,
154}
155
156impl Trace {
157    fn record(&mut self, state: &DeterministicState, executed: Backtrace) {
158        self.scheduled.push(
159            state
160                .scheduled
161                .iter()
162                .map(|(_, backtrace)| backtrace.clone())
163                .collect(),
164        );
165        self.spawned_from_foreground.push(
166            state
167                .spawned_from_foreground
168                .iter()
169                .map(|(_, backtrace)| backtrace.clone())
170                .collect(),
171        );
172        self.executed.push(executed);
173    }
174
175    fn resolve(&mut self) {
176        for backtrace in &mut self.executed {
177            backtrace.resolve();
178        }
179
180        for backtraces in &mut self.scheduled {
181            for backtrace in backtraces {
182                backtrace.resolve();
183            }
184        }
185
186        for backtraces in &mut self.spawned_from_foreground {
187            for backtrace in backtraces {
188                backtrace.resolve();
189            }
190        }
191    }
192}
193
194impl Debug for Trace {
195    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
196        struct FirstCwdFrameInBacktrace<'a>(&'a Backtrace);
197
198        impl<'a> Debug for FirstCwdFrameInBacktrace<'a> {
199            fn fmt(&self, f: &mut fmt::Formatter<'_>) -> std::fmt::Result {
200                let cwd = std::env::current_dir().unwrap();
201                let mut print_path = |fmt: &mut fmt::Formatter<'_>, path: BytesOrWideString<'_>| {
202                    fmt::Display::fmt(&path, fmt)
203                };
204                let mut fmt = BacktraceFmt::new(f, backtrace::PrintFmt::Full, &mut print_path);
205                for frame in self.0.frames() {
206                    let mut formatted_frame = fmt.frame();
207                    if frame
208                        .symbols()
209                        .iter()
210                        .any(|s| s.filename().map_or(false, |f| f.starts_with(&cwd)))
211                    {
212                        formatted_frame.backtrace_frame(frame)?;
213                        break;
214                    }
215                }
216                fmt.finish()
217            }
218        }
219
220        for ((backtrace, scheduled), spawned_from_foreground) in self
221            .executed
222            .iter()
223            .zip(&self.scheduled)
224            .zip(&self.spawned_from_foreground)
225        {
226            writeln!(f, "Scheduled")?;
227            for backtrace in scheduled {
228                writeln!(f, "- {:?}", FirstCwdFrameInBacktrace(backtrace))?;
229            }
230            if scheduled.is_empty() {
231                writeln!(f, "None")?;
232            }
233            writeln!(f, "==========")?;
234
235            writeln!(f, "Spawned from foreground")?;
236            for backtrace in spawned_from_foreground {
237                writeln!(f, "- {:?}", FirstCwdFrameInBacktrace(backtrace))?;
238            }
239            if spawned_from_foreground.is_empty() {
240                writeln!(f, "None")?;
241            }
242            writeln!(f, "==========")?;
243
244            writeln!(f, "Run: {:?}", FirstCwdFrameInBacktrace(backtrace))?;
245            writeln!(f, "+++++++++++++++++++")?;
246        }
247
248        Ok(())
249    }
250}
251
252impl Drop for Trace {
253    fn drop(&mut self) {
254        let trace_on_panic = if let Ok(trace_on_panic) = std::env::var("EXECUTOR_TRACE_ON_PANIC") {
255            trace_on_panic == "1" || trace_on_panic == "true"
256        } else {
257            false
258        };
259        let trace_always = if let Ok(trace_always) = std::env::var("EXECUTOR_TRACE_ALWAYS") {
260            trace_always == "1" || trace_always == "true"
261        } else {
262            false
263        };
264
265        if trace_always || (trace_on_panic && thread::panicking()) {
266            self.resolve();
267            dbg!(self);
268        }
269    }
270}
271
272impl Foreground {
273    pub fn platform(dispatcher: Arc<dyn platform::Dispatcher>) -> Result<Self> {
274        if dispatcher.is_main_thread() {
275            Ok(Self::Platform {
276                dispatcher,
277                _not_send_or_sync: PhantomData,
278            })
279        } else {
280            Err(anyhow!("must be constructed on main thread"))
281        }
282    }
283
284    pub fn test() -> Self {
285        Self::Test(smol::LocalExecutor::new())
286    }
287
288    pub fn spawn<T: 'static>(&self, future: impl Future<Output = T> + 'static) -> Task<T> {
289        match self {
290            Self::Platform { dispatcher, .. } => {
291                let dispatcher = dispatcher.clone();
292                let schedule = move |runnable: Runnable| dispatcher.run_on_main_thread(runnable);
293                let (runnable, task) = async_task::spawn_local(future, schedule);
294                runnable.schedule();
295                task
296            }
297            Self::Test(executor) => executor.spawn(future),
298            Self::Deterministic(executor) => executor.spawn_from_foreground(future),
299        }
300    }
301
302    pub fn run<T: 'static>(&self, future: impl 'static + Future<Output = T>) -> T {
303        match self {
304            Self::Platform { .. } => panic!("you can't call run on a platform foreground executor"),
305            Self::Test(executor) => smol::block_on(executor.run(future)),
306            Self::Deterministic(executor) => executor.run(future),
307        }
308    }
309
310    pub fn reset(&self) {
311        match self {
312            Self::Platform { .. } => panic!("can't call this method on a platform executor"),
313            Self::Test(_) => panic!("can't call this method on a test executor"),
314            Self::Deterministic(executor) => {
315                let state = &mut *executor.0.lock();
316                state.rng = StdRng::seed_from_u64(state.seed);
317            }
318        }
319    }
320}
321
322impl Background {
323    pub fn new() -> Self {
324        let executor = Arc::new(Executor::new());
325        let stop = channel::unbounded::<()>();
326
327        for i in 0..2 * num_cpus::get() {
328            let executor = executor.clone();
329            let stop = stop.1.clone();
330            thread::Builder::new()
331                .name(format!("background-executor-{}", i))
332                .spawn(move || smol::block_on(executor.run(stop.recv())))
333                .unwrap();
334        }
335
336        Self::Production {
337            executor,
338            _stop: stop.0,
339        }
340    }
341
342    pub fn num_cpus(&self) -> usize {
343        num_cpus::get()
344    }
345
346    pub fn spawn<T, F>(&self, future: F) -> Task<T>
347    where
348        T: 'static + Send,
349        F: Send + Future<Output = T> + 'static,
350    {
351        match self {
352            Self::Production { executor, .. } => executor.spawn(future),
353            Self::Deterministic(executor) => executor.spawn(future),
354        }
355    }
356
357    pub async fn scoped<'scope, F>(&self, scheduler: F)
358    where
359        F: FnOnce(&mut Scope<'scope>),
360    {
361        let mut scope = Scope {
362            futures: Default::default(),
363            _phantom: PhantomData,
364        };
365        (scheduler)(&mut scope);
366        let spawned = scope
367            .futures
368            .into_iter()
369            .map(|f| self.spawn(f))
370            .collect::<Vec<_>>();
371        for task in spawned {
372            task.await;
373        }
374    }
375}
376
/// Collects futures that may borrow data living for `'a`.
pub struct Scope<'a> {
    /// Registered futures, stored with their lifetime bound erased to
    /// `'static`.
    futures: Vec<Pin<Box<dyn Future<Output = ()> + Send + 'static>>>,
    /// Ties the scope to `'a` without storing a reference.
    _phantom: PhantomData<&'a ()>,
}

impl<'a> Scope<'a> {
    /// Boxes and pins `f`, then adds it to the scope's list of futures.
    pub fn spawn<F>(&mut self, f: F)
    where
        F: Future<Output = ()> + Send + 'a,
    {
        let pinned: Pin<Box<dyn Future<Output = ()> + Send + 'a>> = Box::pin(f);
        // SAFETY: only the lifetime bound of the trait object is changed; the
        // source and target types are otherwise identical.
        // NOTE(review): soundness relies on whoever drains `futures` finishing
        // or dropping every future before `'a` ends — confirm this also holds
        // when the driving future is cancelled part-way.
        let erased = unsafe {
            mem::transmute::<
                Pin<Box<dyn Future<Output = ()> + Send + 'a>>,
                Pin<Box<dyn Future<Output = ()> + Send + 'static>>,
            >(pinned)
        };
        self.futures.push(erased);
    }
}
396
397pub fn deterministic(seed: u64) -> (Rc<Foreground>, Arc<Background>) {
398    let executor = Arc::new(Deterministic::new(seed));
399    (
400        Rc::new(Foreground::Deterministic(executor.clone())),
401        Arc::new(Background::Deterministic(executor)),
402    )
403}