executor.rs

  1use anyhow::{anyhow, Result};
  2use async_task::Runnable;
  3pub use async_task::Task;
  4use parking_lot::Mutex;
  5use rand::prelude::*;
  6use smol::prelude::*;
  7use smol::{channel, Executor};
  8use std::rc::Rc;
  9use std::sync::mpsc::SyncSender;
 10use std::sync::Arc;
 11use std::{marker::PhantomData, thread};
 12
 13use crate::platform;
 14
 15pub enum Foreground {
 16    Platform {
 17        dispatcher: Arc<dyn platform::Dispatcher>,
 18        _not_send_or_sync: PhantomData<Rc<()>>,
 19    },
 20    Test(smol::LocalExecutor<'static>),
 21    Deterministic(Arc<Deterministic>),
 22}
 23
 24pub enum Background {
 25    Deterministic(Arc<Deterministic>),
 26    Production {
 27        executor: Arc<smol::Executor<'static>>,
 28        _stop: channel::Sender<()>,
 29    },
 30}
 31
 32pub struct Deterministic {
 33    seed: u64,
 34    runnables: Arc<Mutex<(Vec<Runnable>, Option<SyncSender<()>>)>>,
 35}
 36
 37impl Deterministic {
 38    fn new(seed: u64) -> Self {
 39        Self {
 40            seed,
 41            runnables: Default::default(),
 42        }
 43    }
 44
 45    pub fn spawn_local<F, T>(&self, future: F) -> Task<T>
 46    where
 47        T: 'static,
 48        F: Future<Output = T> + 'static,
 49    {
 50        let runnables = self.runnables.clone();
 51        let (runnable, task) = async_task::spawn_local(future, move |runnable| {
 52            let mut runnables = runnables.lock();
 53            runnables.0.push(runnable);
 54            if let Some(wake_tx) = runnables.1.as_ref() {
 55                wake_tx.send(()).ok();
 56            }
 57        });
 58        runnable.schedule();
 59        task
 60    }
 61
 62    pub fn spawn<F, T>(&self, future: F) -> Task<T>
 63    where
 64        T: 'static + Send,
 65        F: 'static + Send + Future<Output = T>,
 66    {
 67        let runnables = self.runnables.clone();
 68        let (runnable, task) = async_task::spawn(future, move |runnable| {
 69            let mut runnables = runnables.lock();
 70            runnables.0.push(runnable);
 71            if let Some(wake_tx) = runnables.1.as_ref() {
 72                wake_tx.send(()).ok();
 73            }
 74        });
 75        runnable.schedule();
 76        task
 77    }
 78
 79    pub fn run<F, T>(&self, future: F) -> T
 80    where
 81        T: 'static,
 82        F: Future<Output = T> + 'static,
 83    {
 84        let (wake_tx, wake_rx) = std::sync::mpsc::sync_channel(32);
 85        let runnables = self.runnables.clone();
 86        runnables.lock().1 = Some(wake_tx);
 87
 88        let (output_tx, output_rx) = std::sync::mpsc::channel();
 89        self.spawn_local(async move {
 90            let output = future.await;
 91            output_tx.send(output).unwrap();
 92        })
 93        .detach();
 94
 95        let mut rng = StdRng::seed_from_u64(self.seed);
 96        loop {
 97            if let Ok(value) = output_rx.try_recv() {
 98                runnables.lock().1 = None;
 99                return value;
100            }
101
102            wake_rx.recv().unwrap();
103            let runnable = {
104                let mut runnables = runnables.lock();
105                let runnables = &mut runnables.0;
106                let index = rng.gen_range(0..runnables.len());
107                runnables.remove(index)
108            };
109
110            runnable.run();
111        }
112    }
113}
114
115impl Foreground {
116    pub fn platform(dispatcher: Arc<dyn platform::Dispatcher>) -> Result<Self> {
117        if dispatcher.is_main_thread() {
118            Ok(Self::Platform {
119                dispatcher,
120                _not_send_or_sync: PhantomData,
121            })
122        } else {
123            Err(anyhow!("must be constructed on main thread"))
124        }
125    }
126
127    pub fn test() -> Self {
128        Self::Test(smol::LocalExecutor::new())
129    }
130
131    pub fn spawn<T: 'static>(&self, future: impl Future<Output = T> + 'static) -> Task<T> {
132        match self {
133            Self::Platform { dispatcher, .. } => {
134                let dispatcher = dispatcher.clone();
135                let schedule = move |runnable: Runnable| dispatcher.run_on_main_thread(runnable);
136                let (runnable, task) = async_task::spawn_local(future, schedule);
137                runnable.schedule();
138                task
139            }
140            Self::Test(executor) => executor.spawn(future),
141            Self::Deterministic(executor) => executor.spawn_local(future),
142        }
143    }
144
145    pub fn run<T: 'static>(&self, future: impl 'static + Future<Output = T>) -> T {
146        match self {
147            Self::Platform { .. } => panic!("you can't call run on a platform foreground executor"),
148            Self::Test(executor) => smol::block_on(executor.run(future)),
149            Self::Deterministic(executor) => executor.run(future),
150        }
151    }
152}
153
154impl Background {
155    pub fn new() -> Self {
156        let executor = Arc::new(Executor::new());
157        let stop = channel::unbounded::<()>();
158
159        for i in 0..num_cpus::get() {
160            let executor = executor.clone();
161            let stop = stop.1.clone();
162            thread::Builder::new()
163                .name(format!("background-executor-{}", i))
164                .spawn(move || smol::block_on(executor.run(stop.recv())))
165                .unwrap();
166        }
167
168        Self::Production {
169            executor,
170            _stop: stop.0,
171        }
172    }
173
174    pub fn spawn<T, F>(&self, future: F) -> Task<T>
175    where
176        T: 'static + Send,
177        F: Send + Future<Output = T> + 'static,
178    {
179        match self {
180            Self::Production { executor, .. } => executor.spawn(future),
181            Self::Deterministic(executor) => executor.spawn(future),
182        }
183    }
184}
185
186pub fn deterministic(seed: u64) -> (Rc<Foreground>, Arc<Background>) {
187    let executor = Arc::new(Deterministic::new(seed));
188    (
189        Rc::new(Foreground::Deterministic(executor.clone())),
190        Arc::new(Background::Deterministic(executor)),
191    )
192}