executor.rs

  1use crate::{Scheduler, SessionId, Timer};
  2use std::{
  3    future::Future,
  4    marker::PhantomData,
  5    pin::Pin,
  6    rc::Rc,
  7    sync::Arc,
  8    task::{Context, Poll},
  9    time::Duration,
 10};
 11
 12#[derive(Clone)]
 13pub struct ForegroundExecutor {
 14    session_id: SessionId,
 15    scheduler: Arc<dyn Scheduler>,
 16    not_send: PhantomData<Rc<()>>,
 17}
 18
 19impl ForegroundExecutor {
 20    pub fn spawn<F>(&self, future: F) -> Task<F::Output>
 21    where
 22        F: Future + 'static,
 23        F::Output: 'static,
 24    {
 25        let session_id = self.session_id;
 26        let scheduler = Arc::clone(&self.scheduler);
 27        let (runnable, task) = async_task::spawn_local(future, move |runnable| {
 28            scheduler.schedule_foreground(session_id, runnable);
 29        });
 30        runnable.schedule();
 31        Task(TaskState::Spawned(task))
 32    }
 33
 34    pub fn timer(&self, duration: Duration) -> Timer {
 35        self.scheduler.timer(duration)
 36    }
 37}
 38
 39impl ForegroundExecutor {
 40    pub fn new(session_id: SessionId, scheduler: Arc<dyn Scheduler>) -> Self {
 41        assert!(
 42            scheduler.is_main_thread(),
 43            "ForegroundExecutor must be created on the same thread as the Scheduler"
 44        );
 45        Self {
 46            session_id,
 47            scheduler,
 48            not_send: PhantomData,
 49        }
 50    }
 51}
 52
 53impl BackgroundExecutor {
 54    pub fn new(scheduler: Arc<dyn Scheduler>) -> Self {
 55        Self { scheduler }
 56    }
 57}
 58
 59pub struct BackgroundExecutor {
 60    scheduler: Arc<dyn Scheduler>,
 61}
 62
 63impl BackgroundExecutor {
 64    pub fn spawn<F>(&self, future: F) -> Task<F::Output>
 65    where
 66        F: Future + Send + 'static,
 67        F::Output: Send + 'static,
 68    {
 69        let scheduler = Arc::clone(&self.scheduler);
 70        let (runnable, task) = async_task::spawn(future, move |runnable| {
 71            scheduler.schedule_background(runnable);
 72        });
 73        runnable.schedule();
 74        Task(TaskState::Spawned(task))
 75    }
 76
 77    pub fn block_on<Fut: Future>(&self, future: Fut) -> Fut::Output {
 78        self.scheduler.block_on(future)
 79    }
 80
 81    pub fn block_with_timeout<Fut: Unpin + Future>(
 82        &self,
 83        future: &mut Fut,
 84        timeout: Duration,
 85    ) -> Option<Fut::Output> {
 86        self.scheduler.block_with_timeout(future, timeout)
 87    }
 88
 89    pub fn timer(&self, duration: Duration) -> Timer {
 90        self.scheduler.timer(duration)
 91    }
 92}
 93
 94/// Task is a primitive that allows work to happen in the background.
 95///
 96/// It implements [`Future`] so you can `.await` on it.
 97///
 98/// If you drop a task it will be cancelled immediately. Calling [`Task::detach`] allows
 99/// the task to continue running, but with no way to return a value.
100#[must_use]
101#[derive(Debug)]
102pub struct Task<T>(TaskState<T>);
103
104#[derive(Debug)]
105enum TaskState<T> {
106    /// A task that is ready to return a value
107    Ready(Option<T>),
108
109    /// A task that is currently running.
110    Spawned(async_task::Task<T>),
111}
112
113impl<T> Task<T> {
114    /// Creates a new task that will resolve with the value
115    pub fn ready(val: T) -> Self {
116        Task(TaskState::Ready(Some(val)))
117    }
118
119    /// Detaching a task runs it to completion in the background
120    pub fn detach(self) {
121        match self {
122            Task(TaskState::Ready(_)) => {}
123            Task(TaskState::Spawned(task)) => task.detach(),
124        }
125    }
126}
127
128impl<T> Future for Task<T> {
129    type Output = T;
130
131    fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
132        match unsafe { self.get_unchecked_mut() } {
133            Task(TaskState::Ready(val)) => Poll::Ready(val.take().unwrap()),
134            Task(TaskState::Spawned(task)) => Pin::new(task).poll(cx),
135        }
136    }
137}