executor.rs

  1use crate::{AppContext, PlatformDispatcher};
  2use smol::prelude::*;
  3use std::{
  4    fmt::Debug,
  5    pin::Pin,
  6    sync::Arc,
  7    task::{Context, Poll},
  8};
  9use util::TryFutureExt;
 10
 11#[derive(Clone)]
 12pub struct Executor {
 13    dispatcher: Arc<dyn PlatformDispatcher>,
 14}
 15
 16#[must_use]
 17pub enum Task<T> {
 18    Ready(Option<T>),
 19    Spawned(async_task::Task<T>),
 20}
 21
 22impl<T> Task<T> {
 23    pub fn ready(val: T) -> Self {
 24        Task::Ready(Some(val))
 25    }
 26
 27    pub fn detach(self) {
 28        match self {
 29            Task::Ready(_) => {}
 30            Task::Spawned(task) => task.detach(),
 31        }
 32    }
 33}
 34
 35impl<E, T> Task<Result<T, E>>
 36where
 37    T: 'static + Send,
 38    E: 'static + Send + Debug,
 39{
 40    pub fn detach_and_log_err(self, cx: &mut AppContext) {
 41        cx.executor().spawn(self.log_err()).detach();
 42    }
 43}
 44
 45impl<T> Future for Task<T> {
 46    type Output = T;
 47
 48    fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
 49        match unsafe { self.get_unchecked_mut() } {
 50            Task::Ready(val) => Poll::Ready(val.take().unwrap()),
 51            Task::Spawned(task) => task.poll(cx),
 52        }
 53    }
 54}
 55
 56impl Executor {
 57    pub fn new(dispatcher: Arc<dyn PlatformDispatcher>) -> Self {
 58        Self { dispatcher }
 59    }
 60
 61    /// Enqueues the given closure to be run on any thread. The closure returns
 62    /// a future which will be run to completion on any available thread.
 63    pub fn spawn<R>(&self, future: impl Future<Output = R> + Send + 'static) -> Task<R>
 64    where
 65        R: Send + 'static,
 66    {
 67        let dispatcher = self.dispatcher.clone();
 68        let (runnable, task) =
 69            async_task::spawn(future, move |runnable| dispatcher.dispatch(runnable));
 70        runnable.schedule();
 71        Task::Spawned(task)
 72    }
 73
 74    /// Enqueues the given closure to run on the application's event loop.
 75    /// Returns the result asynchronously.
 76    pub fn run_on_main<F, R>(&self, func: F) -> Task<R>
 77    where
 78        F: FnOnce() -> R + Send + 'static,
 79        R: Send + 'static,
 80    {
 81        if self.dispatcher.is_main_thread() {
 82            Task::ready(func())
 83        } else {
 84            self.spawn_on_main(move || async move { func() })
 85        }
 86    }
 87
 88    /// Enqueues the given closure to be run on the application's event loop. The
 89    /// closure returns a future which will be run to completion on the main thread.
 90    pub fn spawn_on_main<F, R>(&self, func: impl FnOnce() -> F + Send + 'static) -> Task<R>
 91    where
 92        F: Future<Output = R> + 'static,
 93        R: Send + 'static,
 94    {
 95        let (runnable, task) = async_task::spawn(
 96            {
 97                let this = self.clone();
 98                async move {
 99                    let task = this.spawn_on_main_local(func());
100                    task.await
101                }
102            },
103            {
104                let dispatcher = self.dispatcher.clone();
105                move |runnable| dispatcher.dispatch_on_main_thread(runnable)
106            },
107        );
108        runnable.schedule();
109        Task::Spawned(task)
110    }
111
112    /// Enqueues the given closure to be run on the application's event loop. Must
113    /// be called on the main thread.
114    pub fn spawn_on_main_local<R>(&self, future: impl Future<Output = R> + 'static) -> Task<R>
115    where
116        R: 'static,
117    {
118        assert!(
119            self.dispatcher.is_main_thread(),
120            "must be called on main thread"
121        );
122
123        let dispatcher = self.dispatcher.clone();
124        let (runnable, task) = async_task::spawn_local(future, move |runnable| {
125            dispatcher.dispatch_on_main_thread(runnable)
126        });
127        runnable.schedule();
128        Task::Spawned(task)
129    }
130
131    pub fn block<R>(&self, future: impl Future<Output = R>) -> R {
132        // todo!("integrate with deterministic dispatcher")
133        futures::executor::block_on(future)
134    }
135
136    pub fn is_main_thread(&self) -> bool {
137        self.dispatcher.is_main_thread()
138    }
139}