1use crate::{AppContext, PlatformDispatcher};
2use futures::channel::mpsc;
3use smol::prelude::*;
4use std::{
5 fmt::Debug,
6 marker::PhantomData,
7 mem,
8 num::NonZeroUsize,
9 pin::Pin,
10 rc::Rc,
11 sync::{
12 atomic::{AtomicUsize, Ordering::SeqCst},
13 Arc,
14 },
15 task::{Context, Poll},
16 time::Duration,
17};
18use util::TryFutureExt;
19use waker_fn::waker_fn;
20
21#[cfg(any(test, feature = "test-support"))]
22use rand::rngs::StdRng;
23
/// A pointer to the executor that is currently running,
/// for spawning background tasks.
///
/// Cloning is cheap: the only state is a shared, reference-counted
/// handle to the platform dispatcher.
#[derive(Clone)]
pub struct BackgroundExecutor {
    // Platform dispatcher that spawned runnables are handed to.
    dispatcher: Arc<dyn PlatformDispatcher>,
}
30
/// A pointer to the executor that is currently running,
/// for spawning tasks on the main thread.
///
/// Unlike [`BackgroundExecutor`], this handle is deliberately neither `Send`
/// nor `Sync` (see the `not_send` marker field).
#[derive(Clone)]
pub struct ForegroundExecutor {
    // Platform dispatcher used to schedule runnables on the main thread.
    dispatcher: Arc<dyn PlatformDispatcher>,
    // Zero-sized marker: `Rc` is `!Send` and `!Sync`, so embedding it in
    // `PhantomData` opts this struct out of both auto traits.
    not_send: PhantomData<Rc<()>>,
}
38
/// Task is a primitive that allows work to happen in the background.
///
/// It implements [`Future`] so you can `.await` on it.
///
/// If you drop a task it will be cancelled immediately. Calling [`Task::detach`] allows
/// the task to continue running, but with no way to return a value.
#[must_use]
#[derive(Debug)]
pub enum Task<T> {
    /// A task that is ready to return a value.
    ///
    /// The value lives in an `Option` so that polling can move it out;
    /// polling such a task a second time therefore panics.
    Ready(Option<T>),

    /// A task that is currently running.
    Spawned(async_task::Task<T>),
}
54
55impl<T> Task<T> {
56 /// Creates a new task that will resolve with the value
57 pub fn ready(val: T) -> Self {
58 Task::Ready(Some(val))
59 }
60
61 /// Detaching a task runs it to completion in the background
62 pub fn detach(self) {
63 match self {
64 Task::Ready(_) => {}
65 Task::Spawned(task) => task.detach(),
66 }
67 }
68}
69
70impl<E, T> Task<Result<T, E>>
71where
72 T: 'static,
73 E: 'static + Debug,
74{
75 /// Run the task to completion in the background and log any
76 /// errors that occur.
77 #[track_caller]
78 pub fn detach_and_log_err(self, cx: &AppContext) {
79 let location = core::panic::Location::caller();
80 cx.foreground_executor()
81 .spawn(self.log_tracked_err(*location))
82 .detach();
83 }
84}
85
86impl<T> Future for Task<T> {
87 type Output = T;
88
89 fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
90 match unsafe { self.get_unchecked_mut() } {
91 Task::Ready(val) => Poll::Ready(val.take().unwrap()),
92 Task::Spawned(task) => task.poll(cx),
93 }
94 }
95}
96
/// A task label is an opaque identifier that you can use to
/// refer to a task in tests.
#[derive(Clone, Copy, PartialEq, Eq, Hash, Debug)]
pub struct TaskLabel(NonZeroUsize);

impl Default for TaskLabel {
    /// Equivalent to [`TaskLabel::new`]: every default label is a fresh one.
    fn default() -> Self {
        TaskLabel::new()
    }
}

