platform_scheduler.rs

  1use crate::{PlatformDispatcher, RunnableMeta};
  2use async_task::Runnable;
  3use chrono::{DateTime, Utc};
  4use futures::channel::oneshot;
  5use scheduler::{Clock, Priority, Scheduler, SessionId, TestScheduler, Timer};
  6use std::{
  7    future::Future,
  8    pin::Pin,
  9    sync::{
 10        Arc,
 11        atomic::{AtomicU16, Ordering},
 12    },
 13    task::{Context, Poll},
 14    time::{Duration, Instant},
 15};
 16use waker_fn::waker_fn;
 17
 18/// A production implementation of [`Scheduler`] that wraps a [`PlatformDispatcher`].
 19///
 20/// This allows GPUI to use the scheduler crate's executor types with the platform's
 21/// native dispatch mechanisms (e.g., Grand Central Dispatch on macOS).
 22pub struct PlatformScheduler {
 23    dispatcher: Arc<dyn PlatformDispatcher>,
 24    clock: Arc<PlatformClock>,
 25    next_session_id: AtomicU16,
 26}
 27
 28impl PlatformScheduler {
 29    pub fn new(dispatcher: Arc<dyn PlatformDispatcher>) -> Self {
 30        Self {
 31            dispatcher: dispatcher.clone(),
 32            clock: Arc::new(PlatformClock { dispatcher }),
 33            next_session_id: AtomicU16::new(0),
 34        }
 35    }
 36
 37    pub fn allocate_session_id(&self) -> SessionId {
 38        SessionId::new(self.next_session_id.fetch_add(1, Ordering::SeqCst))
 39    }
 40}
 41
 42impl Scheduler for PlatformScheduler {
 43    fn block(
 44        &self,
 45        _session_id: Option<SessionId>,
 46        mut future: Pin<&mut dyn Future<Output = ()>>,
 47        timeout: Option<Duration>,
 48    ) -> bool {
 49        let deadline = timeout.map(|t| Instant::now() + t);
 50        let parker = parking::Parker::new();
 51        let unparker = parker.unparker();
 52        let waker = waker_fn(move || {
 53            unparker.unpark();
 54        });
 55        let mut cx = Context::from_waker(&waker);
 56        if let Poll::Ready(()) = future.as_mut().poll(&mut cx) {
 57            return true;
 58        }
 59
 60        let park_deadline = |deadline: Instant| {
 61            // Timer expirations are only delivered every ~15.6 milliseconds by default on Windows.
 62            // We increase the resolution during this wait so that short timeouts stay reasonably short.
 63            let _timer_guard = self.dispatcher.increase_timer_resolution();
 64            parker.park_deadline(deadline)
 65        };
 66
 67        loop {
 68            match deadline {
 69                Some(deadline) if !park_deadline(deadline) && deadline <= Instant::now() => {
 70                    return false;
 71                }
 72                Some(_) => (),
 73                None => parker.park(),
 74            }
 75            if let Poll::Ready(()) = future.as_mut().poll(&mut cx) {
 76                break true;
 77            }
 78        }
 79    }
 80
 81    fn schedule_foreground(&self, _session_id: SessionId, runnable: Runnable<RunnableMeta>) {
 82        self.dispatcher
 83            .dispatch_on_main_thread(runnable, Priority::default());
 84    }
 85
 86    fn schedule_background_with_priority(
 87        &self,
 88        runnable: Runnable<RunnableMeta>,
 89        priority: Priority,
 90    ) {
 91        self.dispatcher.dispatch(runnable, priority);
 92    }
 93
 94    fn spawn_realtime(&self, f: Box<dyn FnOnce() + Send>) {
 95        self.dispatcher.spawn_realtime(f);
 96    }
 97
 98    #[track_caller]
 99    fn timer(&self, duration: Duration) -> Timer {
100        use std::sync::{Arc, atomic::AtomicBool};
101
102        let (tx, rx) = oneshot::channel();
103        let dispatcher = self.dispatcher.clone();
104
105        // Create a runnable that will send the completion signal
106        let location = std::panic::Location::caller();
107        let closed = Arc::new(AtomicBool::new(false));
108        let (runnable, _task) = async_task::Builder::new()
109            .metadata(RunnableMeta { location, closed })
110            .spawn(
111                move |_| async move {
112                    let _ = tx.send(());
113                },
114                move |runnable| {
115                    dispatcher.dispatch_after(duration, runnable);
116                },
117            );
118        runnable.schedule();
119
120        Timer::new(rx)
121    }
122
123    fn clock(&self) -> Arc<dyn Clock> {
124        self.clock.clone()
125    }
126
127    fn as_test(&self) -> Option<&TestScheduler> {
128        None
129    }
130}
131
132/// A production clock that uses the platform dispatcher's time.
133struct PlatformClock {
134    dispatcher: Arc<dyn PlatformDispatcher>,
135}
136
137impl Clock for PlatformClock {
138    fn utc_now(&self) -> DateTime<Utc> {
139        Utc::now()
140    }
141
142    fn now(&self) -> Instant {
143        self.dispatcher.now()
144    }
145}