executor.rs

  1use anyhow::{anyhow, Result};
  2use async_task::Runnable;
  3pub use async_task::Task;
  4use backtrace::{Backtrace, BacktraceFmt, BytesOrWideString};
  5use parking_lot::Mutex;
  6use rand::prelude::*;
  7use smol::{channel, prelude::*, Executor};
  8use std::{
  9    fmt::{self, Debug},
 10    marker::PhantomData,
 11    mem,
 12    pin::Pin,
 13    rc::Rc,
 14    sync::{
 15        atomic::{AtomicBool, Ordering::SeqCst},
 16        mpsc::SyncSender,
 17        Arc,
 18    },
 19    thread,
 20};
 21
 22use crate::platform;
 23
 24pub enum Foreground {
 25    Platform {
 26        dispatcher: Arc<dyn platform::Dispatcher>,
 27        _not_send_or_sync: PhantomData<Rc<()>>,
 28    },
 29    Test(smol::LocalExecutor<'static>),
 30    Deterministic(Arc<Deterministic>),
 31}
 32
 33pub enum Background {
 34    Deterministic(Arc<Deterministic>),
 35    Production {
 36        executor: Arc<smol::Executor<'static>>,
 37        threads: usize,
 38        _stop: channel::Sender<()>,
 39    },
 40}
 41
 42struct DeterministicState {
 43    rng: StdRng,
 44    seed: u64,
 45    scheduled: Vec<(Runnable, Backtrace)>,
 46    spawned_from_foreground: Vec<(Runnable, Backtrace)>,
 47    waker: Option<SyncSender<()>>,
 48}
 49
 50pub struct Deterministic(Arc<Mutex<DeterministicState>>);
 51
 52impl Deterministic {
 53    fn new(seed: u64) -> Self {
 54        Self(Arc::new(Mutex::new(DeterministicState {
 55            rng: StdRng::seed_from_u64(seed),
 56            seed,
 57            scheduled: Default::default(),
 58            spawned_from_foreground: Default::default(),
 59            waker: None,
 60        })))
 61    }
 62
 63    pub fn spawn_from_foreground<F, T>(&self, future: F) -> Task<T>
 64    where
 65        T: 'static,
 66        F: Future<Output = T> + 'static,
 67    {
 68        let backtrace = Backtrace::new_unresolved();
 69        let scheduled_once = AtomicBool::new(false);
 70        let state = self.0.clone();
 71        let (runnable, task) = async_task::spawn_local(future, move |runnable| {
 72            let mut state = state.lock();
 73            let backtrace = backtrace.clone();
 74            if scheduled_once.fetch_or(true, SeqCst) {
 75                state.scheduled.push((runnable, backtrace));
 76            } else {
 77                state.spawned_from_foreground.push((runnable, backtrace));
 78            }
 79            if let Some(waker) = state.waker.as_ref() {
 80                waker.send(()).ok();
 81            }
 82        });
 83        runnable.schedule();
 84        task
 85    }
 86
 87    pub fn spawn<F, T>(&self, future: F) -> Task<T>
 88    where
 89        T: 'static + Send,
 90        F: 'static + Send + Future<Output = T>,
 91    {
 92        let backtrace = Backtrace::new_unresolved();
 93        let state = self.0.clone();
 94        let (runnable, task) = async_task::spawn(future, move |runnable| {
 95            let mut state = state.lock();
 96            state.scheduled.push((runnable, backtrace.clone()));
 97            if let Some(waker) = state.waker.as_ref() {
 98                waker.send(()).ok();
 99            }
100        });
101        runnable.schedule();
102        task
103    }
104
105    pub fn run<F, T>(&self, future: F) -> T
106    where
107        T: 'static,
108        F: Future<Output = T> + 'static,
109    {
110        let (wake_tx, wake_rx) = std::sync::mpsc::sync_channel(32);
111        let state = self.0.clone();
112        state.lock().waker = Some(wake_tx);
113
114        let (output_tx, output_rx) = std::sync::mpsc::channel();
115        self.spawn_from_foreground(async move {
116            let output = future.await;
117            output_tx.send(output).unwrap();
118        })
119        .detach();
120
121        let mut trace = Trace::default();
122        loop {
123            if let Ok(value) = output_rx.try_recv() {
124                state.lock().waker = None;
125                return value;
126            }
127
128            wake_rx.recv().unwrap();
129            let runnable = {
130                let state = &mut *state.lock();
131                let ix = state
132                    .rng
133                    .gen_range(0..state.scheduled.len() + state.spawned_from_foreground.len());
134                if ix < state.scheduled.len() {
135                    let (_, backtrace) = &state.scheduled[ix];
136                    trace.record(&state, backtrace.clone());
137                    state.scheduled.remove(ix).0
138                } else {
139                    let (_, backtrace) = &state.spawned_from_foreground[0];
140                    trace.record(&state, backtrace.clone());
141                    state.spawned_from_foreground.remove(0).0
142                }
143            };
144
145            runnable.run();
146        }
147    }
148}
149
150#[derive(Default)]
151struct Trace {
152    executed: Vec<Backtrace>,
153    scheduled: Vec<Vec<Backtrace>>,
154    spawned_from_foreground: Vec<Vec<Backtrace>>,
155}
156
157impl Trace {
158    fn record(&mut self, state: &DeterministicState, executed: Backtrace) {
159        self.scheduled.push(
160            state
161                .scheduled
162                .iter()
163                .map(|(_, backtrace)| backtrace.clone())
164                .collect(),
165        );
166        self.spawned_from_foreground.push(
167            state
168                .spawned_from_foreground
169                .iter()
170                .map(|(_, backtrace)| backtrace.clone())
171                .collect(),
172        );
173        self.executed.push(executed);
174    }
175
176    fn resolve(&mut self) {
177        for backtrace in &mut self.executed {
178            backtrace.resolve();
179        }
180
181        for backtraces in &mut self.scheduled {
182            for backtrace in backtraces {
183                backtrace.resolve();
184            }
185        }
186
187        for backtraces in &mut self.spawned_from_foreground {
188            for backtrace in backtraces {
189                backtrace.resolve();
190            }
191        }
192    }
193}
194
195impl Debug for Trace {
196    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
197        struct FirstCwdFrameInBacktrace<'a>(&'a Backtrace);
198
199        impl<'a> Debug for FirstCwdFrameInBacktrace<'a> {
200            fn fmt(&self, f: &mut fmt::Formatter<'_>) -> std::fmt::Result {
201                let cwd = std::env::current_dir().unwrap();
202                let mut print_path = |fmt: &mut fmt::Formatter<'_>, path: BytesOrWideString<'_>| {
203                    fmt::Display::fmt(&path, fmt)
204                };
205                let mut fmt = BacktraceFmt::new(f, backtrace::PrintFmt::Full, &mut print_path);
206                for frame in self.0.frames() {
207                    let mut formatted_frame = fmt.frame();
208                    if frame
209                        .symbols()
210                        .iter()
211                        .any(|s| s.filename().map_or(false, |f| f.starts_with(&cwd)))
212                    {
213                        formatted_frame.backtrace_frame(frame)?;
214                        break;
215                    }
216                }
217                fmt.finish()
218            }
219        }
220
221        for ((backtrace, scheduled), spawned_from_foreground) in self
222            .executed
223            .iter()
224            .zip(&self.scheduled)
225            .zip(&self.spawned_from_foreground)
226        {
227            writeln!(f, "Scheduled")?;
228            for backtrace in scheduled {
229                writeln!(f, "- {:?}", FirstCwdFrameInBacktrace(backtrace))?;
230            }
231            if scheduled.is_empty() {
232                writeln!(f, "None")?;
233            }
234            writeln!(f, "==========")?;
235
236            writeln!(f, "Spawned from foreground")?;
237            for backtrace in spawned_from_foreground {
238                writeln!(f, "- {:?}", FirstCwdFrameInBacktrace(backtrace))?;
239            }
240            if spawned_from_foreground.is_empty() {
241                writeln!(f, "None")?;
242            }
243            writeln!(f, "==========")?;
244
245            writeln!(f, "Run: {:?}", FirstCwdFrameInBacktrace(backtrace))?;
246            writeln!(f, "+++++++++++++++++++")?;
247        }
248
249        Ok(())
250    }
251}
252
253impl Drop for Trace {
254    fn drop(&mut self) {
255        let trace_on_panic = if let Ok(trace_on_panic) = std::env::var("EXECUTOR_TRACE_ON_PANIC") {
256            trace_on_panic == "1" || trace_on_panic == "true"
257        } else {
258            false
259        };
260        let trace_always = if let Ok(trace_always) = std::env::var("EXECUTOR_TRACE_ALWAYS") {
261            trace_always == "1" || trace_always == "true"
262        } else {
263            false
264        };
265
266        if trace_always || (trace_on_panic && thread::panicking()) {
267            self.resolve();
268            dbg!(self);
269        }
270    }
271}
272
273impl Foreground {
274    pub fn platform(dispatcher: Arc<dyn platform::Dispatcher>) -> Result<Self> {
275        if dispatcher.is_main_thread() {
276            Ok(Self::Platform {
277                dispatcher,
278                _not_send_or_sync: PhantomData,
279            })
280        } else {
281            Err(anyhow!("must be constructed on main thread"))
282        }
283    }
284
285    pub fn test() -> Self {
286        Self::Test(smol::LocalExecutor::new())
287    }
288
289    pub fn spawn<T: 'static>(&self, future: impl Future<Output = T> + 'static) -> Task<T> {
290        match self {
291            Self::Platform { dispatcher, .. } => {
292                let dispatcher = dispatcher.clone();
293                let schedule = move |runnable: Runnable| dispatcher.run_on_main_thread(runnable);
294                let (runnable, task) = async_task::spawn_local(future, schedule);
295                runnable.schedule();
296                task
297            }
298            Self::Test(executor) => executor.spawn(future),
299            Self::Deterministic(executor) => executor.spawn_from_foreground(future),
300        }
301    }
302
303    pub fn run<T: 'static>(&self, future: impl 'static + Future<Output = T>) -> T {
304        match self {
305            Self::Platform { .. } => panic!("you can't call run on a platform foreground executor"),
306            Self::Test(executor) => smol::block_on(executor.run(future)),
307            Self::Deterministic(executor) => executor.run(future),
308        }
309    }
310
311    pub fn reset(&self) {
312        match self {
313            Self::Platform { .. } => panic!("can't call this method on a platform executor"),
314            Self::Test(_) => panic!("can't call this method on a test executor"),
315            Self::Deterministic(executor) => {
316                let state = &mut *executor.0.lock();
317                state.rng = StdRng::seed_from_u64(state.seed);
318            }
319        }
320    }
321}
322
323impl Background {
324    pub fn new() -> Self {
325        let executor = Arc::new(Executor::new());
326        let stop = channel::unbounded::<()>();
327        let threads = num_cpus::get();
328
329        for i in 0..threads {
330            let executor = executor.clone();
331            let stop = stop.1.clone();
332            thread::Builder::new()
333                .name(format!("background-executor-{}", i))
334                .spawn(move || smol::block_on(executor.run(stop.recv())))
335                .unwrap();
336        }
337
338        Self::Production {
339            executor,
340            threads,
341            _stop: stop.0,
342        }
343    }
344
345    pub fn threads(&self) -> usize {
346        match self {
347            Self::Deterministic(_) => 1,
348            Self::Production { threads, .. } => *threads,
349        }
350    }
351
352    pub fn spawn<T, F>(&self, future: F) -> Task<T>
353    where
354        T: 'static + Send,
355        F: Send + Future<Output = T> + 'static,
356    {
357        match self {
358            Self::Production { executor, .. } => executor.spawn(future),
359            Self::Deterministic(executor) => executor.spawn(future),
360        }
361    }
362
363    pub async fn scoped<'scope, F>(&self, scheduler: F)
364    where
365        F: FnOnce(&mut Scope<'scope>),
366    {
367        let mut scope = Scope {
368            futures: Default::default(),
369            _phantom: PhantomData,
370        };
371        (scheduler)(&mut scope);
372        let spawned = scope
373            .futures
374            .into_iter()
375            .map(|f| self.spawn(f))
376            .collect::<Vec<_>>();
377        for task in spawned {
378            task.await;
379        }
380    }
381}
382
383pub struct Scope<'a> {
384    futures: Vec<Pin<Box<dyn Future<Output = ()> + Send + 'static>>>,
385    _phantom: PhantomData<&'a ()>,
386}
387
388impl<'a> Scope<'a> {
389    pub fn spawn<F>(&mut self, f: F)
390    where
391        F: Future<Output = ()> + Send + 'a,
392    {
393        let f = unsafe {
394            mem::transmute::<
395                Pin<Box<dyn Future<Output = ()> + Send + 'a>>,
396                Pin<Box<dyn Future<Output = ()> + Send + 'static>>,
397            >(Box::pin(f))
398        };
399        self.futures.push(f);
400    }
401}
402
403pub fn deterministic(seed: u64) -> (Rc<Foreground>, Arc<Background>) {
404    let executor = Arc::new(Deterministic::new(seed));
405    (
406        Rc::new(Foreground::Deterministic(executor.clone())),
407        Arc::new(Background::Deterministic(executor)),
408    )
409}