impl TaskLabel {
    /// Construct a new task label.
    ///
    /// Labels are drawn from a process-wide counter that starts at 1, so
    /// each call yields a value distinct from all previously created labels.
    pub fn new() -> Self {
        static NEXT_TASK_LABEL: AtomicUsize = AtomicUsize::new(1);
        let raw = NEXT_TASK_LABEL.fetch_add(1, SeqCst);
        let id = NonZeroUsize::try_from(raw).unwrap();
        TaskLabel(id)
    }
}
115
/// A pinned, boxed, type-erased `'static` future that is not required to be
/// `Send`; used for futures spawned on the foreground executor.
type AnyLocalFuture<R> = Pin<Box<dyn 'static + Future<Output = R>>>;
117
/// A pinned, boxed, type-erased `'static` future that is `Send`; used for
/// futures spawned on the background executor.
type AnyFuture<R> = Pin<Box<dyn 'static + Send + Future<Output = R>>>;
119
120/// BackgroundExecutor lets you run things on background threads.
121/// In production this is a thread pool with no ordering guarantees.
122/// In tests this is simulated by running tasks one by one in a deterministic
123/// (but arbitrary) order controlled by the `SEED` environment variable.
124impl BackgroundExecutor {
125 #[doc(hidden)]
126 pub fn new(dispatcher: Arc<dyn PlatformDispatcher>) -> Self {
127 Self { dispatcher }
128 }
129
130 /// Enqueues the given future to be run to completion on a background thread.
131 pub fn spawn<R>(&self, future: impl Future<Output = R> + Send + 'static) -> Task<R>
132 where
133 R: Send + 'static,
134 {
135 self.spawn_internal::<R>(Box::pin(future), None)
136 }
137
138 /// Enqueues the given future to be run to completion on a background thread.
139 /// The given label can be used to control the priority of the task in tests.
140 pub fn spawn_labeled<R>(
141 &self,
142 label: TaskLabel,
143 future: impl Future<Output = R> + Send + 'static,
144 ) -> Task<R>
145 where
146 R: Send + 'static,
147 {
148 self.spawn_internal::<R>(Box::pin(future), Some(label))
149 }
150
151 fn spawn_internal<R: Send + 'static>(
152 &self,
153 future: AnyFuture<R>,
154 label: Option<TaskLabel>,
155 ) -> Task<R> {
156 let dispatcher = self.dispatcher.clone();
157 let (runnable, task) =
158 async_task::spawn(future, move |runnable| dispatcher.dispatch(runnable, label));
159 runnable.schedule();
160 Task::Spawned(task)
161 }
162
163 /// Used by the test harness to run an async test in a synchronous fashion.
164 #[cfg(any(test, feature = "test-support"))]
165 #[track_caller]
166 pub fn block_test<R>(&self, future: impl Future<Output = R>) -> R {
167 if let Ok(value) = self.block_internal(false, future, None) {
168 value
169 } else {
170 unreachable!()
171 }
172 }
173
174 /// Block the current thread until the given future resolves.
175 /// Consider using `block_with_timeout` instead.
176 pub fn block<R>(&self, future: impl Future<Output = R>) -> R {
177 if let Ok(value) = self.block_internal(true, future, None) {
178 value
179 } else {
180 unreachable!()
181 }
182 }
183
184 #[cfg(not(any(test, feature = "test-support")))]
185 pub(crate) fn block_internal<R>(
186 &self,
187 _background_only: bool,
188 future: impl Future<Output = R>,
189 timeout: Option<Duration>,
190 ) -> Result<R, impl Future<Output = R>> {
191 use std::time::Instant;
192
193 let mut future = Box::pin(future);
194 if timeout == Some(Duration::ZERO) {
195 return Err(future);
196 }
197 let deadline = timeout.map(|timeout| Instant::now() + timeout);
198
199 let unparker = self.dispatcher.unparker();
200 let waker = waker_fn(move || {
201 unparker.unpark();
202 });
203 let mut cx = std::task::Context::from_waker(&waker);
204
205 loop {
206 match future.as_mut().poll(&mut cx) {
207 Poll::Ready(result) => return Ok(result),
208 Poll::Pending => {
209 let timeout =
210 deadline.map(|deadline| deadline.saturating_duration_since(Instant::now()));
211 if !self.dispatcher.park(timeout) {
212 if deadline.is_some_and(|deadline| deadline < Instant::now()) {
213 return Err(future);
214 }
215 }
216 }
217 }
218 }
219 }
220
221 #[cfg(any(test, feature = "test-support"))]
222 #[track_caller]
223 pub(crate) fn block_internal<R>(
224 &self,
225 background_only: bool,
226 future: impl Future<Output = R>,
227 timeout: Option<Duration>,
228 ) -> Result<R, impl Future<Output = R>> {
229 use std::sync::atomic::AtomicBool;
230
231 let mut future = Box::pin(future);
232 if timeout == Some(Duration::ZERO) {
233 return Err(future);
234 }
235 let Some(dispatcher) = self.dispatcher.as_test() else {
236 return Err(future);
237 };
238
239 let mut max_ticks = if timeout.is_some() {
240 dispatcher.gen_block_on_ticks()
241 } else {
242 usize::MAX
243 };
244 let unparker = self.dispatcher.unparker();
245 let awoken = Arc::new(AtomicBool::new(false));
246 let waker = waker_fn({
247 let awoken = awoken.clone();
248 move || {
249 awoken.store(true, SeqCst);
250 unparker.unpark();
251 }
252 });
253 let mut cx = std::task::Context::from_waker(&waker);
254
255 loop {
256 match future.as_mut().poll(&mut cx) {
257 Poll::Ready(result) => return Ok(result),
258 Poll::Pending => {
259 if max_ticks == 0 {
260 return Err(future);
261 }
262 max_ticks -= 1;
263
264 if !dispatcher.tick(background_only) {
265 if awoken.swap(false, SeqCst) {
266 continue;
267 }
268
269 if !dispatcher.parking_allowed() {
270 let mut backtrace_message = String::new();
271 let mut waiting_message = String::new();
272 if let Some(backtrace) = dispatcher.waiting_backtrace() {
273 backtrace_message =
274 format!("\nbacktrace of waiting future:\n{:?}", backtrace);
275 }
276 if let Some(waiting_hint) = dispatcher.waiting_hint() {
277 waiting_message = format!("\n waiting on: {}\n", waiting_hint);
278 }
279 panic!(
280 "parked with nothing left to run{waiting_message}{backtrace_message}",
281 )
282 }
283 self.dispatcher.park(None);
284 }
285 }
286 }
287 }
288 }
289
290 /// Block the current thread until the given future resolves
291 /// or `duration` has elapsed.
292 pub fn block_with_timeout<R>(
293 &self,
294 duration: Duration,
295 future: impl Future<Output = R>,
296 ) -> Result<R, impl Future<Output = R>> {
297 self.block_internal(true, future, Some(duration))
298 }
299
300 /// Scoped lets you start a number of tasks and waits
301 /// for all of them to complete before returning.
302 pub async fn scoped<'scope, F>(&self, scheduler: F)
303 where
304 F: FnOnce(&mut Scope<'scope>),
305 {
306 let mut scope = Scope::new(self.clone());
307 (scheduler)(&mut scope);
308 let spawned = mem::take(&mut scope.futures)
309 .into_iter()
310 .map(|f| self.spawn(f))
311 .collect::<Vec<_>>();
312 for task in spawned {
313 task.await;
314 }
315 }
316
317 /// Returns a task that will complete after the given duration.
318 /// Depending on other concurrent tasks the elapsed duration may be longer
319 /// than requested.
320 pub fn timer(&self, duration: Duration) -> Task<()> {
321 let (runnable, task) = async_task::spawn(async move {}, {
322 let dispatcher = self.dispatcher.clone();
323 move |runnable| dispatcher.dispatch_after(duration, runnable)
324 });
325 runnable.schedule();
326 Task::Spawned(task)
327 }
328
329 /// in tests, start_waiting lets you indicate which task is waiting (for debugging only)
330 #[cfg(any(test, feature = "test-support"))]
331 pub fn start_waiting(&self) {
332 self.dispatcher.as_test().unwrap().start_waiting();
333 }
334
335 /// in tests, removes the debugging data added by start_waiting
336 #[cfg(any(test, feature = "test-support"))]
337 pub fn finish_waiting(&self) {
338 self.dispatcher.as_test().unwrap().finish_waiting();
339 }
340
341 /// in tests, run an arbitrary number of tasks (determined by the SEED environment variable)
342 #[cfg(any(test, feature = "test-support"))]
343 pub fn simulate_random_delay(&self) -> impl Future<Output = ()> {
344 self.dispatcher.as_test().unwrap().simulate_random_delay()
345 }
346
347 /// in tests, indicate that a given task from `spawn_labeled` should run after everything else
348 #[cfg(any(test, feature = "test-support"))]
349 pub fn deprioritize(&self, task_label: TaskLabel) {
350 self.dispatcher.as_test().unwrap().deprioritize(task_label)
351 }
352
353 /// in tests, move time forward. This does not run any tasks, but does make `timer`s ready.
354 #[cfg(any(test, feature = "test-support"))]
355 pub fn advance_clock(&self, duration: Duration) {
356 self.dispatcher.as_test().unwrap().advance_clock(duration)
357 }
358
359 /// in tests, run one task.
360 #[cfg(any(test, feature = "test-support"))]
361 pub fn tick(&self) -> bool {
362 self.dispatcher.as_test().unwrap().tick(false)
363 }
364
365 /// in tests, run all tasks that are ready to run. If after doing so
366 /// the test still has outstanding tasks, this will panic. (See also `allow_parking`)
367 #[cfg(any(test, feature = "test-support"))]
368 pub fn run_until_parked(&self) {
369 self.dispatcher.as_test().unwrap().run_until_parked()
370 }
371
372 /// in tests, prevents `run_until_parked` from panicking if there are outstanding tasks.
373 /// This is useful when you are integrating other (non-GPUI) futures, like disk access, that
374 /// do take real async time to run.
375 #[cfg(any(test, feature = "test-support"))]
376 pub fn allow_parking(&self) {
377 self.dispatcher.as_test().unwrap().allow_parking();
378 }
379
380 /// undoes the effect of [`allow_parking`].
381 #[cfg(any(test, feature = "test-support"))]
382 pub fn forbid_parking(&self) {
383 self.dispatcher.as_test().unwrap().forbid_parking();
384 }
385
386 /// adds detail to the "parked with nothing let to run" message.
387 #[cfg(any(test, feature = "test-support"))]
388 pub fn set_waiting_hint(&self, msg: Option<String>) {
389 self.dispatcher.as_test().unwrap().set_waiting_hint(msg);
390 }
391
392 /// in tests, returns the rng used by the dispatcher and seeded by the `SEED` environment variable
393 #[cfg(any(test, feature = "test-support"))]
394 pub fn rng(&self) -> StdRng {
395 self.dispatcher.as_test().unwrap().rng()
396 }
397
398 /// How many CPUs are available to the dispatcher.
399 pub fn num_cpus(&self) -> usize {
400 num_cpus::get()
401 }
402
403 /// Whether we're on the main thread.
404 pub fn is_main_thread(&self) -> bool {
405 self.dispatcher.is_main_thread()
406 }
407
408 #[cfg(any(test, feature = "test-support"))]
409 /// in tests, control the number of ticks that `block_with_timeout` will run before timing out.
410 pub fn set_block_on_ticks(&self, range: std::ops::RangeInclusive<usize>) {
411 self.dispatcher.as_test().unwrap().set_block_on_ticks(range);
412 }
413}
414
415/// ForegroundExecutor runs things on the main thread.
416impl ForegroundExecutor {
417 /// Creates a new ForegroundExecutor from the given PlatformDispatcher.
418 pub fn new(dispatcher: Arc<dyn PlatformDispatcher>) -> Self {
419 Self {
420 dispatcher,
421 not_send: PhantomData,
422 }
423 }
424
425 /// Enqueues the given Task to run on the main thread at some point in the future.
426 pub fn spawn<R>(&self, future: impl Future<Output = R> + 'static) -> Task<R>
427 where
428 R: 'static,
429 {
430 let dispatcher = self.dispatcher.clone();
431 fn inner<R: 'static>(
432 dispatcher: Arc<dyn PlatformDispatcher>,
433 future: AnyLocalFuture<R>,
434 ) -> Task<R> {
435 let (runnable, task) = async_task::spawn_local(future, move |runnable| {
436 dispatcher.dispatch_on_main_thread(runnable)
437 });
438 runnable.schedule();
439 Task::Spawned(task)
440 }
441 inner::<R>(dispatcher, Box::pin(future))
442 }
443}
444
/// Scope manages a set of tasks that are enqueued and waited on together. See [`BackgroundExecutor::scoped`].
pub struct Scope<'a> {
    // Executor used to block in `Drop` until every spawned future is done.
    executor: BackgroundExecutor,
    // Futures queued via `Scope::spawn`; their `'a` lifetime has been erased
    // to `'static` so they can be handed to the executor.
    futures: Vec<Pin<Box<dyn Future<Output = ()> + Send + 'static>>>,
    // Sender side of the completion channel. Each spawned future holds a
    // clone and drops it when it finishes; the scope's own copy is released
    // in `Drop`.
    tx: Option<mpsc::Sender<()>>,
    // Receiver side of the completion channel; `Drop` waits on it until the
    // channel is closed.
    rx: mpsc::Receiver<()>,
    // Ties the scope to the lifetime its futures are allowed to borrow from.
    lifetime: PhantomData<&'a ()>,
}
453
454impl<'a> Scope<'a> {
455 fn new(executor: BackgroundExecutor) -> Self {
456 let (tx, rx) = mpsc::channel(1);
457 Self {
458 executor,
459 tx: Some(tx),
460 rx,
461 futures: Default::default(),
462 lifetime: PhantomData,
463 }
464 }
465
466 /// How many CPUs are available to the dispatcher.
467 pub fn num_cpus(&self) -> usize {
468 self.executor.num_cpus()
469 }
470
471 /// Spawn a future into this scope.
472 pub fn spawn<F>(&mut self, f: F)
473 where
474 F: Future<Output = ()> + Send + 'a,
475 {
476 let tx = self.tx.clone().unwrap();
477
478 // SAFETY: The 'a lifetime is guaranteed to outlive any of these futures because
479 // dropping this `Scope` blocks until all of the futures have resolved.
480 let f = unsafe {
481 mem::transmute::<
482 Pin<Box<dyn Future<Output = ()> + Send + 'a>>,
483 Pin<Box<dyn Future<Output = ()> + Send + 'static>>,
484 >(Box::pin(async move {
485 f.await;
486 drop(tx);
487 }))
488 };
489 self.futures.push(f);
490 }
491}
492
493impl<'a> Drop for Scope<'a> {
494 fn drop(&mut self) {
495 self.tx.take().unwrap();
496
497 // Wait until the channel is closed, which means that all of the spawned
498 // futures have resolved.
499 self.executor.block(self.rx.next());
500 }
501}