executor.rs

  1use anyhow::{anyhow, Result};
  2use async_task::Runnable;
  3use pin_project::pin_project;
  4use smol::prelude::*;
  5use smol::{channel, Executor};
  6use std::rc::Rc;
  7use std::sync::Arc;
  8use std::{marker::PhantomData, thread};
  9
 10use crate::platform;
 11
 12pub enum Foreground {
 13    Platform {
 14        dispatcher: Arc<dyn platform::Dispatcher>,
 15        _not_send_or_sync: PhantomData<Rc<()>>,
 16    },
 17    Test(smol::LocalExecutor<'static>),
 18}
 19
 20#[must_use]
 21#[pin_project(project = ForegroundTaskProject)]
 22pub enum ForegroundTask<T> {
 23    Platform(#[pin] async_task::Task<T>),
 24    Test(#[pin] smol::Task<T>),
 25}
 26
 27impl<T> Future for ForegroundTask<T> {
 28    type Output = T;
 29
 30    fn poll(
 31        self: std::pin::Pin<&mut Self>,
 32        ctx: &mut std::task::Context<'_>,
 33    ) -> std::task::Poll<Self::Output> {
 34        match self.project() {
 35            ForegroundTaskProject::Platform(task) => task.poll(ctx),
 36            ForegroundTaskProject::Test(task) => task.poll(ctx),
 37        }
 38    }
 39}
 40
 41pub struct Background {
 42    executor: Arc<smol::Executor<'static>>,
 43    _stop: channel::Sender<()>,
 44}
 45
 46#[must_use]
 47pub type BackgroundTask<T> = smol::Task<T>;
 48
 49impl Foreground {
 50    pub fn platform(dispatcher: Arc<dyn platform::Dispatcher>) -> Result<Self> {
 51        if dispatcher.is_main_thread() {
 52            Ok(Self::Platform {
 53                dispatcher,
 54                _not_send_or_sync: PhantomData,
 55            })
 56        } else {
 57            Err(anyhow!("must be constructed on main thread"))
 58        }
 59    }
 60
 61    pub fn test() -> Self {
 62        Self::Test(smol::LocalExecutor::new())
 63    }
 64
 65    pub fn spawn<T: 'static>(
 66        &self,
 67        future: impl Future<Output = T> + 'static,
 68    ) -> ForegroundTask<T> {
 69        match self {
 70            Self::Platform { dispatcher, .. } => {
 71                let dispatcher = dispatcher.clone();
 72                let schedule = move |runnable: Runnable| dispatcher.run_on_main_thread(runnable);
 73                let (runnable, task) = async_task::spawn_local(future, schedule);
 74                runnable.schedule();
 75                ForegroundTask::Platform(task)
 76            }
 77            Self::Test(executor) => ForegroundTask::Test(executor.spawn(future)),
 78        }
 79    }
 80
 81    pub async fn run<T>(&self, future: impl Future<Output = T>) -> T {
 82        match self {
 83            Self::Platform { .. } => panic!("you can't call run on a platform foreground executor"),
 84            Self::Test(executor) => executor.run(future).await,
 85        }
 86    }
 87}
 88
 89#[must_use]
 90impl<T> ForegroundTask<T> {
 91    pub fn detach(self) {
 92        match self {
 93            Self::Platform(task) => task.detach(),
 94            Self::Test(task) => task.detach(),
 95        }
 96    }
 97
 98    pub async fn cancel(self) -> Option<T> {
 99        match self {
100            Self::Platform(task) => task.cancel().await,
101            Self::Test(task) => task.cancel().await,
102        }
103    }
104}
105
106impl Background {
107    pub fn new() -> Self {
108        let executor = Arc::new(Executor::new());
109        let stop = channel::unbounded::<()>();
110
111        for i in 0..num_cpus::get() {
112            let executor = executor.clone();
113            let stop = stop.1.clone();
114            thread::Builder::new()
115                .name(format!("background-executor-{}", i))
116                .spawn(move || smol::block_on(executor.run(stop.recv())))
117                .unwrap();
118        }
119
120        Self {
121            executor,
122            _stop: stop.0,
123        }
124    }
125
126    pub fn spawn<T>(&self, future: impl Send + Future<Output = T> + 'static) -> BackgroundTask<T>
127    where
128        T: 'static + Send,
129    {
130        self.executor.spawn(future)
131    }
132}