executor.rs

  1// #[cfg(not(test))]
  2use anyhow::{anyhow, Result};
  3use async_task::Runnable;
  4use smol::prelude::*;
  5use smol::{channel, Executor};
  6use std::rc::Rc;
  7use std::sync::Arc;
  8use std::{marker::PhantomData, thread};
  9
 10use crate::platform;
 11
 12pub enum Foreground {
 13    Platform {
 14        dispatcher: Arc<dyn platform::Dispatcher>,
 15        _not_send_or_sync: PhantomData<Rc<()>>,
 16    },
 17    Test(smol::LocalExecutor<'static>),
 18}
 19
 20pub enum ForegroundTask<T> {
 21    Platform(async_task::Task<T>),
 22    Test(smol::Task<T>),
 23}
 24
 25pub struct Background {
 26    executor: Arc<smol::Executor<'static>>,
 27    _stop: channel::Sender<()>,
 28}
 29
 30pub type BackgroundTask<T> = smol::Task<T>;
 31
 32impl Foreground {
 33    pub fn platform(dispatcher: Arc<dyn platform::Dispatcher>) -> Result<Self> {
 34        if dispatcher.is_main_thread() {
 35            Ok(Self::Platform {
 36                dispatcher,
 37                _not_send_or_sync: PhantomData,
 38            })
 39        } else {
 40            Err(anyhow!("must be constructed on main thread"))
 41        }
 42    }
 43
 44    pub fn test() -> Self {
 45        Self::Test(smol::LocalExecutor::new())
 46    }
 47
 48    pub fn spawn<T: 'static>(
 49        &self,
 50        future: impl Future<Output = T> + 'static,
 51    ) -> ForegroundTask<T> {
 52        match self {
 53            Self::Platform { dispatcher, .. } => {
 54                let dispatcher = dispatcher.clone();
 55                let schedule = move |runnable: Runnable| dispatcher.run_on_main_thread(runnable);
 56                let (runnable, task) = async_task::spawn_local(future, schedule);
 57                runnable.schedule();
 58                ForegroundTask::Platform(task)
 59            }
 60            Self::Test(executor) => ForegroundTask::Test(executor.spawn(future)),
 61        }
 62    }
 63
 64    pub async fn run<T>(&self, future: impl Future<Output = T>) -> T {
 65        match self {
 66            Self::Platform { .. } => panic!("you can't call run on a platform foreground executor"),
 67            Self::Test(executor) => executor.run(future).await,
 68        }
 69    }
 70}
 71
 72impl<T> ForegroundTask<T> {
 73    pub fn detach(self) {
 74        match self {
 75            Self::Platform(task) => task.detach(),
 76            Self::Test(task) => task.detach(),
 77        }
 78    }
 79
 80    pub async fn cancel(self) -> Option<T> {
 81        match self {
 82            Self::Platform(task) => task.cancel().await,
 83            Self::Test(task) => task.cancel().await,
 84        }
 85    }
 86}
 87
 88impl Background {
 89    pub fn new() -> Self {
 90        let executor = Arc::new(Executor::new());
 91        let stop = channel::unbounded::<()>();
 92
 93        for i in 0..num_cpus::get() {
 94            let executor = executor.clone();
 95            let stop = stop.1.clone();
 96            thread::Builder::new()
 97                .name(format!("background-executor-{}", i))
 98                .spawn(move || smol::block_on(executor.run(stop.recv())))
 99                .unwrap();
100        }
101
102        Self {
103            executor,
104            _stop: stop.0,
105        }
106    }
107
108    pub fn spawn<T>(&self, future: impl Send + Future<Output = T> + 'static) -> BackgroundTask<T>
109    where
110        T: 'static + Send,
111    {
112        self.executor.spawn(future)
113    }
114}