1use crate::{PlatformDispatcher, RunnableMeta};
2use async_task::Runnable;
3use chrono::{DateTime, Utc};
4use futures::channel::oneshot;
5use scheduler::{Clock, Priority, Scheduler, SessionId, TestScheduler, Timer};
6use std::{
7 future::Future,
8 pin::Pin,
9 sync::{
10 Arc,
11 atomic::{AtomicU16, Ordering},
12 },
13 task::{Context, Poll},
14 time::{Duration, Instant},
15};
16use waker_fn::waker_fn;
17
/// A production implementation of [`Scheduler`] that wraps a [`PlatformDispatcher`].
///
/// This allows GPUI to use the scheduler crate's executor types with the platform's
/// native dispatch mechanisms (e.g., Grand Central Dispatch on macOS).
pub struct PlatformScheduler {
    /// Platform dispatcher that all scheduling calls are forwarded to.
    dispatcher: Arc<dyn PlatformDispatcher>,
    /// Clock returned from `Scheduler::clock`; it is built around the same dispatcher.
    clock: Arc<PlatformClock>,
    /// Counter backing `allocate_session_id`. Starts at 0 and wraps around on `u16` overflow.
    next_session_id: AtomicU16,
}
27
28impl PlatformScheduler {
29 pub fn new(dispatcher: Arc<dyn PlatformDispatcher>) -> Self {
30 Self {
31 dispatcher: dispatcher.clone(),
32 clock: Arc::new(PlatformClock { dispatcher }),
33 next_session_id: AtomicU16::new(0),
34 }
35 }
36
37 pub fn allocate_session_id(&self) -> SessionId {
38 SessionId::new(self.next_session_id.fetch_add(1, Ordering::SeqCst))
39 }
40}
41
42impl Scheduler for PlatformScheduler {
43 fn block(
44 &self,
45 _session_id: Option<SessionId>,
46 mut future: Pin<&mut dyn Future<Output = ()>>,
47 timeout: Option<Duration>,
48 ) -> bool {
49 let deadline = timeout.map(|t| Instant::now() + t);
50 let parker = parking::Parker::new();
51 let unparker = parker.unparker();
52 let waker = waker_fn(move || {
53 unparker.unpark();
54 });
55 let mut cx = Context::from_waker(&waker);
56 if let Poll::Ready(()) = future.as_mut().poll(&mut cx) {
57 return true;
58 }
59
60 let park_deadline = |deadline: Instant| {
61 // Timer expirations are only delivered every ~15.6 milliseconds by default on Windows.
62 // We increase the resolution during this wait so that short timeouts stay reasonably short.
63 let _timer_guard = self.dispatcher.increase_timer_resolution();
64 parker.park_deadline(deadline)
65 };
66
67 loop {
68 match deadline {
69 Some(deadline) if !park_deadline(deadline) && deadline <= Instant::now() => {
70 return false;
71 }
72 Some(_) => (),
73 None => parker.park(),
74 }
75 if let Poll::Ready(()) = future.as_mut().poll(&mut cx) {
76 break true;
77 }
78 }
79 }
80
81 fn schedule_foreground(&self, _session_id: SessionId, runnable: Runnable<RunnableMeta>) {
82 self.dispatcher
83 .dispatch_on_main_thread(runnable, Priority::default());
84 }
85
86 fn schedule_background_with_priority(
87 &self,
88 runnable: Runnable<RunnableMeta>,
89 priority: Priority,
90 ) {
91 self.dispatcher.dispatch(runnable, priority);
92 }
93
94 fn spawn_realtime(&self, f: Box<dyn FnOnce() + Send>) {
95 self.dispatcher.spawn_realtime(f);
96 }
97
98 fn timer(&self, duration: Duration) -> Timer {
99 use std::sync::{Arc, atomic::AtomicBool};
100
101 let (tx, rx) = oneshot::channel();
102 let dispatcher = self.dispatcher.clone();
103
104 // Create a runnable that will send the completion signal
105 let location = std::panic::Location::caller();
106 let closed = Arc::new(AtomicBool::new(false));
107 let (runnable, _task) = async_task::Builder::new()
108 .metadata(RunnableMeta { location, closed })
109 .spawn(
110 move |_| async move {
111 let _ = tx.send(());
112 },
113 move |runnable| {
114 dispatcher.dispatch_after(duration, runnable);
115 },
116 );
117 runnable.schedule();
118
119 Timer::new(rx)
120 }
121
122 fn clock(&self) -> Arc<dyn Clock> {
123 self.clock.clone()
124 }
125
126 fn as_test(&self) -> Option<&TestScheduler> {
127 None
128 }
129}
130
/// A production clock that uses the platform dispatcher's time.
struct PlatformClock {
    /// Dispatcher whose `now()` provides the monotonic time for `Clock::now`.
    dispatcher: Arc<dyn PlatformDispatcher>,
}
135
136impl Clock for PlatformClock {
137 fn utc_now(&self) -> DateTime<Utc> {
138 Utc::now()
139 }
140
141 fn now(&self) -> Instant {
142 self.dispatcher.now()
143 }
144}