executor.rs

  1use anyhow::{anyhow, Result};
  2use async_task::Runnable;
  3use pin_project::pin_project;
  4use smol::prelude::*;
  5use smol::{channel, Executor};
  6use std::rc::Rc;
  7use std::sync::Arc;
  8use std::{marker::PhantomData, thread};
  9
 10use crate::platform;
 11
 12pub enum Foreground {
 13    Platform {
 14        dispatcher: Arc<dyn platform::Dispatcher>,
 15        _not_send_or_sync: PhantomData<Rc<()>>,
 16    },
 17    Test(smol::LocalExecutor<'static>),
 18}
 19
 20#[pin_project(project = ForegroundTaskProject)]
 21pub enum ForegroundTask<T> {
 22    Platform(#[pin] async_task::Task<T>),
 23    Test(#[pin] smol::Task<T>),
 24}
 25
 26impl<T> Future for ForegroundTask<T> {
 27    type Output = T;
 28
 29    fn poll(
 30        self: std::pin::Pin<&mut Self>,
 31        ctx: &mut std::task::Context<'_>,
 32    ) -> std::task::Poll<Self::Output> {
 33        match self.project() {
 34            ForegroundTaskProject::Platform(task) => task.poll(ctx),
 35            ForegroundTaskProject::Test(task) => task.poll(ctx),
 36        }
 37    }
 38}
 39
 40pub struct Background {
 41    executor: Arc<smol::Executor<'static>>,
 42    _stop: channel::Sender<()>,
 43}
 44
 45#[must_use]
 46pub type BackgroundTask<T> = smol::Task<T>;
 47
 48impl Foreground {
 49    pub fn platform(dispatcher: Arc<dyn platform::Dispatcher>) -> Result<Self> {
 50        if dispatcher.is_main_thread() {
 51            Ok(Self::Platform {
 52                dispatcher,
 53                _not_send_or_sync: PhantomData,
 54            })
 55        } else {
 56            Err(anyhow!("must be constructed on main thread"))
 57        }
 58    }
 59
 60    pub fn test() -> Self {
 61        Self::Test(smol::LocalExecutor::new())
 62    }
 63
 64    pub fn spawn<T: 'static>(
 65        &self,
 66        future: impl Future<Output = T> + 'static,
 67    ) -> ForegroundTask<T> {
 68        match self {
 69            Self::Platform { dispatcher, .. } => {
 70                let dispatcher = dispatcher.clone();
 71                let schedule = move |runnable: Runnable| dispatcher.run_on_main_thread(runnable);
 72                let (runnable, task) = async_task::spawn_local(future, schedule);
 73                runnable.schedule();
 74                ForegroundTask::Platform(task)
 75            }
 76            Self::Test(executor) => ForegroundTask::Test(executor.spawn(future)),
 77        }
 78    }
 79
 80    pub async fn run<T>(&self, future: impl Future<Output = T>) -> T {
 81        match self {
 82            Self::Platform { .. } => panic!("you can't call run on a platform foreground executor"),
 83            Self::Test(executor) => executor.run(future).await,
 84        }
 85    }
 86}
 87
 88#[must_use]
 89impl<T> ForegroundTask<T> {
 90    pub fn detach(self) {
 91        match self {
 92            Self::Platform(task) => task.detach(),
 93            Self::Test(task) => task.detach(),
 94        }
 95    }
 96
 97    pub async fn cancel(self) -> Option<T> {
 98        match self {
 99            Self::Platform(task) => task.cancel().await,
100            Self::Test(task) => task.cancel().await,
101        }
102    }
103}
104
105impl Background {
106    pub fn new() -> Self {
107        let executor = Arc::new(Executor::new());
108        let stop = channel::unbounded::<()>();
109
110        for i in 0..num_cpus::get() {
111            let executor = executor.clone();
112            let stop = stop.1.clone();
113            thread::Builder::new()
114                .name(format!("background-executor-{}", i))
115                .spawn(move || smol::block_on(executor.run(stop.recv())))
116                .unwrap();
117        }
118
119        Self {
120            executor,
121            _stop: stop.0,
122        }
123    }
124
125    pub fn spawn<T>(&self, future: impl Send + Future<Output = T> + 'static) -> BackgroundTask<T>
126    where
127        T: 'static + Send,
128    {
129        self.executor.spawn(future)
130    }
131}