1use crate::{
2 App, AppLivenessToken, PlatformDispatcher, RunnableMeta, RunnableVariant, TaskTiming, profiler,
3};
4use async_task::Runnable;
5use futures::channel::mpsc;
6use parking_lot::{Condvar, Mutex};
7use smol::prelude::*;
8use std::{
9 fmt::Debug,
10 marker::PhantomData,
11 mem::{self, ManuallyDrop},
12 num::NonZeroUsize,
13 panic::Location,
14 pin::Pin,
15 rc::Rc,
16 sync::{
17 Arc,
18 atomic::{AtomicUsize, Ordering},
19 },
20 task::{Context, Poll},
21 thread::{self, ThreadId},
22 time::{Duration, Instant},
23};
24use util::TryFutureExt;
25use waker_fn::waker_fn;
26
27#[cfg(any(test, feature = "test-support"))]
28use rand::rngs::StdRng;
29
30/// A pointer to the executor that is currently running,
31/// for spawning background tasks.
32#[derive(Clone)]
33pub struct BackgroundExecutor {
34 #[doc(hidden)]
35 pub dispatcher: Arc<dyn PlatformDispatcher>,
36}
37
38/// A pointer to the executor that is currently running,
39/// for spawning tasks on the main thread.
40///
41/// This is intentionally `!Send` via the `not_send` marker field. This is because
42/// `ForegroundExecutor::spawn` does not require `Send` but checks at runtime that the future is
43/// only polled from the same thread it was spawned from. These checks would fail when spawning
44/// foreground tasks from background threads.
45#[derive(Clone)]
46pub struct ForegroundExecutor {
47 #[doc(hidden)]
48 pub dispatcher: Arc<dyn PlatformDispatcher>,
49 not_send: PhantomData<Rc<()>>,
50}
51
52/// Realtime task priority
53#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
54#[repr(u8)]
55pub enum RealtimePriority {
56 /// Audio task
57 Audio,
58 /// Other realtime task
59 #[default]
60 Other,
61}
62
63/// Task priority
64#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
65#[repr(u8)]
66pub enum Priority {
67 /// Realtime priority
68 ///
69 /// Spawning a task with this priority will spin it off on a separate thread dedicated just to that task.
70 Realtime(RealtimePriority),
71 /// High priority
72 ///
73 /// Only use for tasks that are critical to the user experience / responsiveness of the editor.
74 High,
75 /// Medium priority, probably suits most of your use cases.
76 #[default]
77 Medium,
78 /// Low priority
79 ///
80 /// Prioritize this for background work that can come in large quantities
81 /// to not starve the executor of resources for high priority tasks
82 Low,
83}
84
85impl Priority {
86 #[allow(dead_code)]
87 pub(crate) const fn probability(&self) -> u32 {
88 match self {
89 // realtime priorities are not considered for probability scheduling
90 Priority::Realtime(_) => 0,
91 Priority::High => 60,
92 Priority::Medium => 30,
93 Priority::Low => 10,
94 }
95 }
96}
97
98/// Task is a primitive that allows work to happen in the background.
99///
100/// It implements [`Future`] so you can `.await` on it.
101///
102/// If you drop a task it will be cancelled immediately. Calling [`Task::detach`] allows
103/// the task to continue running, but with no way to return a value.
104#[must_use]
105#[derive(Debug)]
106pub struct Task<T>(TaskState<T>);
107
108#[derive(Debug)]
109enum TaskState<T> {
110 /// A task that is ready to return a value
111 Ready(Option<T>),
112
113 /// A task that is currently running.
114 Spawned(async_task::Task<T, RunnableMeta>),
115}
116
117impl<T> Task<T> {
118 /// Creates a new task that will resolve with the value
119 pub fn ready(val: T) -> Self {
120 Task(TaskState::Ready(Some(val)))
121 }
122
123 /// Detaching a task runs it to completion in the background
124 pub fn detach(self) {
125 match self {
126 Task(TaskState::Ready(_)) => {}
127 Task(TaskState::Spawned(task)) => task.detach(),
128 }
129 }
130
131 /// Converts this task into a fallible task that returns `Option<T>`.
132 ///
133 /// Unlike the standard `Task<T>`, a [`FallibleTask`] will return `None`
134 /// if the app was dropped while the task is executing.
135 ///
136 /// # Example
137 ///
138 /// ```ignore
139 /// // Background task that gracefully handles app shutdown:
140 /// cx.background_spawn(async move {
141 /// let result = foreground_task.fallible().await;
142 /// if let Some(value) = result {
143 /// // Process the value
144 /// }
145 /// // If None, app was shut down - just exit gracefully
146 /// }).detach();
147 /// ```
148 pub fn fallible(self) -> FallibleTask<T> {
149 FallibleTask(match self.0 {
150 TaskState::Ready(val) => FallibleTaskState::Ready(val),
151 TaskState::Spawned(task) => FallibleTaskState::Spawned(task.fallible()),
152 })
153 }
154}
155
156impl<E, T> Task<Result<T, E>>
157where
158 T: 'static,
159 E: 'static + Debug,
160{
161 /// Run the task to completion in the background and log any
162 /// errors that occur.
163 #[track_caller]
164 pub fn detach_and_log_err(self, cx: &App) {
165 let location = core::panic::Location::caller();
166 cx.foreground_executor()
167 .spawn(self.log_tracked_err(*location))
168 .detach();
169 }
170}
171
172impl<T> Future for Task<T> {
173 type Output = T;
174
175 fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
176 match unsafe { self.get_unchecked_mut() } {
177 Task(TaskState::Ready(val)) => Poll::Ready(val.take().unwrap()),
178 Task(TaskState::Spawned(task)) => task.poll(cx),
179 }
180 }
181}
182
183/// A task that returns `Option<T>` instead of panicking when cancelled.
184#[must_use]
185pub struct FallibleTask<T>(FallibleTaskState<T>);
186
187enum FallibleTaskState<T> {
188 /// A task that is ready to return a value
189 Ready(Option<T>),
190
191 /// A task that is currently running (wraps async_task::FallibleTask).
192 Spawned(async_task::FallibleTask<T, RunnableMeta>),
193}
194
195impl<T> FallibleTask<T> {
196 /// Creates a new fallible task that will resolve with the value.
197 pub fn ready(val: T) -> Self {
198 FallibleTask(FallibleTaskState::Ready(Some(val)))
199 }
200
201 /// Detaching a task runs it to completion in the background.
202 pub fn detach(self) {
203 match self.0 {
204 FallibleTaskState::Ready(_) => {}
205 FallibleTaskState::Spawned(task) => task.detach(),
206 }
207 }
208}
209
210impl<T> Future for FallibleTask<T> {
211 type Output = Option<T>;
212
213 fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
214 match unsafe { self.get_unchecked_mut() } {
215 FallibleTask(FallibleTaskState::Ready(val)) => Poll::Ready(val.take()),
216 FallibleTask(FallibleTaskState::Spawned(task)) => Pin::new(task).poll(cx),
217 }
218 }
219}
220
221impl<T> std::fmt::Debug for FallibleTask<T> {
222 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
223 match &self.0 {
224 FallibleTaskState::Ready(_) => f.debug_tuple("FallibleTask::Ready").finish(),
225 FallibleTaskState::Spawned(task) => {
226 f.debug_tuple("FallibleTask::Spawned").field(task).finish()
227 }
228 }
229 }
230}
231
232/// A task label is an opaque identifier that you can use to
233/// refer to a task in tests.
234#[derive(Clone, Copy, PartialEq, Eq, Hash, Debug)]
235pub struct TaskLabel(NonZeroUsize);
236
237impl Default for TaskLabel {
238 fn default() -> Self {
239 Self::new()
240 }
241}
242
243impl TaskLabel {
244 /// Construct a new task label.
245 pub fn new() -> Self {
246 static NEXT_TASK_LABEL: AtomicUsize = AtomicUsize::new(1);
247 Self(
248 NEXT_TASK_LABEL
249 .fetch_add(1, Ordering::SeqCst)
250 .try_into()
251 .unwrap(),
252 )
253 }
254}
255
256type AnyLocalFuture<R> = Pin<Box<dyn 'static + Future<Output = R>>>;
257
258type AnyFuture<R> = Pin<Box<dyn 'static + Send + Future<Output = R>>>;
259
260/// BackgroundExecutor lets you run things on background threads.
261/// In production this is a thread pool with no ordering guarantees.
262/// In tests this is simulated by running tasks one by one in a deterministic
263/// (but arbitrary) order controlled by the `SEED` environment variable.
264impl BackgroundExecutor {
265 #[doc(hidden)]
266 pub fn new(dispatcher: Arc<dyn PlatformDispatcher>) -> Self {
267 Self { dispatcher }
268 }
269
270 /// Enqueues the given future to be run to completion on a background thread.
271 #[track_caller]
272 pub fn spawn<R>(&self, future: impl Future<Output = R> + Send + 'static) -> Task<R>
273 where
274 R: Send + 'static,
275 {
276 self.spawn_with_priority(Priority::default(), future)
277 }
278
279 /// Enqueues the given future to be run to completion on a background thread.
280 #[track_caller]
281 pub fn spawn_with_priority<R>(
282 &self,
283 priority: Priority,
284 future: impl Future<Output = R> + Send + 'static,
285 ) -> Task<R>
286 where
287 R: Send + 'static,
288 {
289 self.spawn_internal::<R>(Box::pin(future), None, priority)
290 }
291
292 /// Enqueues the given future to be run to completion on a background thread and blocking the current task on it.
293 ///
294 /// This allows to spawn background work that borrows from its scope. Note that the supplied future will run to
295 /// completion before the current task is resumed, even if the current task is slated for cancellation.
296 pub async fn await_on_background<R>(&self, future: impl Future<Output = R> + Send) -> R
297 where
298 R: Send,
299 {
300 // We need to ensure that cancellation of the parent task does not drop the environment
301 // before the our own task has completed or got cancelled.
302 struct NotifyOnDrop<'a>(&'a (Condvar, Mutex<bool>));
303
304 impl Drop for NotifyOnDrop<'_> {
305 fn drop(&mut self) {
306 *self.0.1.lock() = true;
307 self.0.0.notify_all();
308 }
309 }
310
311 struct WaitOnDrop<'a>(&'a (Condvar, Mutex<bool>));
312
313 impl Drop for WaitOnDrop<'_> {
314 fn drop(&mut self) {
315 let mut done = self.0.1.lock();
316 if !*done {
317 self.0.0.wait(&mut done);
318 }
319 }
320 }
321
322 let dispatcher = self.dispatcher.clone();
323 let location = core::panic::Location::caller();
324
325 let pair = &(Condvar::new(), Mutex::new(false));
326 let _wait_guard = WaitOnDrop(pair);
327
328 let (runnable, task) = unsafe {
329 async_task::Builder::new()
330 .metadata(RunnableMeta {
331 location,
332 app: None,
333 })
334 .spawn_unchecked(
335 move |_| async {
336 let _notify_guard = NotifyOnDrop(pair);
337 future.await
338 },
339 move |runnable| {
340 dispatcher.dispatch(
341 RunnableVariant::Meta(runnable),
342 None,
343 Priority::default(),
344 )
345 },
346 )
347 };
348 runnable.schedule();
349 task.await
350 }
351
352 /// Enqueues the given future to be run to completion on a background thread.
353 /// The given label can be used to control the priority of the task in tests.
354 #[track_caller]
355 pub fn spawn_labeled<R>(
356 &self,
357 label: TaskLabel,
358 future: impl Future<Output = R> + Send + 'static,
359 ) -> Task<R>
360 where
361 R: Send + 'static,
362 {
363 self.spawn_internal::<R>(Box::pin(future), Some(label), Priority::default())
364 }
365
366 #[track_caller]
367 fn spawn_internal<R: Send + 'static>(
368 &self,
369 future: AnyFuture<R>,
370 label: Option<TaskLabel>,
371 #[cfg_attr(
372 target_os = "windows",
373 expect(
374 unused_variables,
375 reason = "Multi priority scheduler is broken on windows"
376 )
377 )]
378 priority: Priority,
379 ) -> Task<R> {
380 let dispatcher = self.dispatcher.clone();
381 #[cfg(target_os = "windows")]
382 let priority = Priority::Medium; // multi-prio scheduler is broken on windows
383
384 let (runnable, task) = if let Priority::Realtime(realtime) = priority {
385 let location = core::panic::Location::caller();
386 let (mut tx, rx) = flume::bounded::<Runnable<RunnableMeta>>(1);
387
388 dispatcher.spawn_realtime(
389 realtime,
390 Box::new(move || {
391 while let Ok(runnable) = rx.recv() {
392 let start = Instant::now();
393 let location = runnable.metadata().location;
394 let mut timing = TaskTiming {
395 location,
396 start,
397 end: None,
398 };
399 profiler::add_task_timing(timing);
400
401 runnable.run();
402
403 let end = Instant::now();
404 timing.end = Some(end);
405 profiler::add_task_timing(timing);
406 }
407 }),
408 );
409
410 async_task::Builder::new()
411 .metadata(RunnableMeta {
412 location,
413 app: None,
414 })
415 .spawn(
416 move |_| future,
417 move |runnable| {
418 let _ = tx.send(runnable);
419 },
420 )
421 } else {
422 let location = core::panic::Location::caller();
423 async_task::Builder::new()
424 .metadata(RunnableMeta {
425 location,
426 app: None,
427 })
428 .spawn(
429 move |_| future,
430 move |runnable| {
431 dispatcher.dispatch(RunnableVariant::Meta(runnable), label, priority)
432 },
433 )
434 };
435
436 runnable.schedule();
437 Task(TaskState::Spawned(task))
438 }
439
440 /// Used by the test harness to run an async test in a synchronous fashion.
441 #[cfg(any(test, feature = "test-support"))]
442 #[track_caller]
443 pub fn block_test<R>(&self, future: impl Future<Output = R>) -> R {
444 if let Ok(value) = self.block_internal(false, future, None) {
445 value
446 } else {
447 unreachable!()
448 }
449 }
450
451 /// Block the current thread until the given future resolves.
452 /// Consider using `block_with_timeout` instead.
453 pub fn block<R>(&self, future: impl Future<Output = R>) -> R {
454 if let Ok(value) = self.block_internal(true, future, None) {
455 value
456 } else {
457 unreachable!()
458 }
459 }
460
461 #[cfg(not(any(test, feature = "test-support")))]
462 pub(crate) fn block_internal<Fut: Future>(
463 &self,
464 _background_only: bool,
465 future: Fut,
466 timeout: Option<Duration>,
467 ) -> Result<Fut::Output, impl Future<Output = Fut::Output> + use<Fut>> {
468 use std::time::Instant;
469
470 let mut future = Box::pin(future);
471 if timeout == Some(Duration::ZERO) {
472 return Err(future);
473 }
474 let deadline = timeout.map(|timeout| Instant::now() + timeout);
475
476 let parker = parking::Parker::new();
477 let unparker = parker.unparker();
478 let waker = waker_fn(move || {
479 unparker.unpark();
480 });
481 let mut cx = std::task::Context::from_waker(&waker);
482
483 loop {
484 match future.as_mut().poll(&mut cx) {
485 Poll::Ready(result) => return Ok(result),
486 Poll::Pending => {
487 let timeout =
488 deadline.map(|deadline| deadline.saturating_duration_since(Instant::now()));
489 if let Some(timeout) = timeout {
490 if !parker.park_timeout(timeout)
491 && deadline.is_some_and(|deadline| deadline < Instant::now())
492 {
493 return Err(future);
494 }
495 } else {
496 parker.park();
497 }
498 }
499 }
500 }
501 }
502
503 #[cfg(any(test, feature = "test-support"))]
504 #[track_caller]
505 pub(crate) fn block_internal<Fut: Future>(
506 &self,
507 background_only: bool,
508 future: Fut,
509 timeout: Option<Duration>,
510 ) -> Result<Fut::Output, impl Future<Output = Fut::Output> + use<Fut>> {
511 use std::sync::atomic::AtomicBool;
512
513 use parking::Parker;
514
515 let mut future = Box::pin(future);
516 if timeout == Some(Duration::ZERO) {
517 return Err(future);
518 }
519 let Some(dispatcher) = self.dispatcher.as_test() else {
520 return Err(future);
521 };
522
523 let mut max_ticks = if timeout.is_some() {
524 dispatcher.gen_block_on_ticks()
525 } else {
526 usize::MAX
527 };
528
529 let parker = Parker::new();
530 let unparker = parker.unparker();
531
532 let awoken = Arc::new(AtomicBool::new(false));
533 let waker = waker_fn({
534 let awoken = awoken.clone();
535 let unparker = unparker.clone();
536 move || {
537 awoken.store(true, Ordering::SeqCst);
538 unparker.unpark();
539 }
540 });
541 let mut cx = std::task::Context::from_waker(&waker);
542
543 let duration = Duration::from_secs(
544 option_env!("GPUI_TEST_TIMEOUT")
545 .and_then(|s| s.parse::<u64>().ok())
546 .unwrap_or(180),
547 );
548 let mut test_should_end_by = Instant::now() + duration;
549
550 loop {
551 match future.as_mut().poll(&mut cx) {
552 Poll::Ready(result) => return Ok(result),
553 Poll::Pending => {
554 if max_ticks == 0 {
555 return Err(future);
556 }
557 max_ticks -= 1;
558
559 if !dispatcher.tick(background_only) {
560 if awoken.swap(false, Ordering::SeqCst) {
561 continue;
562 }
563
564 if !dispatcher.parking_allowed() {
565 if dispatcher.advance_clock_to_next_delayed() {
566 continue;
567 }
568 let mut backtrace_message = String::new();
569 let mut waiting_message = String::new();
570 if let Some(backtrace) = dispatcher.waiting_backtrace() {
571 backtrace_message =
572 format!("\nbacktrace of waiting future:\n{:?}", backtrace);
573 }
574 if let Some(waiting_hint) = dispatcher.waiting_hint() {
575 waiting_message = format!("\n waiting on: {}\n", waiting_hint);
576 }
577 panic!(
578 "parked with nothing left to run{waiting_message}{backtrace_message}",
579 )
580 }
581 dispatcher.push_unparker(unparker.clone());
582 parker.park_timeout(Duration::from_millis(1));
583 if Instant::now() > test_should_end_by {
584 panic!("test timed out after {duration:?} with allow_parking")
585 }
586 }
587 }
588 }
589 }
590 }
591
592 /// Block the current thread until the given future resolves
593 /// or `duration` has elapsed.
594 pub fn block_with_timeout<Fut: Future>(
595 &self,
596 duration: Duration,
597 future: Fut,
598 ) -> Result<Fut::Output, impl Future<Output = Fut::Output> + use<Fut>> {
599 self.block_internal(true, future, Some(duration))
600 }
601
602 /// Scoped lets you start a number of tasks and waits
603 /// for all of them to complete before returning.
604 pub async fn scoped<'scope, F>(&self, scheduler: F)
605 where
606 F: FnOnce(&mut Scope<'scope>),
607 {
608 let mut scope = Scope::new(self.clone(), Priority::default());
609 (scheduler)(&mut scope);
610 let spawned = mem::take(&mut scope.futures)
611 .into_iter()
612 .map(|f| self.spawn_with_priority(scope.priority, f))
613 .collect::<Vec<_>>();
614 for task in spawned {
615 task.await;
616 }
617 }
618
619 /// Scoped lets you start a number of tasks and waits
620 /// for all of them to complete before returning.
621 pub async fn scoped_priority<'scope, F>(&self, priority: Priority, scheduler: F)
622 where
623 F: FnOnce(&mut Scope<'scope>),
624 {
625 let mut scope = Scope::new(self.clone(), priority);
626 (scheduler)(&mut scope);
627 let spawned = mem::take(&mut scope.futures)
628 .into_iter()
629 .map(|f| self.spawn_with_priority(scope.priority, f))
630 .collect::<Vec<_>>();
631 for task in spawned {
632 task.await;
633 }
634 }
635
636 /// Get the current time.
637 ///
638 /// Calling this instead of `std::time::Instant::now` allows the use
639 /// of fake timers in tests.
640 pub fn now(&self) -> Instant {
641 self.dispatcher.now()
642 }
643
644 /// Returns a task that will complete after the given duration.
645 /// Depending on other concurrent tasks the elapsed duration may be longer
646 /// than requested.
647 pub fn timer(&self, duration: Duration) -> Task<()> {
648 if duration.is_zero() {
649 return Task::ready(());
650 }
651 let location = core::panic::Location::caller();
652 let (runnable, task) = async_task::Builder::new()
653 .metadata(RunnableMeta {
654 location,
655 app: None,
656 })
657 .spawn(move |_| async move {}, {
658 let dispatcher = self.dispatcher.clone();
659 move |runnable| dispatcher.dispatch_after(duration, RunnableVariant::Meta(runnable))
660 });
661 runnable.schedule();
662 Task(TaskState::Spawned(task))
663 }
664
665 /// in tests, start_waiting lets you indicate which task is waiting (for debugging only)
666 #[cfg(any(test, feature = "test-support"))]
667 pub fn start_waiting(&self) {
668 self.dispatcher.as_test().unwrap().start_waiting();
669 }
670
671 /// in tests, removes the debugging data added by start_waiting
672 #[cfg(any(test, feature = "test-support"))]
673 pub fn finish_waiting(&self) {
674 self.dispatcher.as_test().unwrap().finish_waiting();
675 }
676
677 /// in tests, run an arbitrary number of tasks (determined by the SEED environment variable)
678 #[cfg(any(test, feature = "test-support"))]
679 pub fn simulate_random_delay(&self) -> impl Future<Output = ()> + use<> {
680 self.dispatcher.as_test().unwrap().simulate_random_delay()
681 }
682
683 /// in tests, indicate that a given task from `spawn_labeled` should run after everything else
684 #[cfg(any(test, feature = "test-support"))]
685 pub fn deprioritize(&self, task_label: TaskLabel) {
686 self.dispatcher.as_test().unwrap().deprioritize(task_label)
687 }
688
689 /// in tests, move time forward. This does not run any tasks, but does make `timer`s ready.
690 #[cfg(any(test, feature = "test-support"))]
691 pub fn advance_clock(&self, duration: Duration) {
692 self.dispatcher.as_test().unwrap().advance_clock(duration)
693 }
694
695 /// in tests, run one task.
696 #[cfg(any(test, feature = "test-support"))]
697 pub fn tick(&self) -> bool {
698 self.dispatcher.as_test().unwrap().tick(false)
699 }
700
701 /// in tests, run all tasks that are ready to run. If after doing so
702 /// the test still has outstanding tasks, this will panic. (See also [`Self::allow_parking`])
703 #[cfg(any(test, feature = "test-support"))]
704 pub fn run_until_parked(&self) {
705 self.dispatcher.as_test().unwrap().run_until_parked()
706 }
707
708 /// in tests, prevents `run_until_parked` from panicking if there are outstanding tasks.
709 /// This is useful when you are integrating other (non-GPUI) futures, like disk access, that
710 /// do take real async time to run.
711 #[cfg(any(test, feature = "test-support"))]
712 pub fn allow_parking(&self) {
713 self.dispatcher.as_test().unwrap().allow_parking();
714 }
715
716 /// undoes the effect of [`Self::allow_parking`].
717 #[cfg(any(test, feature = "test-support"))]
718 pub fn forbid_parking(&self) {
719 self.dispatcher.as_test().unwrap().forbid_parking();
720 }
721
722 /// adds detail to the "parked with nothing let to run" message.
723 #[cfg(any(test, feature = "test-support"))]
724 pub fn set_waiting_hint(&self, msg: Option<String>) {
725 self.dispatcher.as_test().unwrap().set_waiting_hint(msg);
726 }
727
728 /// in tests, returns the rng used by the dispatcher and seeded by the `SEED` environment variable
729 #[cfg(any(test, feature = "test-support"))]
730 pub fn rng(&self) -> StdRng {
731 self.dispatcher.as_test().unwrap().rng()
732 }
733
734 /// How many CPUs are available to the dispatcher.
735 pub fn num_cpus(&self) -> usize {
736 #[cfg(any(test, feature = "test-support"))]
737 return 4;
738
739 #[cfg(not(any(test, feature = "test-support")))]
740 return num_cpus::get();
741 }
742
743 /// Whether we're on the main thread.
744 pub fn is_main_thread(&self) -> bool {
745 self.dispatcher.is_main_thread()
746 }
747
748 #[cfg(any(test, feature = "test-support"))]
749 /// in tests, control the number of ticks that `block_with_timeout` will run before timing out.
750 pub fn set_block_on_ticks(&self, range: std::ops::RangeInclusive<usize>) {
751 self.dispatcher.as_test().unwrap().set_block_on_ticks(range);
752 }
753}
754
755/// ForegroundExecutor runs things on the main thread.
756impl ForegroundExecutor {
757 /// Creates a new ForegroundExecutor from the given PlatformDispatcher.
758 pub fn new(dispatcher: Arc<dyn PlatformDispatcher>) -> Self {
759 Self {
760 dispatcher,
761 not_send: PhantomData,
762 }
763 }
764
765 /// Enqueues the given Task to run on the main thread at some point in the future.
766 #[track_caller]
767 pub fn spawn<R>(&self, future: impl Future<Output = R> + 'static) -> Task<R>
768 where
769 R: 'static,
770 {
771 self.inner_spawn(None, Priority::default(), future)
772 }
773
774 /// Enqueues the given Task to run on the main thread at some point in the future.
775 #[track_caller]
776 pub fn spawn_with_priority<R>(
777 &self,
778 priority: Priority,
779 future: impl Future<Output = R> + 'static,
780 ) -> Task<R>
781 where
782 R: 'static,
783 {
784 self.inner_spawn(None, priority, future)
785 }
786
787 #[track_caller]
788 pub(crate) fn spawn_context<R>(
789 &self,
790 app: AppLivenessToken,
791 future: impl Future<Output = R> + 'static,
792 ) -> Task<R>
793 where
794 R: 'static,
795 {
796 self.inner_spawn(Some(app), Priority::default(), future)
797 }
798
799 #[track_caller]
800 pub(crate) fn inner_spawn<R>(
801 &self,
802 app: Option<AppLivenessToken>,
803 priority: Priority,
804 future: impl Future<Output = R> + 'static,
805 ) -> Task<R>
806 where
807 R: 'static,
808 {
809 let dispatcher = self.dispatcher.clone();
810 let location = core::panic::Location::caller();
811
812 #[track_caller]
813 fn inner<R: 'static>(
814 dispatcher: Arc<dyn PlatformDispatcher>,
815 future: AnyLocalFuture<R>,
816 location: &'static core::panic::Location<'static>,
817 app: Option<AppLivenessToken>,
818 priority: Priority,
819 ) -> Task<R> {
820 let (runnable, task) = spawn_local_with_source_location(
821 future,
822 move |runnable| {
823 dispatcher.dispatch_on_main_thread(RunnableVariant::Meta(runnable), priority)
824 },
825 RunnableMeta { location, app },
826 );
827 runnable.schedule();
828 Task(TaskState::Spawned(task))
829 }
830 inner::<R>(dispatcher, Box::pin(future), location, app, priority)
831 }
832}
833
834/// Variant of `async_task::spawn_local` that includes the source location of the spawn in panics.
835///
836/// Copy-modified from:
837/// <https://github.com/smol-rs/async-task/blob/ca9dbe1db9c422fd765847fa91306e30a6bb58a9/src/runnable.rs#L405>
838#[track_caller]
839fn spawn_local_with_source_location<Fut, S, M>(
840 future: Fut,
841 schedule: S,
842 metadata: M,
843) -> (Runnable<M>, async_task::Task<Fut::Output, M>)
844where
845 Fut: Future + 'static,
846 Fut::Output: 'static,
847 S: async_task::Schedule<M> + Send + Sync + 'static,
848 M: 'static,
849{
850 #[inline]
851 fn thread_id() -> ThreadId {
852 std::thread_local! {
853 static ID: ThreadId = thread::current().id();
854 }
855 ID.try_with(|id| *id)
856 .unwrap_or_else(|_| thread::current().id())
857 }
858
859 struct Checked<F> {
860 id: ThreadId,
861 inner: ManuallyDrop<F>,
862 location: &'static Location<'static>,
863 }
864
865 impl<F> Drop for Checked<F> {
866 fn drop(&mut self) {
867 assert!(
868 self.id == thread_id(),
869 "local task dropped by a thread that didn't spawn it. Task spawned at {}",
870 self.location
871 );
872 unsafe { ManuallyDrop::drop(&mut self.inner) };
873 }
874 }
875
876 impl<F: Future> Future for Checked<F> {
877 type Output = F::Output;
878
879 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
880 assert!(
881 self.id == thread_id(),
882 "local task polled by a thread that didn't spawn it. Task spawned at {}",
883 self.location
884 );
885 unsafe { self.map_unchecked_mut(|c| &mut *c.inner).poll(cx) }
886 }
887 }
888
889 // Wrap the future into one that checks which thread it's on.
890 let future = Checked {
891 id: thread_id(),
892 inner: ManuallyDrop::new(future),
893 location: Location::caller(),
894 };
895
896 unsafe {
897 async_task::Builder::new()
898 .metadata(metadata)
899 .spawn_unchecked(move |_| future, schedule)
900 }
901}
902
903/// Scope manages a set of tasks that are enqueued and waited on together. See [`BackgroundExecutor::scoped`].
904pub struct Scope<'a> {
905 executor: BackgroundExecutor,
906 priority: Priority,
907 futures: Vec<Pin<Box<dyn Future<Output = ()> + Send + 'static>>>,
908 tx: Option<mpsc::Sender<()>>,
909 rx: mpsc::Receiver<()>,
910 lifetime: PhantomData<&'a ()>,
911}
912
913impl<'a> Scope<'a> {
914 fn new(executor: BackgroundExecutor, priority: Priority) -> Self {
915 let (tx, rx) = mpsc::channel(1);
916 Self {
917 executor,
918 priority,
919 tx: Some(tx),
920 rx,
921 futures: Default::default(),
922 lifetime: PhantomData,
923 }
924 }
925
926 /// How many CPUs are available to the dispatcher.
927 pub fn num_cpus(&self) -> usize {
928 self.executor.num_cpus()
929 }
930
931 /// Spawn a future into this scope.
932 #[track_caller]
933 pub fn spawn<F>(&mut self, f: F)
934 where
935 F: Future<Output = ()> + Send + 'a,
936 {
937 let tx = self.tx.clone().unwrap();
938
939 // SAFETY: The 'a lifetime is guaranteed to outlive any of these futures because
940 // dropping this `Scope` blocks until all of the futures have resolved.
941 let f = unsafe {
942 mem::transmute::<
943 Pin<Box<dyn Future<Output = ()> + Send + 'a>>,
944 Pin<Box<dyn Future<Output = ()> + Send + 'static>>,
945 >(Box::pin(async move {
946 f.await;
947 drop(tx);
948 }))
949 };
950 self.futures.push(f);
951 }
952}
953
954impl Drop for Scope<'_> {
955 fn drop(&mut self) {
956 self.tx.take().unwrap();
957
958 // Wait until the channel is closed, which means that all of the spawned
959 // futures have resolved.
960 self.executor.block(self.rx.next());
961 }
962}
963
964#[cfg(test)]
965mod test {
966 use super::*;
967 use crate::{App, TestDispatcher, TestPlatform};
968 use rand::SeedableRng;
969 use std::cell::RefCell;
970
971 #[test]
972 fn sanity_test_tasks_run() {
973 let dispatcher = TestDispatcher::new(StdRng::seed_from_u64(0));
974 let arc_dispatcher = Arc::new(dispatcher.clone());
975 let background_executor = BackgroundExecutor::new(arc_dispatcher.clone());
976 let foreground_executor = ForegroundExecutor::new(arc_dispatcher);
977
978 let platform = TestPlatform::new(background_executor, foreground_executor.clone());
979 let asset_source = Arc::new(());
980 let http_client = http_client::FakeHttpClient::with_404_response();
981
982 let app = App::new_app(platform, asset_source, http_client);
983 let liveness_token = app.borrow().liveness.token();
984
985 let task_ran = Rc::new(RefCell::new(false));
986
987 foreground_executor
988 .spawn_context(liveness_token, {
989 let task_ran = Rc::clone(&task_ran);
990 async move {
991 *task_ran.borrow_mut() = true;
992 }
993 })
994 .detach();
995
996 // Run dispatcher while app is still alive
997 dispatcher.run_until_parked();
998
999 // Task should have run
1000 assert!(
1001 *task_ran.borrow(),
1002 "Task should run normally when app is alive"
1003 );
1004 }
1005
1006 #[test]
1007 fn test_task_cancelled_when_app_dropped() {
1008 let dispatcher = TestDispatcher::new(StdRng::seed_from_u64(0));
1009 let arc_dispatcher = Arc::new(dispatcher.clone());
1010 let background_executor = BackgroundExecutor::new(arc_dispatcher.clone());
1011 let foreground_executor = ForegroundExecutor::new(arc_dispatcher);
1012
1013 let platform = TestPlatform::new(background_executor, foreground_executor.clone());
1014 let asset_source = Arc::new(());
1015 let http_client = http_client::FakeHttpClient::with_404_response();
1016
1017 let app = App::new_app(platform, asset_source, http_client);
1018 let liveness_token = app.borrow().liveness.token();
1019 let app_weak = Rc::downgrade(&app);
1020
1021 let task_ran = Rc::new(RefCell::new(false));
1022 let task_ran_clone = Rc::clone(&task_ran);
1023
1024 foreground_executor
1025 .spawn_context(liveness_token, async move {
1026 *task_ran_clone.borrow_mut() = true;
1027 })
1028 .detach();
1029
1030 drop(app);
1031
1032 assert!(app_weak.upgrade().is_none(), "App should have been dropped");
1033
1034 dispatcher.run_until_parked();
1035
1036 // The task should have been cancelled, not run
1037 assert!(
1038 !*task_ran.borrow(),
1039 "Task should have been cancelled when app was dropped, but it ran!"
1040 );
1041 }
1042
1043 #[test]
1044 fn test_nested_tasks_both_cancel() {
1045 let dispatcher = TestDispatcher::new(StdRng::seed_from_u64(0));
1046 let arc_dispatcher = Arc::new(dispatcher.clone());
1047 let background_executor = BackgroundExecutor::new(arc_dispatcher.clone());
1048 let foreground_executor = ForegroundExecutor::new(arc_dispatcher);
1049
1050 let platform = TestPlatform::new(background_executor, foreground_executor.clone());
1051 let asset_source = Arc::new(());
1052 let http_client = http_client::FakeHttpClient::with_404_response();
1053
1054 let app = App::new_app(platform, asset_source, http_client);
1055 let liveness_token = app.borrow().liveness.token();
1056 let app_weak = Rc::downgrade(&app);
1057
1058 let outer_completed = Rc::new(RefCell::new(false));
1059 let inner_completed = Rc::new(RefCell::new(false));
1060 let reached_await = Rc::new(RefCell::new(false));
1061
1062 let outer_flag = Rc::clone(&outer_completed);
1063 let inner_flag = Rc::clone(&inner_completed);
1064 let await_flag = Rc::clone(&reached_await);
1065
1066 // Channel to block the inner task until we're ready
1067 let (tx, rx) = futures::channel::oneshot::channel::<()>();
1068
1069 // We need clones of executor and liveness_token for the inner spawn
1070 let inner_executor = foreground_executor.clone();
1071 let inner_liveness_token = liveness_token.clone();
1072
1073 // Spawn outer task that will spawn and await an inner task
1074 foreground_executor
1075 .spawn_context(liveness_token, async move {
1076 let inner_flag_clone = Rc::clone(&inner_flag);
1077
1078 // Spawn inner task that blocks on a channel
1079 let inner_task = inner_executor.spawn_context(inner_liveness_token, async move {
1080 // Wait for signal (which will never come - we'll drop the app instead)
1081 rx.await.ok();
1082 *inner_flag_clone.borrow_mut() = true;
1083 });
1084
1085 // Mark that we've reached the await point
1086 *await_flag.borrow_mut() = true;
1087
1088 // Await inner task - this should not panic when both are cancelled
1089 inner_task.await;
1090
1091 // Mark outer as complete (should never reach here)
1092 *outer_flag.borrow_mut() = true;
1093 })
1094 .detach();
1095
1096 // Run dispatcher until outer task reaches the await point
1097 // The inner task will be blocked on the channel
1098 dispatcher.run_until_parked();
1099
1100 // Verify we actually reached the await point before dropping the app
1101 assert!(
1102 *reached_await.borrow(),
1103 "Outer task should have reached the await point"
1104 );
1105
1106 // Neither task should have completed yet
1107 assert!(
1108 !*outer_completed.borrow(),
1109 "Outer task should not have completed yet"
1110 );
1111 assert!(
1112 !*inner_completed.borrow(),
1113 "Inner task should not have completed yet"
1114 );
1115
1116 // Drop the channel sender and app while outer is awaiting inner
1117 drop(tx);
1118 drop(app);
1119 assert!(app_weak.upgrade().is_none(), "App should have been dropped");
1120
1121 // Run dispatcher - both tasks should be cancelled
1122 dispatcher.run_until_parked();
1123
1124 // Neither task should have completed (both were cancelled)
1125 assert!(
1126 !*outer_completed.borrow(),
1127 "Outer task should have been cancelled, not completed"
1128 );
1129 assert!(
1130 !*inner_completed.borrow(),
1131 "Inner task should have been cancelled, not completed"
1132 );
1133 }
1134
1135 #[test]
1136 fn test_task_without_app_tracking_still_runs() {
1137 let dispatcher = TestDispatcher::new(StdRng::seed_from_u64(0));
1138 let arc_dispatcher = Arc::new(dispatcher.clone());
1139 let background_executor = BackgroundExecutor::new(arc_dispatcher.clone());
1140 let foreground_executor = ForegroundExecutor::new(arc_dispatcher);
1141
1142 let platform = TestPlatform::new(background_executor, foreground_executor.clone());
1143 let asset_source = Arc::new(());
1144 let http_client = http_client::FakeHttpClient::with_404_response();
1145
1146 let app = App::new_app(platform, asset_source, http_client);
1147 let app_weak = Rc::downgrade(&app);
1148
1149 let task_ran = Rc::new(RefCell::new(false));
1150 let task_ran_clone = Rc::clone(&task_ran);
1151
1152 let _task = foreground_executor.spawn(async move {
1153 *task_ran_clone.borrow_mut() = true;
1154 });
1155
1156 drop(app);
1157
1158 assert!(app_weak.upgrade().is_none(), "App should have been dropped");
1159
1160 dispatcher.run_until_parked();
1161
1162 assert!(
1163 *task_ran.borrow(),
1164 "Task without app tracking should still run after app is dropped"
1165 );
1166 }
1167
1168 #[test]
1169 #[should_panic]
1170 fn test_polling_cancelled_task_panics() {
1171 let dispatcher = TestDispatcher::new(StdRng::seed_from_u64(0));
1172 let arc_dispatcher = Arc::new(dispatcher.clone());
1173 let background_executor = BackgroundExecutor::new(arc_dispatcher.clone());
1174 let foreground_executor = ForegroundExecutor::new(arc_dispatcher);
1175
1176 let platform = TestPlatform::new(background_executor.clone(), foreground_executor.clone());
1177 let asset_source = Arc::new(());
1178 let http_client = http_client::FakeHttpClient::with_404_response();
1179
1180 let app = App::new_app(platform, asset_source, http_client);
1181 let liveness_token = app.borrow().liveness.token();
1182 let app_weak = Rc::downgrade(&app);
1183
1184 let task = foreground_executor.spawn_context(liveness_token, async move { 42 });
1185
1186 drop(app);
1187
1188 assert!(app_weak.upgrade().is_none(), "App should have been dropped");
1189
1190 dispatcher.run_until_parked();
1191
1192 background_executor.block(task);
1193 }
1194
1195 #[test]
1196 fn test_polling_cancelled_task_returns_none_with_fallible() {
1197 let dispatcher = TestDispatcher::new(StdRng::seed_from_u64(0));
1198 let arc_dispatcher = Arc::new(dispatcher.clone());
1199 let background_executor = BackgroundExecutor::new(arc_dispatcher.clone());
1200 let foreground_executor = ForegroundExecutor::new(arc_dispatcher);
1201
1202 let platform = TestPlatform::new(background_executor.clone(), foreground_executor.clone());
1203 let asset_source = Arc::new(());
1204 let http_client = http_client::FakeHttpClient::with_404_response();
1205
1206 let app = App::new_app(platform, asset_source, http_client);
1207 let liveness_token = app.borrow().liveness.token();
1208 let app_weak = Rc::downgrade(&app);
1209
1210 let task = foreground_executor
1211 .spawn_context(liveness_token, async move { 42 })
1212 .fallible();
1213
1214 drop(app);
1215
1216 assert!(app_weak.upgrade().is_none(), "App should have been dropped");
1217
1218 dispatcher.run_until_parked();
1219
1220 let result = background_executor.block(task);
1221 assert_eq!(result, None, "Cancelled task should return None");
1222 }
1223
1224 #[test]
1225 fn test_app_liveness_token_can_be_dropped_on_background_thread() {
1226 let dispatcher = TestDispatcher::new(StdRng::seed_from_u64(0));
1227 let arc_dispatcher = Arc::new(dispatcher.clone());
1228 let background_executor = BackgroundExecutor::new(arc_dispatcher.clone());
1229 let foreground_executor = ForegroundExecutor::new(arc_dispatcher);
1230
1231 let platform = TestPlatform::new(background_executor, foreground_executor);
1232 let asset_source = Arc::new(());
1233 let http_client = http_client::FakeHttpClient::with_404_response();
1234
1235 let app = App::new_app(platform, asset_source, http_client);
1236 let liveness_token = app.borrow().liveness.token();
1237
1238 // Drop the token on a real background thread.
1239 std::thread::spawn(move || {
1240 drop(liveness_token);
1241 })
1242 .join()
1243 .expect("Dropping AppLivenessToken on background thread should not panic");
1244 }
1245}