1use crate::App;
2use futures::channel::mpsc;
3use scheduler::Timer;
4use smol::prelude::*;
5use std::sync::Arc;
6use std::{
7 fmt::Debug,
8 marker::PhantomData,
9 mem,
10 pin::Pin,
11 task::{Context, Poll},
12 time::{Duration, Instant},
13};
14use util::TryFutureExt;
15
16pub use scheduler::{Scheduler, Yield};
17
18#[cfg(any(test, feature = "test-support"))]
19use rand::rngs::StdRng;
20
21/// A pointer to the executor that is currently running,
22/// for spawning background tasks.
23#[derive(Clone)]
24pub struct BackgroundExecutor(scheduler::BackgroundExecutor);
25
26/// A pointer to the executor that is currently running,
27/// for spawning tasks on the main thread.
28///
29/// This is intentionally `!Send` via the `not_send` marker field. This is because
30/// `ForegroundExecutor::spawn` does not require `Send` but checks at runtime that the future is
31/// only polled from the same thread it was spawned from. These checks would fail when spawning
32/// foreground tasks from from background threads.
33#[derive(Clone)]
34pub struct ForegroundExecutor(scheduler::ForegroundExecutor);
35
36/// Task is a primitive that allows work to happen in the background.
37///
38/// It implements [`Future`] so you can `.await` on it.
39///
40/// If you drop a task it will be cancelled immediately. Calling [`Task::detach`] allows
41/// the task to continue running, but with no way to return a value.
42#[must_use]
43#[derive(Debug)]
44pub struct Task<T>(scheduler::Task<T>);
45
46impl<T> Task<T> {
47 /// Creates a new task that will resolve with the value
48 pub fn ready(val: T) -> Self {
49 Task(scheduler::Task::ready(val))
50 }
51
52 /// Detaching a task runs it to completion in the background
53 pub fn detach(self) {
54 self.0.detach()
55 }
56}
57
58impl<E, T> Task<Result<T, E>>
59where
60 T: 'static,
61 E: 'static + Debug,
62{
63 /// Run the task to completion in the background and log any
64 /// errors that occur.
65 #[track_caller]
66 pub fn detach_and_log_err(self, cx: &App) {
67 let location = core::panic::Location::caller();
68 cx.foreground_executor()
69 .spawn(self.log_tracked_err(*location))
70 .detach();
71 }
72}
73
74impl<T> Future for Task<T> {
75 type Output = T;
76
77 fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
78 unsafe { self.map_unchecked_mut(|t| &mut t.0) }.poll(cx)
79 }
80}
81
82type AnyLocalFuture<R> = Pin<Box<dyn 'static + Future<Output = R>>>;
83
84type AnyFuture<R> = Pin<Box<dyn 'static + Send + Future<Output = R>>>;
85
86/// BackgroundExecutor lets you run things on background threads.
87/// In production this is a thread pool with no ordering guarantees.
88/// In tests this is simulated by running tasks one by one in a deterministic
89/// (but arbitrary) order controlled by the `SEED` environment variable.
90impl BackgroundExecutor {
91 /// Create a new BackgroundExecutor
92 pub fn new(executor: scheduler::BackgroundExecutor) -> Self {
93 Self(executor)
94 }
95
96 /// Enqueues the given future to be run to completion on a background thread.
97 pub fn spawn<R>(&self, future: impl Future<Output = R> + Send + 'static) -> Task<R>
98 where
99 R: Send + 'static,
100 {
101 fn inner<R: Send + 'static>(
102 executor: &scheduler::BackgroundExecutor,
103 future: AnyFuture<R>,
104 ) -> Task<R> {
105 Task(executor.spawn(future))
106 }
107
108 inner(&self.0, Box::pin(future))
109 }
110
111 /// Scoped lets you start a number of tasks and waits
112 /// for all of them to complete before returning.
113 pub async fn scoped<'scope, F>(&self, f: F)
114 where
115 F: FnOnce(&mut Scope<'scope>),
116 {
117 let mut scope = Scope::new(self.clone());
118 (f)(&mut scope);
119 let spawned = mem::take(&mut scope.futures)
120 .into_iter()
121 .map(|f| self.spawn(f))
122 .collect::<Vec<_>>();
123 for task in spawned {
124 task.await;
125 }
126 }
127
128 /// Get the current time.
129 ///
130 /// Calling this instead of `std::time::Instant::now` allows the use
131 /// of fake timers in tests.
132 pub fn now(&self) -> Instant {
133 self.scheduler().clock().now()
134 }
135
136 /// Returns a task that will complete after the given duration.
137 /// Depending on other concurrent tasks the elapsed duration may be longer
138 /// than requested.
139 pub fn timer(&self, duration: Duration) -> Timer {
140 self.0.timer(duration)
141 }
142
143 /// Get the underlying scheduler.
144 pub fn scheduler(&self) -> &Arc<dyn Scheduler> {
145 &self.0.scheduler()
146 }
147
148 /// in tests, run an arbitrary number of tasks (determined by the SEED environment variable)
149 #[cfg(any(test, feature = "test-support"))]
150 pub fn simulate_random_delay(&self) -> Yield {
151 self.scheduler().as_test().yield_random()
152 }
153
154 /// in tests, move time forward. This does not run any tasks, but does make `timer`s ready.
155 #[cfg(any(test, feature = "test-support"))]
156 pub fn advance_clock(&self, duration: Duration) {
157 self.scheduler().as_test().advance_clock(duration);
158 }
159
160 /// in tests, run all tasks that are ready to run. If after doing so
161 /// the test still has outstanding tasks, this will panic. (See also [`Self::allow_parking`])
162 #[cfg(any(test, feature = "test-support"))]
163 pub fn run_until_parked(&self) {
164 self.scheduler().as_test().run();
165 }
166
167 /// in tests, prevents `run_until_parked` from panicking if there are outstanding tasks.
168 /// This is useful when you are integrating other (non-GPUI) futures, like disk access, that
169 /// do take real async time to run.
170 #[cfg(any(test, feature = "test-support"))]
171 pub fn allow_parking(&self) {
172 self.scheduler().as_test().allow_parking();
173 }
174
175 /// undoes the effect of [`Self::allow_parking`].
176 #[cfg(any(test, feature = "test-support"))]
177 pub fn forbid_parking(&self) {
178 self.scheduler().as_test().forbid_parking();
179 }
180
181 /// in tests, returns the rng used by the dispatcher and seeded by the `SEED` environment variable
182 #[cfg(any(test, feature = "test-support"))]
183 pub fn rng(&self) -> StdRng {
184 self.scheduler().as_test().rng().lock().clone()
185 }
186
187 /// How many CPUs are available for this executor.
188 pub fn num_cpus(&self) -> usize {
189 #[cfg(any(test, feature = "test-support"))]
190 return 4;
191
192 #[cfg(not(any(test, feature = "test-support")))]
193 return num_cpus::get();
194 }
195
196 #[cfg(any(test, feature = "test-support"))]
197 /// in tests, control the number of ticks that `block_with_timeout` will run before timing out.
198 pub fn set_block_on_ticks(&self, range: std::ops::RangeInclusive<usize>) {
199 self.scheduler().as_test().set_timeout_ticks(range)
200 }
201}
202
203/// ForegroundExecutor runs things on the main thread.
204impl ForegroundExecutor {
205 /// Create a new ForegroundExecutor
206 pub fn new(executor: scheduler::ForegroundExecutor) -> Self {
207 Self(executor)
208 }
209
210 /// Enqueues the given Task to run on the main thread at some point in the future.
211 #[track_caller]
212 pub fn spawn<R>(&self, future: impl Future<Output = R> + 'static) -> Task<R>
213 where
214 R: 'static,
215 {
216 #[track_caller]
217 fn inner<R: 'static>(
218 executor: &scheduler::ForegroundExecutor,
219 future: AnyLocalFuture<R>,
220 ) -> Task<R> {
221 Task(executor.spawn(future))
222 }
223
224 inner::<R>(&self.0, Box::pin(future))
225 }
226
227 /// Block the current thread until the given future resolves.
228 /// Consider using `block_with_timeout` instead.
229 pub fn block_on<F: Future>(&self, future: F) -> F::Output {
230 self.0.block_on(future)
231 }
232
233 /// Block the current thread until the given future resolves
234 /// or `timeout` has elapsed.
235 pub fn block_with_timeout<Fut: Unpin + Future>(
236 &self,
237 timeout: Duration,
238 future: Fut,
239 ) -> Result<Fut::Output, Fut> {
240 self.0.block_with_timeout(timeout, future)
241 }
242}
243
244/// Scope manages a set of tasks that are enqueued and waited on together. See [`BackgroundExecutor::scoped`].
245pub struct Scope<'a> {
246 executor: BackgroundExecutor,
247 futures: Vec<Pin<Box<dyn Future<Output = ()> + Send + 'static>>>,
248 tx: Option<mpsc::Sender<()>>,
249 rx: mpsc::Receiver<()>,
250 lifetime: PhantomData<&'a ()>,
251}
252
253impl<'a> Scope<'a> {
254 fn new(executor: BackgroundExecutor) -> Self {
255 let (tx, rx) = mpsc::channel(1);
256 Self {
257 executor,
258 tx: Some(tx),
259 rx,
260 futures: Default::default(),
261 lifetime: PhantomData,
262 }
263 }
264
265 /// How many CPUs are available to the dispatcher.
266 pub fn num_cpus(&self) -> usize {
267 self.executor.num_cpus()
268 }
269
270 /// Spawn a future into this scope.
271 pub fn spawn<F>(&mut self, f: F)
272 where
273 F: Future<Output = ()> + Send + 'a,
274 {
275 let tx = self.tx.clone().unwrap();
276
277 // SAFETY: The 'a lifetime is guaranteed to outlive any of these futures because
278 // dropping this `Scope` blocks until all of the futures have resolved.
279 let f = unsafe {
280 mem::transmute::<
281 Pin<Box<dyn Future<Output = ()> + Send + 'a>>,
282 Pin<Box<dyn Future<Output = ()> + Send + 'static>>,
283 >(Box::pin(async move {
284 f.await;
285 drop(tx);
286 }))
287 };
288 self.futures.push(f);
289 }
290}
291
292impl Drop for Scope<'_> {
293 fn drop(&mut self) {
294 self.tx.take().unwrap();
295
296 // Wait until the channel is closed, which means that all of the spawned
297 // futures have resolved.
298 self.executor.scheduler().block(
299 None,
300 async {
301 self.rx.next().await;
302 }
303 .boxed(),
304 None,
305 );
306 }
307}