scheduler.rs

  1#![allow(non_upper_case_globals)]
  2#![allow(non_camel_case_types)]
  3#![allow(non_snake_case)]
  4
  5use crate::{BackgroundExecutor, ForegroundExecutor};
  6use async_task::Runnable;
  7use futures::{
  8    channel::oneshot,
  9    future::{self, LocalBoxFuture},
 10};
 11use scheduler::{Clock, Scheduler, SessionId, Timer};
 12use std::{
 13    ffi::c_void,
 14    ptr::{NonNull, addr_of},
 15    sync::{
 16        Arc,
 17        atomic::{AtomicU16, Ordering::SeqCst},
 18    },
 19    time::Duration,
 20};
 21
 22/// All items in the generated file are marked as pub, so we're gonna wrap it in a separate mod to prevent
 23/// these pub items from leaking into public API.
 24pub(crate) mod dispatch_sys {
 25    include!(concat!(env!("OUT_DIR"), "/dispatch_sys.rs"));
 26}
 27
 28use dispatch_sys::*;
 29pub(crate) fn dispatch_get_main_queue() -> dispatch_queue_t {
 30    addr_of!(_dispatch_main_q) as *const _ as dispatch_queue_t
 31}
 32
 33pub(crate) struct MacScheduler {
 34    next_session_id: AtomicU16,
 35}
 36
 37impl MacScheduler {
 38    pub fn new() -> Self {
 39        MacScheduler {
 40            next_session_id: AtomicU16::new(0),
 41        }
 42    }
 43
 44    pub fn background(self: &Arc<Self>) -> BackgroundExecutor {
 45        BackgroundExecutor::new(scheduler::BackgroundExecutor::new(self.clone()))
 46    }
 47
 48    pub fn foreground(self: &Arc<Self>) -> ForegroundExecutor {
 49        let session_id = SessionId::new(self.next_session_id.fetch_add(1, SeqCst));
 50        ForegroundExecutor::new(scheduler::ForegroundExecutor::new(session_id, self.clone()))
 51    }
 52}
 53
 54impl Scheduler for MacScheduler {
 55    fn block(
 56        &self,
 57        _session_id: Option<SessionId>,
 58        future: LocalBoxFuture<()>,
 59        timeout: Option<Duration>,
 60    ) {
 61        if let Some(timeout) = timeout {
 62            let timer = self.timer(timeout);
 63            futures::executor::block_on(future::select(timer, future));
 64        } else {
 65            futures::executor::block_on(future);
 66        }
 67    }
 68
 69    fn schedule_foreground(&self, _session_id: SessionId, runnable: Runnable) {
 70        unsafe {
 71            dispatch_async_f(
 72                dispatch_get_main_queue(),
 73                runnable.into_raw().as_ptr() as *mut c_void,
 74                Some(trampoline),
 75            );
 76        }
 77    }
 78
 79    fn schedule_background(&self, runnable: Runnable) {
 80        unsafe {
 81            dispatch_async_f(
 82                dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_HIGH.try_into().unwrap(), 0),
 83                runnable.into_raw().as_ptr() as *mut c_void,
 84                Some(trampoline),
 85            );
 86        }
 87    }
 88
 89    fn timer(&self, timeout: Duration) -> Timer {
 90        let (tx, rx) = oneshot::channel();
 91        let (runnable, task) = async_task::spawn(
 92            async move {
 93                tx.send(()).ok();
 94            },
 95            move |runnable: Runnable| unsafe {
 96                let queue =
 97                    dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_HIGH.try_into().unwrap(), 0);
 98                let when = dispatch_time(DISPATCH_TIME_NOW as u64, timeout.as_nanos() as i64);
 99                dispatch_after_f(
100                    when,
101                    queue,
102                    runnable.into_raw().as_ptr() as *mut c_void,
103                    Some(trampoline),
104                );
105            },
106        );
107        runnable.schedule();
108        task.detach();
109
110        Timer::new(rx)
111    }
112
113    fn clock(&self) -> Arc<dyn Clock> {
114        todo!()
115    }
116}
117
118extern "C" fn trampoline(runnable: *mut c_void) {
119    let task = unsafe { Runnable::<()>::from_raw(NonNull::new_unchecked(runnable as *mut ())) };
120    task.run();
121}