use crate::{App, PlatformDispatcher, RunnableMeta, RunnableVariant, TaskTiming, profiler};
use async_task::Runnable;
use futures::channel::mpsc;
use parking_lot::{Condvar, Mutex};
use smol::prelude::*;
use std::{
    fmt::Debug,
    marker::PhantomData,
    mem::{self, ManuallyDrop},
    num::NonZeroUsize,
    panic::Location,
    pin::Pin,
    rc::Rc,
    sync::{
        Arc,
        atomic::{AtomicUsize, Ordering},
    },
    task::{Context, Poll},
    thread::{self, ThreadId},
    time::{Duration, Instant},
};
use util::TryFutureExt;
use waker_fn::waker_fn;

#[cfg(any(test, feature = "test-support"))]
use rand::rngs::StdRng;

/// A pointer to the executor that is currently running,
/// for spawning background tasks.
#[derive(Clone)]
pub struct BackgroundExecutor {
    #[doc(hidden)]
    pub dispatcher: Arc<dyn PlatformDispatcher>,
}

/// A pointer to the executor that is currently running,
/// for spawning tasks on the main thread.
///
/// This is intentionally `!Send` via the `not_send` marker field. This is because
/// `ForegroundExecutor::spawn` does not require `Send` but checks at runtime that the future is
/// only polled from the same thread it was spawned from. These checks would fail when spawning
/// foreground tasks from background threads.
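///
/// # Example
///
/// A minimal sketch of spawning on the foreground executor (assuming a `cx: &App` in scope;
/// the spawned work is illustrative):
///
/// ```ignore
/// // Runs on the main thread; the future does not need to be `Send`.
/// cx.foreground_executor()
///     .spawn(async move {
///         // ... touch main-thread-only state here ...
///     })
///     .detach();
/// ```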
#[derive(Clone)]
pub struct ForegroundExecutor {
    #[doc(hidden)]
    pub dispatcher: Arc<dyn PlatformDispatcher>,
    not_send: PhantomData<Rc<()>>,
}

/// Realtime task priority
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
#[repr(u8)]
pub enum RealtimePriority {
    /// Audio task
    Audio,
    /// Other realtime task
    #[default]
    Other,
}

/// Task priority
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
#[repr(u8)]
pub enum Priority {
    /// Realtime priority
    ///
    /// Spawning a task with this priority will spin it off on a separate thread dedicated just to that task.
    Realtime(RealtimePriority),
    /// High priority
    ///
    /// Only use for tasks that are critical to the user experience / responsiveness of the editor.
    High,
    /// Medium priority, probably suits most of your use cases.
    #[default]
    Medium,
    /// Low priority
    ///
    /// Use this for background work that can arrive in large quantities,
    /// so that it does not starve the executor of resources needed for higher-priority tasks.
    Low,
}

impl Priority {
    #[allow(dead_code)]
    pub(crate) const fn probability(&self) -> u32 {
        match self {
            // realtime priorities are not considered for probability scheduling
            Priority::Realtime(_) => 0,
            Priority::High => 60,
            Priority::Medium => 30,
            Priority::Low => 10,
        }
    }
}

/// Task is a primitive that allows work to happen in the background.
///
/// It implements [`Future`] so you can `.await` on it.
///
/// If you drop a task it will be cancelled immediately. Calling [`Task::detach`] allows
/// the task to continue running, but with no way to return a value.
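///
/// # Example
///
/// A minimal sketch of the two ways to consume a task (assuming a background `executor` and an
/// async `do_some_logging` function in scope; both names are illustrative):
///
/// ```ignore
/// // Await the task (from another async context) to get its value back:
/// let task = executor.spawn(async { 2 + 2 });
/// assert_eq!(task.await, 4);
///
/// // Or detach it to let it run to completion without returning a value:
/// executor.spawn(async move { do_some_logging().await }).detach();
/// ```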
#[must_use]
#[derive(Debug)]
pub struct Task<T>(TaskState<T>);

#[derive(Debug)]
enum TaskState<T> {
    /// A task that is ready to return a value
    Ready(Option<T>),

    /// A task that is currently running.
    Spawned(async_task::Task<T, RunnableMeta>),
}

impl<T> Task<T> {
    /// Creates a new task that will resolve with the value
    pub fn ready(val: T) -> Self {
        Task(TaskState::Ready(Some(val)))
    }

    /// Detaching a task runs it to completion in the background
    pub fn detach(self) {
        match self {
            Task(TaskState::Ready(_)) => {}
            Task(TaskState::Spawned(task)) => task.detach(),
        }
    }

    /// Converts this task into a fallible task that returns `Option<T>`.
    ///
    /// Unlike the standard `Task<T>`, a [`FallibleTask`] will return `None`
    /// if the app was dropped while the task is executing.
    ///
    /// # Example
    ///
    /// ```ignore
    /// // Background task that gracefully handles app shutdown:
    /// cx.background_spawn(async move {
    ///     let result = foreground_task.fallible().await;
    ///     if let Some(value) = result {
    ///         // Process the value
    ///     }
    ///     // If None, app was shut down - just exit gracefully
    /// }).detach();
    /// ```
    pub fn fallible(self) -> FallibleTask<T> {
        FallibleTask(match self.0 {
            TaskState::Ready(val) => FallibleTaskState::Ready(val),
            TaskState::Spawned(task) => FallibleTaskState::Spawned(task.fallible()),
        })
    }
}

impl<E, T> Task<Result<T, E>>
where
    T: 'static,
    E: 'static + Debug,
{
    /// Run the task to completion in the background and log any
    /// errors that occur.
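    ///
    /// A minimal usage sketch (assuming a `cx: &App` and a hypothetical `save_file` helper
    /// that returns a fallible task; the helper name is illustrative):
    ///
    /// ```ignore
    /// let save_task: Task<anyhow::Result<()>> = save_file(cx);
    /// // Keep running after this scope ends; failures are logged instead of returned.
    /// save_task.detach_and_log_err(cx);
    /// ```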
    #[track_caller]
    pub fn detach_and_log_err(self, cx: &App) {
        let location = core::panic::Location::caller();
        cx.foreground_executor()
            .spawn(self.log_tracked_err(*location))
            .detach();
    }
}

impl<T> Future for Task<T> {
    type Output = T;

    fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
        match unsafe { self.get_unchecked_mut() } {
            Task(TaskState::Ready(val)) => Poll::Ready(val.take().unwrap()),
            Task(TaskState::Spawned(task)) => task.poll(cx),
        }
    }
}

/// A task that returns `Option<T>` instead of panicking when cancelled.
#[must_use]
pub struct FallibleTask<T>(FallibleTaskState<T>);

enum FallibleTaskState<T> {
    /// A task that is ready to return a value
    Ready(Option<T>),

    /// A task that is currently running (wraps async_task::FallibleTask).
    Spawned(async_task::FallibleTask<T, RunnableMeta>),
}

impl<T> FallibleTask<T> {
    /// Creates a new fallible task that will resolve with the value.
    pub fn ready(val: T) -> Self {
        FallibleTask(FallibleTaskState::Ready(Some(val)))
    }

    /// Detaching a task runs it to completion in the background.
    pub fn detach(self) {
        match self.0 {
            FallibleTaskState::Ready(_) => {}
            FallibleTaskState::Spawned(task) => task.detach(),
        }
    }
}

impl<T> Future for FallibleTask<T> {
    type Output = Option<T>;

    fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
        match unsafe { self.get_unchecked_mut() } {
            FallibleTask(FallibleTaskState::Ready(val)) => Poll::Ready(val.take()),
            FallibleTask(FallibleTaskState::Spawned(task)) => Pin::new(task).poll(cx),
        }
    }
}

impl<T> std::fmt::Debug for FallibleTask<T> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match &self.0 {
            FallibleTaskState::Ready(_) => f.debug_tuple("FallibleTask::Ready").finish(),
            FallibleTaskState::Spawned(task) => {
                f.debug_tuple("FallibleTask::Spawned").field(task).finish()
            }
        }
    }
}

/// A task label is an opaque identifier that you can use to
/// refer to a task in tests.
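///
/// A minimal sketch (assuming a background `executor` in scope; the name is illustrative):
///
/// ```ignore
/// let label = TaskLabel::new();
/// // Spawn the work under the label...
/// executor.spawn_labeled(label, async move { /* ... */ }).detach();
/// // ...and, in a test, force it to run after everything else.
/// executor.deprioritize(label);
/// ```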
#[derive(Clone, Copy, PartialEq, Eq, Hash, Debug)]
pub struct TaskLabel(NonZeroUsize);

impl Default for TaskLabel {
    fn default() -> Self {
        Self::new()
    }
}

impl TaskLabel {
    /// Construct a new task label.
    pub fn new() -> Self {
        static NEXT_TASK_LABEL: AtomicUsize = AtomicUsize::new(1);
        Self(
            NEXT_TASK_LABEL
                .fetch_add(1, Ordering::SeqCst)
                .try_into()
                .unwrap(),
        )
    }
}

type AnyLocalFuture<R> = Pin<Box<dyn 'static + Future<Output = R>>>;

type AnyFuture<R> = Pin<Box<dyn 'static + Send + Future<Output = R>>>;

/// BackgroundExecutor lets you run things on background threads.
/// In production this is a thread pool with no ordering guarantees.
/// In tests this is simulated by running tasks one by one in a deterministic
/// (but arbitrary) order controlled by the `SEED` environment variable.
impl BackgroundExecutor {
    #[doc(hidden)]
    pub fn new(dispatcher: Arc<dyn PlatformDispatcher>) -> Self {
        Self { dispatcher }
    }

    /// Enqueues the given future to be run to completion on a background thread.
    #[track_caller]
    pub fn spawn<R>(&self, future: impl Future<Output = R> + Send + 'static) -> Task<R>
    where
        R: Send + 'static,
    {
        self.spawn_with_priority(Priority::default(), future)
    }

    /// Enqueues the given future to be run to completion on a background thread.
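    ///
    /// A minimal sketch of choosing a priority explicitly (assuming a background `executor`
    /// and an `index_some_files` future in scope; both names are illustrative):
    ///
    /// ```ignore
    /// // Large batches of non-urgent work should not starve higher-priority tasks.
    /// executor
    ///     .spawn_with_priority(Priority::Low, async move { index_some_files().await })
    ///     .detach();
    /// ```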
    #[track_caller]
    pub fn spawn_with_priority<R>(
        &self,
        priority: Priority,
        future: impl Future<Output = R> + Send + 'static,
    ) -> Task<R>
    where
        R: Send + 'static,
    {
        self.spawn_internal::<R>(Box::pin(future), None, priority)
    }

    /// Enqueues the given future to be run to completion on a background thread and blocks the current task on it.
    ///
    /// This allows spawning background work that borrows from its scope. Note that the supplied future will run to
    /// completion before the current task is resumed, even if the current task is slated for cancellation.
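    ///
    /// A minimal sketch of borrowing local data from a background future (assuming a background
    /// `executor` in scope; the name is illustrative):
    ///
    /// ```ignore
    /// let items = vec![1, 2, 3];
    /// // The future may borrow `items` because the current task blocks until it completes.
    /// let sum = executor
    ///     .await_on_background(async { items.iter().sum::<i32>() })
    ///     .await;
    /// ```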
    pub async fn await_on_background<R>(&self, future: impl Future<Output = R> + Send) -> R
    where
        R: Send,
    {
        // We need to ensure that cancellation of the parent task does not drop the environment
        // before our own task has completed or been cancelled.
        struct NotifyOnDrop<'a>(&'a (Condvar, Mutex<bool>));

        impl Drop for NotifyOnDrop<'_> {
            fn drop(&mut self) {
                *self.0.1.lock() = true;
                self.0.0.notify_all();
            }
        }

        struct WaitOnDrop<'a>(&'a (Condvar, Mutex<bool>));

        impl Drop for WaitOnDrop<'_> {
            fn drop(&mut self) {
                let mut done = self.0.1.lock();
                if !*done {
                    self.0.0.wait(&mut done);
                }
            }
        }

        let dispatcher = self.dispatcher.clone();
        let location = core::panic::Location::caller();

        let pair = &(Condvar::new(), Mutex::new(false));
        let _wait_guard = WaitOnDrop(pair);

        let (runnable, task) = unsafe {
            async_task::Builder::new()
                .metadata(RunnableMeta {
                    location,
                    app: None,
                })
                .spawn_unchecked(
                    move |_| async {
                        let _notify_guard = NotifyOnDrop(pair);
                        future.await
                    },
                    move |runnable| {
                        dispatcher.dispatch(
                            RunnableVariant::Meta(runnable),
                            None,
                            Priority::default(),
                        )
                    },
                )
        };
        runnable.schedule();
        task.await
    }

    /// Enqueues the given future to be run to completion on a background thread.
    /// The given label can be used to control the priority of the task in tests.
    #[track_caller]
    pub fn spawn_labeled<R>(
        &self,
        label: TaskLabel,
        future: impl Future<Output = R> + Send + 'static,
    ) -> Task<R>
    where
        R: Send + 'static,
    {
        self.spawn_internal::<R>(Box::pin(future), Some(label), Priority::default())
    }

    #[track_caller]
    fn spawn_internal<R: Send + 'static>(
        &self,
        future: AnyFuture<R>,
        label: Option<TaskLabel>,
        priority: Priority,
    ) -> Task<R> {
        let dispatcher = self.dispatcher.clone();
        let (runnable, task) = if let Priority::Realtime(realtime) = priority {
            let location = core::panic::Location::caller();
            let (mut tx, rx) = flume::bounded::<Runnable<RunnableMeta>>(1);

            dispatcher.spawn_realtime(
                realtime,
                Box::new(move || {
                    while let Ok(runnable) = rx.recv() {
                        let start = Instant::now();
                        let location = runnable.metadata().location;
                        let mut timing = TaskTiming {
                            location,
                            start,
                            end: None,
                        };
                        profiler::add_task_timing(timing);

                        runnable.run();

                        let end = Instant::now();
                        timing.end = Some(end);
                        profiler::add_task_timing(timing);
                    }
                }),
            );

            async_task::Builder::new()
                .metadata(RunnableMeta {
                    location,
                    app: None,
                })
                .spawn(
                    move |_| future,
                    move |runnable| {
                        let _ = tx.send(runnable);
                    },
                )
        } else {
            let location = core::panic::Location::caller();
            async_task::Builder::new()
                .metadata(RunnableMeta {
                    location,
                    app: None,
                })
                .spawn(
                    move |_| future,
                    move |runnable| {
                        dispatcher.dispatch(RunnableVariant::Meta(runnable), label, priority)
                    },
                )
        };

        runnable.schedule();
        Task(TaskState::Spawned(task))
    }

    /// Used by the test harness to run an async test in a synchronous fashion.
    #[cfg(any(test, feature = "test-support"))]
    #[track_caller]
    pub fn block_test<R>(&self, future: impl Future<Output = R>) -> R {
        if let Ok(value) = self.block_internal(false, future, None) {
            value
        } else {
            unreachable!()
        }
    }

    /// Block the current thread until the given future resolves.
    /// Consider using `block_with_timeout` instead.
    pub fn block<R>(&self, future: impl Future<Output = R>) -> R {
        if let Ok(value) = self.block_internal(true, future, None) {
            value
        } else {
            unreachable!()
        }
    }

    #[cfg(not(any(test, feature = "test-support")))]
    pub(crate) fn block_internal<Fut: Future>(
        &self,
        _background_only: bool,
        future: Fut,
        timeout: Option<Duration>,
    ) -> Result<Fut::Output, impl Future<Output = Fut::Output> + use<Fut>> {
        use std::time::Instant;

        let mut future = Box::pin(future);
        if timeout == Some(Duration::ZERO) {
            return Err(future);
        }
        let deadline = timeout.map(|timeout| Instant::now() + timeout);

        let parker = parking::Parker::new();
        let unparker = parker.unparker();
        let waker = waker_fn(move || {
            unparker.unpark();
        });
        let mut cx = std::task::Context::from_waker(&waker);

        loop {
            match future.as_mut().poll(&mut cx) {
                Poll::Ready(result) => return Ok(result),
                Poll::Pending => {
                    let timeout =
                        deadline.map(|deadline| deadline.saturating_duration_since(Instant::now()));
                    if let Some(timeout) = timeout {
                        if !parker.park_timeout(timeout)
                            && deadline.is_some_and(|deadline| deadline < Instant::now())
                        {
                            return Err(future);
                        }
                    } else {
                        parker.park();
                    }
                }
            }
        }
    }

    #[cfg(any(test, feature = "test-support"))]
    #[track_caller]
    pub(crate) fn block_internal<Fut: Future>(
        &self,
        background_only: bool,
        future: Fut,
        timeout: Option<Duration>,
    ) -> Result<Fut::Output, impl Future<Output = Fut::Output> + use<Fut>> {
        use std::sync::atomic::AtomicBool;
        use std::time::Instant;

        use parking::Parker;

        let mut future = Box::pin(future);
        if timeout == Some(Duration::ZERO) {
            return Err(future);
        }

        // When using a real platform (e.g., MacPlatform for visual tests that need actual
        // Metal rendering), there's no test dispatcher. In this case, we block the thread
        // directly by polling the future and parking until woken. This is required for
        // VisualTestAppContext which uses real platform rendering but still needs blocking
        // behavior for code paths like editor initialization that call block_with_timeout.
        let Some(dispatcher) = self.dispatcher.as_test() else {
            let deadline = timeout.map(|timeout| Instant::now() + timeout);

            let parker = Parker::new();
            let unparker = parker.unparker();
            let waker = waker_fn(move || {
                unparker.unpark();
            });
            let mut cx = std::task::Context::from_waker(&waker);

            loop {
                match future.as_mut().poll(&mut cx) {
                    Poll::Ready(result) => return Ok(result),
                    Poll::Pending => {
                        let timeout = deadline
                            .map(|deadline| deadline.saturating_duration_since(Instant::now()));
                        if let Some(timeout) = timeout {
                            if !parker.park_timeout(timeout)
                                && deadline.is_some_and(|deadline| deadline < Instant::now())
                            {
                                return Err(future);
                            }
                        } else {
                            parker.park();
                        }
                    }
                }
            }
        };

        let mut max_ticks = if timeout.is_some() {
            dispatcher.gen_block_on_ticks()
        } else {
            usize::MAX
        };

        let parker = Parker::new();
        let unparker = parker.unparker();

        let awoken = Arc::new(AtomicBool::new(false));
        let waker = waker_fn({
            let awoken = awoken.clone();
            let unparker = unparker.clone();
            move || {
                awoken.store(true, Ordering::SeqCst);
                unparker.unpark();
            }
        });
        let mut cx = std::task::Context::from_waker(&waker);

        let duration = Duration::from_secs(
            option_env!("GPUI_TEST_TIMEOUT")
                .and_then(|s| s.parse::<u64>().ok())
                .unwrap_or(180),
        );
        let mut test_should_end_by = Instant::now() + duration;

        loop {
            match future.as_mut().poll(&mut cx) {
                Poll::Ready(result) => return Ok(result),
                Poll::Pending => {
                    if max_ticks == 0 {
                        return Err(future);
                    }
                    max_ticks -= 1;

                    if !dispatcher.tick(background_only) {
                        if awoken.swap(false, Ordering::SeqCst) {
                            continue;
                        }

                        if !dispatcher.parking_allowed() {
                            if dispatcher.advance_clock_to_next_delayed() {
                                continue;
                            }
                            let mut backtrace_message = String::new();
                            let mut waiting_message = String::new();
                            if let Some(backtrace) = dispatcher.waiting_backtrace() {
                                backtrace_message =
                                    format!("\nbacktrace of waiting future:\n{:?}", backtrace);
                            }
                            if let Some(waiting_hint) = dispatcher.waiting_hint() {
                                waiting_message = format!("\n waiting on: {}\n", waiting_hint);
                            }
                            panic!(
                                "parked with nothing left to run{waiting_message}{backtrace_message}",
                            )
                        }
                        dispatcher.push_unparker(unparker.clone());
                        parker.park_timeout(Duration::from_millis(1));
                        if Instant::now() > test_should_end_by {
                            panic!("test timed out after {duration:?} with allow_parking")
                        }
                    }
                }
            }
        }
    }

    /// Block the current thread until the given future resolves
    /// or `duration` has elapsed.
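    ///
    /// A minimal sketch of handling the timeout case (assuming a background `executor` and a
    /// `fetch_something()` future in scope; both names are illustrative):
    ///
    /// ```ignore
    /// match executor.block_with_timeout(Duration::from_millis(100), fetch_something()) {
    ///     Ok(value) => println!("finished in time: {value:?}"),
    ///     // On timeout, the partially polled future is handed back so it can be resumed later.
    ///     Err(unfinished) => {
    ///         let value = executor.block(unfinished);
    ///         println!("finished late: {value:?}");
    ///     }
    /// }
    /// ```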
    pub fn block_with_timeout<Fut: Future>(
        &self,
        duration: Duration,
        future: Fut,
    ) -> Result<Fut::Output, impl Future<Output = Fut::Output> + use<Fut>> {
        self.block_internal(true, future, Some(duration))
    }

    /// Scoped lets you start a number of tasks and waits
    /// for all of them to complete before returning.
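    ///
    /// A minimal sketch of fanning work out over borrowed data (assuming a background
    /// `executor`, a byte buffer `data`, and a `process` function in scope; these names
    /// are illustrative):
    ///
    /// ```ignore
    /// let chunks: Vec<&[u8]> = data.chunks(1024).collect();
    /// executor
    ///     .scoped(|scope| {
    ///         for chunk in &chunks {
    ///             // Futures may borrow from the enclosing scope; `scoped` waits for all of them.
    ///             scope.spawn(async move { process(chunk) });
    ///         }
    ///     })
    ///     .await;
    /// ```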
    pub async fn scoped<'scope, F>(&self, scheduler: F)
    where
        F: FnOnce(&mut Scope<'scope>),
    {
        let mut scope = Scope::new(self.clone(), Priority::default());
        (scheduler)(&mut scope);
        let spawned = mem::take(&mut scope.futures)
            .into_iter()
            .map(|f| self.spawn_with_priority(scope.priority, f))
            .collect::<Vec<_>>();
        for task in spawned {
            task.await;
        }
    }

    /// Like [`Self::scoped`], but starts the tasks with the given priority and waits
    /// for all of them to complete before returning.
    pub async fn scoped_priority<'scope, F>(&self, priority: Priority, scheduler: F)
    where
        F: FnOnce(&mut Scope<'scope>),
    {
        let mut scope = Scope::new(self.clone(), priority);
        (scheduler)(&mut scope);
        let spawned = mem::take(&mut scope.futures)
            .into_iter()
            .map(|f| self.spawn_with_priority(scope.priority, f))
            .collect::<Vec<_>>();
        for task in spawned {
            task.await;
        }
    }

    /// Get the current time.
    ///
    /// Calling this instead of `std::time::Instant::now` allows the use
    /// of fake timers in tests.
    pub fn now(&self) -> Instant {
        self.dispatcher.now()
    }

    /// Returns a task that will complete after the given duration.
    /// Depending on other concurrent tasks the elapsed duration may be longer
    /// than requested.
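    ///
    /// A minimal sketch of a delay inside a spawned future (assuming a background `executor`
    /// in scope; the name is illustrative):
    ///
    /// ```ignore
    /// let timer = executor.timer(Duration::from_millis(250));
    /// executor.spawn(async move {
    ///     timer.await;
    ///     // Runs roughly 250ms later; in tests, `advance_clock` makes the timer ready.
    /// }).detach();
    /// ```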
    #[track_caller]
    pub fn timer(&self, duration: Duration) -> Task<()> {
        if duration.is_zero() {
            return Task::ready(());
        }
        let location = core::panic::Location::caller();
        let (runnable, task) = async_task::Builder::new()
            .metadata(RunnableMeta {
                location,
                app: None,
            })
            .spawn(move |_| async move {}, {
                let dispatcher = self.dispatcher.clone();
                move |runnable| dispatcher.dispatch_after(duration, RunnableVariant::Meta(runnable))
            });
        runnable.schedule();
        Task(TaskState::Spawned(task))
    }

    /// in tests, start_waiting lets you indicate which task is waiting (for debugging only)
    #[cfg(any(test, feature = "test-support"))]
    pub fn start_waiting(&self) {
        self.dispatcher.as_test().unwrap().start_waiting();
    }

    /// in tests, removes the debugging data added by start_waiting
    #[cfg(any(test, feature = "test-support"))]
    pub fn finish_waiting(&self) {
        self.dispatcher.as_test().unwrap().finish_waiting();
    }

    /// in tests, run an arbitrary number of tasks (determined by the SEED environment variable)
    #[cfg(any(test, feature = "test-support"))]
    pub fn simulate_random_delay(&self) -> impl Future<Output = ()> + use<> {
        self.dispatcher.as_test().unwrap().simulate_random_delay()
    }

    /// in tests, indicate that a given task from `spawn_labeled` should run after everything else
    #[cfg(any(test, feature = "test-support"))]
    pub fn deprioritize(&self, task_label: TaskLabel) {
        self.dispatcher.as_test().unwrap().deprioritize(task_label)
    }

    /// in tests, move time forward. This does not run any tasks, but does make `timer`s ready.
    #[cfg(any(test, feature = "test-support"))]
    pub fn advance_clock(&self, duration: Duration) {
        self.dispatcher.as_test().unwrap().advance_clock(duration)
    }

    /// in tests, run one task.
    #[cfg(any(test, feature = "test-support"))]
    pub fn tick(&self) -> bool {
        self.dispatcher.as_test().unwrap().tick(false)
    }

    /// in tests, run all tasks that are ready to run. If after doing so
    /// the test still has outstanding tasks, this will panic. (See also [`Self::allow_parking`])
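    ///
    /// A minimal sketch of the usual test pattern (assuming a test `executor`, a `compute`
    /// future, and an `expected` value in scope; these names are illustrative):
    ///
    /// ```ignore
    /// let task = executor.spawn(async move { compute().await });
    /// // Drain every task that is ready to run before asserting on the result.
    /// executor.run_until_parked();
    /// assert_eq!(executor.block(task), expected);
    /// ```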
    #[cfg(any(test, feature = "test-support"))]
    pub fn run_until_parked(&self) {
        self.dispatcher.as_test().unwrap().run_until_parked()
    }

    /// in tests, prevents `run_until_parked` from panicking if there are outstanding tasks.
    /// This is useful when you are integrating other (non-GPUI) futures, like disk access, that
    /// do take real async time to run.
    #[cfg(any(test, feature = "test-support"))]
    pub fn allow_parking(&self) {
        self.dispatcher.as_test().unwrap().allow_parking();
    }

    /// undoes the effect of [`Self::allow_parking`].
    #[cfg(any(test, feature = "test-support"))]
    pub fn forbid_parking(&self) {
        self.dispatcher.as_test().unwrap().forbid_parking();
    }

    /// adds detail to the "parked with nothing left to run" message.
    #[cfg(any(test, feature = "test-support"))]
    pub fn set_waiting_hint(&self, msg: Option<String>) {
        self.dispatcher.as_test().unwrap().set_waiting_hint(msg);
    }

    /// in tests, returns the rng used by the dispatcher and seeded by the `SEED` environment variable
    #[cfg(any(test, feature = "test-support"))]
    pub fn rng(&self) -> StdRng {
        self.dispatcher.as_test().unwrap().rng()
    }

    /// How many CPUs are available to the dispatcher.
    pub fn num_cpus(&self) -> usize {
        #[cfg(any(test, feature = "test-support"))]
        return 4;

        #[cfg(not(any(test, feature = "test-support")))]
        return num_cpus::get();
    }

    /// Whether we're on the main thread.
    pub fn is_main_thread(&self) -> bool {
        self.dispatcher.is_main_thread()
    }

    #[cfg(any(test, feature = "test-support"))]
    /// in tests, control the number of ticks that `block_with_timeout` will run before timing out.
    pub fn set_block_on_ticks(&self, range: std::ops::RangeInclusive<usize>) {
        self.dispatcher.as_test().unwrap().set_block_on_ticks(range);
    }
}

/// ForegroundExecutor runs things on the main thread.
impl ForegroundExecutor {
    /// Creates a new ForegroundExecutor from the given PlatformDispatcher.
    pub fn new(dispatcher: Arc<dyn PlatformDispatcher>) -> Self {
        Self {
            dispatcher,
            not_send: PhantomData,
        }
    }

    /// Enqueues the given future to run on the main thread at some point in the future.
    #[track_caller]
    pub fn spawn<R>(&self, future: impl Future<Output = R> + 'static) -> Task<R>
    where
        R: 'static,
    {
        self.inner_spawn(None, Priority::default(), future)
    }

    /// Enqueues the given future to run on the main thread at some point in the future.
    #[track_caller]
    pub fn spawn_with_priority<R>(
        &self,
        priority: Priority,
        future: impl Future<Output = R> + 'static,
    ) -> Task<R>
    where
        R: 'static,
    {
        self.inner_spawn(None, priority, future)
    }

    #[track_caller]
    pub(crate) fn spawn_context<R>(
        &self,
        app: std::sync::Weak<()>,
        future: impl Future<Output = R> + 'static,
    ) -> Task<R>
    where
        R: 'static,
    {
        self.inner_spawn(Some(app), Priority::default(), future)
    }

    #[track_caller]
    pub(crate) fn inner_spawn<R>(
        &self,
        app: Option<std::sync::Weak<()>>,
        priority: Priority,
        future: impl Future<Output = R> + 'static,
    ) -> Task<R>
    where
        R: 'static,
    {
        let dispatcher = self.dispatcher.clone();
        let location = core::panic::Location::caller();

        #[track_caller]
        fn inner<R: 'static>(
            dispatcher: Arc<dyn PlatformDispatcher>,
            future: AnyLocalFuture<R>,
            location: &'static core::panic::Location<'static>,
            app: Option<std::sync::Weak<()>>,
            priority: Priority,
        ) -> Task<R> {
            let (runnable, task) = spawn_local_with_source_location(
                future,
                move |runnable| {
                    dispatcher.dispatch_on_main_thread(RunnableVariant::Meta(runnable), priority)
                },
                RunnableMeta { location, app },
            );
            runnable.schedule();
            Task(TaskState::Spawned(task))
        }
        inner::<R>(dispatcher, Box::pin(future), location, app, priority)
    }
}

/// Variant of `async_task::spawn_local` that includes the source location of the spawn in panics.
///
/// Copy-modified from:
/// <https://github.com/smol-rs/async-task/blob/ca9dbe1db9c422fd765847fa91306e30a6bb58a9/src/runnable.rs#L405>
#[track_caller]
fn spawn_local_with_source_location<Fut, S, M>(
    future: Fut,
    schedule: S,
    metadata: M,
) -> (Runnable<M>, async_task::Task<Fut::Output, M>)
where
    Fut: Future + 'static,
    Fut::Output: 'static,
    S: async_task::Schedule<M> + Send + Sync + 'static,
    M: 'static,
{
    #[inline]
    fn thread_id() -> ThreadId {
        std::thread_local! {
            static ID: ThreadId = thread::current().id();
        }
        ID.try_with(|id| *id)
            .unwrap_or_else(|_| thread::current().id())
    }

    struct Checked<F> {
        id: ThreadId,
        inner: ManuallyDrop<F>,
        location: &'static Location<'static>,
    }

    impl<F> Drop for Checked<F> {
        fn drop(&mut self) {
            assert!(
                self.id == thread_id(),
                "local task dropped by a thread that didn't spawn it. Task spawned at {}",
                self.location
            );
            unsafe { ManuallyDrop::drop(&mut self.inner) };
        }
    }

    impl<F: Future> Future for Checked<F> {
        type Output = F::Output;

        fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
            assert!(
                self.id == thread_id(),
                "local task polled by a thread that didn't spawn it. Task spawned at {}",
                self.location
            );
            unsafe { self.map_unchecked_mut(|c| &mut *c.inner).poll(cx) }
        }
    }

    // Wrap the future into one that checks which thread it's on.
    let future = Checked {
        id: thread_id(),
        inner: ManuallyDrop::new(future),
        location: Location::caller(),
    };

    unsafe {
        async_task::Builder::new()
            .metadata(metadata)
            .spawn_unchecked(move |_| future, schedule)
    }
}
923
924/// Scope manages a set of tasks that are enqueued and waited on together. See [`BackgroundExecutor::scoped`].
925pub struct Scope<'a> {
926 executor: BackgroundExecutor,
927 priority: Priority,
928 futures: Vec<Pin<Box<dyn Future<Output = ()> + Send + 'static>>>,
929 tx: Option<mpsc::Sender<()>>,
930 rx: mpsc::Receiver<()>,
931 lifetime: PhantomData<&'a ()>,
932}
933
934impl<'a> Scope<'a> {
935 fn new(executor: BackgroundExecutor, priority: Priority) -> Self {
936 let (tx, rx) = mpsc::channel(1);
937 Self {
938 executor,
939 priority,
940 tx: Some(tx),
941 rx,
942 futures: Default::default(),
943 lifetime: PhantomData,
944 }
945 }
946
947 /// How many CPUs are available to the dispatcher.
948 pub fn num_cpus(&self) -> usize {
949 self.executor.num_cpus()
950 }
951
952 /// Spawn a future into this scope.
953 #[track_caller]
954 pub fn spawn<F>(&mut self, f: F)
955 where
956 F: Future<Output = ()> + Send + 'a,
957 {
958 let tx = self.tx.clone().unwrap();
959
960 // SAFETY: The 'a lifetime is guaranteed to outlive any of these futures because
961 // dropping this `Scope` blocks until all of the futures have resolved.
962 let f = unsafe {
963 mem::transmute::<
964 Pin<Box<dyn Future<Output = ()> + Send + 'a>>,
965 Pin<Box<dyn Future<Output = ()> + Send + 'static>>,
966 >(Box::pin(async move {
967 f.await;
968 drop(tx);
969 }))
970 };
971 self.futures.push(f);
972 }
973}
974
975impl Drop for Scope<'_> {
976 fn drop(&mut self) {
977 self.tx.take().unwrap();
978
979 // Wait until the channel is closed, which means that all of the spawned
980 // futures have resolved.
981 self.executor.block(self.rx.next());
982 }
983}
984
985#[cfg(test)]
986mod test {
987 use super::*;
988 use crate::{App, TestDispatcher, TestPlatform};
989 use rand::SeedableRng;
990 use std::cell::RefCell;
991
992 #[test]
993 fn sanity_test_tasks_run() {
994 let dispatcher = TestDispatcher::new(StdRng::seed_from_u64(0));
995 let arc_dispatcher = Arc::new(dispatcher.clone());
996 let background_executor = BackgroundExecutor::new(arc_dispatcher.clone());
997 let foreground_executor = ForegroundExecutor::new(arc_dispatcher);
998
999 let platform = TestPlatform::new(background_executor, foreground_executor.clone());
1000 let asset_source = Arc::new(());
1001 let http_client = http_client::FakeHttpClient::with_404_response();
1002
1003 let app = App::new_app(platform, asset_source, http_client);
1004 let liveness_token = std::sync::Arc::downgrade(&app.borrow().liveness);
1005
1006 let task_ran = Rc::new(RefCell::new(false));
1007
1008 foreground_executor
1009 .spawn_context(liveness_token, {
1010 let task_ran = Rc::clone(&task_ran);
1011 async move {
1012 *task_ran.borrow_mut() = true;
1013 }
1014 })
1015 .detach();
1016
1017 // Run dispatcher while app is still alive
1018 dispatcher.run_until_parked();
1019
1020 // Task should have run
1021 assert!(
1022 *task_ran.borrow(),
1023 "Task should run normally when app is alive"
1024 );
1025 }
1026
1027 #[test]
1028 fn test_task_cancelled_when_app_dropped() {
1029 let dispatcher = TestDispatcher::new(StdRng::seed_from_u64(0));
1030 let arc_dispatcher = Arc::new(dispatcher.clone());
1031 let background_executor = BackgroundExecutor::new(arc_dispatcher.clone());
1032 let foreground_executor = ForegroundExecutor::new(arc_dispatcher);
1033
1034 let platform = TestPlatform::new(background_executor, foreground_executor.clone());
1035 let asset_source = Arc::new(());
1036 let http_client = http_client::FakeHttpClient::with_404_response();
1037
1038 let app = App::new_app(platform, asset_source, http_client);
1039 let liveness_token = std::sync::Arc::downgrade(&app.borrow().liveness);
1040 let app_weak = Rc::downgrade(&app);
1041
1042 let task_ran = Rc::new(RefCell::new(false));
1043 let task_ran_clone = Rc::clone(&task_ran);
1044
1045 foreground_executor
1046 .spawn_context(liveness_token, async move {
1047 *task_ran_clone.borrow_mut() = true;
1048 })
1049 .detach();
1050
1051 drop(app);
1052
1053 assert!(app_weak.upgrade().is_none(), "App should have been dropped");
1054
1055 dispatcher.run_until_parked();
1056
1057 // The task should have been cancelled, not run
1058 assert!(
1059 !*task_ran.borrow(),
1060 "Task should have been cancelled when app was dropped, but it ran!"
1061 );
1062 }
1063
1064 #[test]
1065 fn test_nested_tasks_both_cancel() {
1066 let dispatcher = TestDispatcher::new(StdRng::seed_from_u64(0));
1067 let arc_dispatcher = Arc::new(dispatcher.clone());
1068 let background_executor = BackgroundExecutor::new(arc_dispatcher.clone());
1069 let foreground_executor = ForegroundExecutor::new(arc_dispatcher);
1070
1071 let platform = TestPlatform::new(background_executor, foreground_executor.clone());
1072 let asset_source = Arc::new(());
1073 let http_client = http_client::FakeHttpClient::with_404_response();
1074
1075 let app = App::new_app(platform, asset_source, http_client);
1076 let liveness_token = std::sync::Arc::downgrade(&app.borrow().liveness);
1077 let app_weak = Rc::downgrade(&app);
1078
1079 let outer_completed = Rc::new(RefCell::new(false));
1080 let inner_completed = Rc::new(RefCell::new(false));
1081 let reached_await = Rc::new(RefCell::new(false));
1082
1083 let outer_flag = Rc::clone(&outer_completed);
1084 let inner_flag = Rc::clone(&inner_completed);
1085 let await_flag = Rc::clone(&reached_await);
1086
1087 // Channel to block the inner task until we're ready
1088 let (tx, rx) = futures::channel::oneshot::channel::<()>();
1089
1090 // We need clones of executor and liveness_token for the inner spawn
1091 let inner_executor = foreground_executor.clone();
1092 let inner_liveness_token = liveness_token.clone();
1093
1094 foreground_executor
1095 .spawn_context(liveness_token, async move {
1096 let inner_task = inner_executor.spawn_context(inner_liveness_token, {
1097 let inner_flag = Rc::clone(&inner_flag);
1098 async move {
1099 rx.await.ok();
1100 *inner_flag.borrow_mut() = true;
1101 }
1102 });
1103
1104 *await_flag.borrow_mut() = true;
1105
1106 inner_task.await;
1107
1108 *outer_flag.borrow_mut() = true;
1109 })
1110 .detach();
1111
1112 // Run dispatcher until outer task reaches the await point
1113 // The inner task will be blocked on the channel
1114 dispatcher.run_until_parked();
1115
1116 // Verify we actually reached the await point before dropping the app
1117 assert!(
1118 *reached_await.borrow(),
1119 "Outer task should have reached the await point"
1120 );
1121
1122 // Neither task should have completed yet
1123 assert!(
1124 !*outer_completed.borrow(),
1125 "Outer task should not have completed yet"
1126 );
1127 assert!(
1128 !*inner_completed.borrow(),
1129 "Inner task should not have completed yet"
1130 );
1131
1132 // Drop the channel sender and app while outer is awaiting inner
1133 drop(tx);
1134 drop(app);
1135 assert!(app_weak.upgrade().is_none(), "App should have been dropped");
1136
1137 // Run dispatcher - both tasks should be cancelled
1138 dispatcher.run_until_parked();
1139
1140 // Neither task should have completed (both were cancelled)
1141 assert!(
1142 !*outer_completed.borrow(),
1143 "Outer task should have been cancelled, not completed"
1144 );
1145 assert!(
1146 !*inner_completed.borrow(),
1147 "Inner task should have been cancelled, not completed"
1148 );
1149 }
1150
1151 #[test]
1152 fn test_task_without_app_tracking_still_runs() {
1153 let dispatcher = TestDispatcher::new(StdRng::seed_from_u64(0));
1154 let arc_dispatcher = Arc::new(dispatcher.clone());
1155 let background_executor = BackgroundExecutor::new(arc_dispatcher.clone());
1156 let foreground_executor = ForegroundExecutor::new(arc_dispatcher);
1157
1158 let platform = TestPlatform::new(background_executor, foreground_executor.clone());
1159 let asset_source = Arc::new(());
1160 let http_client = http_client::FakeHttpClient::with_404_response();
1161
1162 let app = App::new_app(platform, asset_source, http_client);
1163 let app_weak = Rc::downgrade(&app);
1164
1165 let task_ran = Rc::new(RefCell::new(false));
1166 let task_ran_clone = Rc::clone(&task_ran);
1167
1168 let _task = foreground_executor.spawn(async move {
1169 *task_ran_clone.borrow_mut() = true;
1170 });
1171
1172 drop(app);
1173
1174 assert!(app_weak.upgrade().is_none(), "App should have been dropped");
1175
1176 dispatcher.run_until_parked();
1177
1178 assert!(
1179 *task_ran.borrow(),
1180 "Task without app tracking should still run after app is dropped"
1181 );
1182 }
1183
1184 #[test]
1185 #[should_panic]
1186 fn test_polling_cancelled_task_panics() {
1187 let dispatcher = TestDispatcher::new(StdRng::seed_from_u64(0));
1188 let arc_dispatcher = Arc::new(dispatcher.clone());
1189 let background_executor = BackgroundExecutor::new(arc_dispatcher.clone());
1190 let foreground_executor = ForegroundExecutor::new(arc_dispatcher);
1191
1192 let platform = TestPlatform::new(background_executor.clone(), foreground_executor.clone());
1193 let asset_source = Arc::new(());
1194 let http_client = http_client::FakeHttpClient::with_404_response();
1195
1196 let app = App::new_app(platform, asset_source, http_client);
1197 let liveness_token = std::sync::Arc::downgrade(&app.borrow().liveness);
1198 let app_weak = Rc::downgrade(&app);
1199
1200 let task = foreground_executor.spawn_context(liveness_token, async move { 42 });
1201
1202 drop(app);
1203
1204 assert!(app_weak.upgrade().is_none(), "App should have been dropped");
1205
1206 dispatcher.run_until_parked();
1207
1208 background_executor.block(task);
1209 }
1210
1211 #[test]
1212 fn test_polling_cancelled_task_returns_none_with_fallible() {
1213 let dispatcher = TestDispatcher::new(StdRng::seed_from_u64(0));
1214 let arc_dispatcher = Arc::new(dispatcher.clone());
1215 let background_executor = BackgroundExecutor::new(arc_dispatcher.clone());
1216 let foreground_executor = ForegroundExecutor::new(arc_dispatcher);
1217
1218 let platform = TestPlatform::new(background_executor.clone(), foreground_executor.clone());
1219 let asset_source = Arc::new(());
1220 let http_client = http_client::FakeHttpClient::with_404_response();
1221
1222 let app = App::new_app(platform, asset_source, http_client);
1223 let liveness_token = std::sync::Arc::downgrade(&app.borrow().liveness);
1224 let app_weak = Rc::downgrade(&app);
1225
1226 let task = foreground_executor
1227 .spawn_context(liveness_token, async move { 42 })
1228 .fallible();
1229
1230 drop(app);
1231
1232 assert!(app_weak.upgrade().is_none(), "App should have been dropped");
1233
1234 dispatcher.run_until_parked();
1235
1236 let result = background_executor.block(task);
1237 assert_eq!(result, None, "Cancelled task should return None");
1238 }
1239}