use crate::{App, PlatformDispatcher, RunnableMeta, RunnableVariant, TaskTiming, profiler};
use async_task::Runnable;
use futures::channel::mpsc;
use parking_lot::{Condvar, Mutex};
use smol::prelude::*;
use std::{
    fmt::Debug,
    marker::PhantomData,
    mem::{self, ManuallyDrop},
    num::NonZeroUsize,
    panic::Location,
    pin::Pin,
    rc::Rc,
    sync::{
        Arc,
        atomic::{AtomicUsize, Ordering},
    },
    task::{Context, Poll},
    thread::{self, ThreadId},
    time::{Duration, Instant},
};
use util::TryFutureExt;
use waker_fn::waker_fn;

#[cfg(any(test, feature = "test-support"))]
use rand::rngs::StdRng;

/// A pointer to the executor that is currently running,
/// for spawning background tasks.
#[derive(Clone)]
pub struct BackgroundExecutor {
    #[doc(hidden)]
    pub dispatcher: Arc<dyn PlatformDispatcher>,
}

/// A pointer to the executor that is currently running,
/// for spawning tasks on the main thread.
///
/// This is intentionally `!Send` via the `not_send` marker field. This is because
/// `ForegroundExecutor::spawn` does not require `Send` but checks at runtime that the future is
/// only polled from the same thread it was spawned from. These checks would fail when spawning
/// foreground tasks from background threads.
#[derive(Clone)]
pub struct ForegroundExecutor {
    #[doc(hidden)]
    pub dispatcher: Arc<dyn PlatformDispatcher>,
    not_send: PhantomData<Rc<()>>,
}

/// Realtime task priority
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
#[repr(u8)]
pub enum RealtimePriority {
    /// Audio task
    Audio,
    /// Other realtime task
    #[default]
    Other,
}

/// Task priority
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
#[repr(u8)]
pub enum Priority {
    /// Realtime priority
    ///
    /// Spawning a task with this priority will spin it off on a separate thread dedicated just to that task.
    Realtime(RealtimePriority),
    /// High priority
    ///
    /// Only use for tasks that are critical to the user experience / responsiveness of the editor.
    High,
    /// Medium priority, probably suits most of your use cases.
    #[default]
    Medium,
    /// Low priority
    ///
    /// Use this for background work that can arrive in large quantities, so that it
    /// does not starve the executor of resources needed for higher-priority tasks.
    Low,
}

impl Priority {
    #[allow(dead_code)]
    pub(crate) const fn probability(&self) -> u32 {
        match self {
            // realtime priorities are not considered for probability scheduling
            Priority::Realtime(_) => 0,
            Priority::High => 60,
            Priority::Medium => 30,
            Priority::Low => 10,
        }
    }
}

/// Task is a primitive that allows work to happen in the background.
///
/// It implements [`Future`] so you can `.await` on it.
///
/// If you drop a task it will be cancelled immediately. Calling [`Task::detach`] allows
/// the task to continue running, but with no way to return a value.
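///
/// # Example
///
/// A minimal sketch of the drop-vs-detach distinction (the `executor` handle here is
/// an assumption, obtained from e.g. `cx.background_executor()`):
///
/// ```ignore
/// // Awaiting a task yields its value.
/// let sum = executor.spawn(async { 1 + 2 }).await;
///
/// // Detaching lets the task keep running, but its result is discarded.
/// executor.spawn(async { do_background_work().await }).detach();
///
/// // Dropping a task without awaiting or detaching it cancels it.
/// drop(executor.spawn(async { never_runs_to_completion().await }));
/// ```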
#[must_use]
#[derive(Debug)]
pub struct Task<T>(TaskState<T>);

#[derive(Debug)]
enum TaskState<T> {
    /// A task that is ready to return a value
    Ready(Option<T>),

    /// A task that is currently running.
    Spawned(async_task::Task<T, RunnableMeta>),
}

impl<T> Task<T> {
    /// Creates a new task that will resolve with the value
    pub fn ready(val: T) -> Self {
        Task(TaskState::Ready(Some(val)))
    }

    /// Detaching a task runs it to completion in the background
    pub fn detach(self) {
        match self {
            Task(TaskState::Ready(_)) => {}
            Task(TaskState::Spawned(task)) => task.detach(),
        }
    }

    /// Converts this task into a fallible task that returns `Option<T>`.
    ///
    /// Unlike the standard `Task<T>`, a [`FallibleTask`] will return `None`
    /// if the app was dropped while the task is executing.
    ///
    /// # Example
    ///
    /// ```ignore
    /// // Background task that gracefully handles app shutdown:
    /// cx.background_spawn(async move {
    ///     let result = foreground_task.fallible().await;
    ///     if let Some(value) = result {
    ///         // Process the value
    ///     }
    ///     // If None, app was shut down - just exit gracefully
    /// }).detach();
    /// ```
    pub fn fallible(self) -> FallibleTask<T> {
        FallibleTask(match self.0 {
            TaskState::Ready(val) => FallibleTaskState::Ready(val),
            TaskState::Spawned(task) => FallibleTaskState::Spawned(task.fallible()),
        })
    }
}

impl<E, T> Task<Result<T, E>>
where
    T: 'static,
    E: 'static + Debug,
{
    /// Run the task to completion in the background and log any
    /// errors that occur.
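    ///
    /// A minimal sketch of typical usage, assuming a `cx: &App` handle and an
    /// `anyhow::Result` return type for illustration:
    ///
    /// ```ignore
    /// let task: Task<anyhow::Result<()>> = cx.background_spawn(async { save_file().await });
    /// // Let the task finish on its own; any error is logged with the caller's location.
    /// task.detach_and_log_err(cx);
    /// ```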
    #[track_caller]
    pub fn detach_and_log_err(self, cx: &App) {
        let location = core::panic::Location::caller();
        cx.foreground_executor()
            .spawn(self.log_tracked_err(*location))
            .detach();
    }
}

impl<T> Future for Task<T> {
    type Output = T;

    fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
        match unsafe { self.get_unchecked_mut() } {
            Task(TaskState::Ready(val)) => Poll::Ready(val.take().unwrap()),
            Task(TaskState::Spawned(task)) => task.poll(cx),
        }
    }
}

/// A task that returns `Option<T>` instead of panicking when cancelled.
#[must_use]
pub struct FallibleTask<T>(FallibleTaskState<T>);

enum FallibleTaskState<T> {
    /// A task that is ready to return a value
    Ready(Option<T>),

    /// A task that is currently running (wraps async_task::FallibleTask).
    Spawned(async_task::FallibleTask<T, RunnableMeta>),
}

impl<T> FallibleTask<T> {
    /// Creates a new fallible task that will resolve with the value.
    pub fn ready(val: T) -> Self {
        FallibleTask(FallibleTaskState::Ready(Some(val)))
    }

    /// Detaching a task runs it to completion in the background.
    pub fn detach(self) {
        match self.0 {
            FallibleTaskState::Ready(_) => {}
            FallibleTaskState::Spawned(task) => task.detach(),
        }
    }
}

impl<T> Future for FallibleTask<T> {
    type Output = Option<T>;

    fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
        match unsafe { self.get_unchecked_mut() } {
            FallibleTask(FallibleTaskState::Ready(val)) => Poll::Ready(val.take()),
            FallibleTask(FallibleTaskState::Spawned(task)) => Pin::new(task).poll(cx),
        }
    }
}

impl<T> std::fmt::Debug for FallibleTask<T> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match &self.0 {
            FallibleTaskState::Ready(_) => f.debug_tuple("FallibleTask::Ready").finish(),
            FallibleTaskState::Spawned(task) => {
                f.debug_tuple("FallibleTask::Spawned").field(task).finish()
            }
        }
    }
}

/// A task label is an opaque identifier that you can use to
/// refer to a task in tests.
#[derive(Clone, Copy, PartialEq, Eq, Hash, Debug)]
pub struct TaskLabel(NonZeroUsize);

impl Default for TaskLabel {
    fn default() -> Self {
        Self::new()
    }
}

impl TaskLabel {
    /// Construct a new task label.
    pub fn new() -> Self {
        static NEXT_TASK_LABEL: AtomicUsize = AtomicUsize::new(1);
        Self(
            NEXT_TASK_LABEL
                .fetch_add(1, Ordering::SeqCst)
                .try_into()
                .unwrap(),
        )
    }
}

type AnyLocalFuture<R> = Pin<Box<dyn 'static + Future<Output = R>>>;

type AnyFuture<R> = Pin<Box<dyn 'static + Send + Future<Output = R>>>;

/// BackgroundExecutor lets you run things on background threads.
/// In production this is a thread pool with no ordering guarantees.
/// In tests this is simulated by running tasks one by one in a deterministic
/// (but arbitrary) order controlled by the `SEED` environment variable.
impl BackgroundExecutor {
    #[doc(hidden)]
    pub fn new(dispatcher: Arc<dyn PlatformDispatcher>) -> Self {
        Self { dispatcher }
    }

    /// Enqueues the given future to be run to completion on a background thread.
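    ///
    /// A minimal sketch of typical usage (the `executor` handle is an assumption,
    /// obtained from e.g. `cx.background_executor()`):
    ///
    /// ```ignore
    /// let checksum = executor
    ///     .spawn(async move {
    ///         // Runs on a background thread; the future and its output must be `Send`.
    ///         compute_checksum(&bytes)
    ///     })
    ///     .await;
    /// ```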
    #[track_caller]
    pub fn spawn<R>(&self, future: impl Future<Output = R> + Send + 'static) -> Task<R>
    where
        R: Send + 'static,
    {
        self.spawn_with_priority(Priority::default(), future)
    }

    /// Enqueues the given future to be run to completion on a background thread with the given priority.
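    ///
    /// A minimal sketch of choosing a priority (handle and helper names assumed as above):
    ///
    /// ```ignore
    /// // Bulk work that should not crowd out latency-sensitive tasks.
    /// executor
    ///     .spawn_with_priority(Priority::Low, async move { reindex_project().await })
    ///     .detach();
    ///
    /// // A realtime task runs on its own dedicated thread.
    /// executor
    ///     .spawn_with_priority(Priority::Realtime(RealtimePriority::Audio), async move {
    ///         run_audio_stream().await
    ///     })
    ///     .detach();
    /// ```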
    #[track_caller]
    pub fn spawn_with_priority<R>(
        &self,
        priority: Priority,
        future: impl Future<Output = R> + Send + 'static,
    ) -> Task<R>
    where
        R: Send + 'static,
    {
        self.spawn_internal::<R>(Box::pin(future), None, priority)
    }

    /// Enqueues the given future to be run to completion on a background thread, blocking the current task on it.
    ///
    /// This allows spawning background work that borrows from its scope. Note that the supplied future will run to
    /// completion before the current task is resumed, even if the current task is slated for cancellation.
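    ///
    /// A minimal sketch of work that borrows from the caller's stack (names are illustrative):
    ///
    /// ```ignore
    /// let lines: Vec<String> = load_lines();
    /// // `lines` is only borrowed; the future completes before this task resumes.
    /// let total_len = executor
    ///     .await_on_background(async { lines.iter().map(|l| l.len()).sum::<usize>() })
    ///     .await;
    /// ```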
    pub async fn await_on_background<R>(&self, future: impl Future<Output = R> + Send) -> R
    where
        R: Send,
    {
        // We need to ensure that cancellation of the parent task does not drop the environment
        // before our own task has completed or been cancelled.
        struct NotifyOnDrop<'a>(&'a (Condvar, Mutex<bool>));

        impl Drop for NotifyOnDrop<'_> {
            fn drop(&mut self) {
                *self.0.1.lock() = true;
                self.0.0.notify_all();
            }
        }

        struct WaitOnDrop<'a>(&'a (Condvar, Mutex<bool>));

        impl Drop for WaitOnDrop<'_> {
            fn drop(&mut self) {
                let mut done = self.0.1.lock();
                if !*done {
                    self.0.0.wait(&mut done);
                }
            }
        }

        let dispatcher = self.dispatcher.clone();
        let location = core::panic::Location::caller();

        let pair = &(Condvar::new(), Mutex::new(false));
        let _wait_guard = WaitOnDrop(pair);

        let (runnable, task) = unsafe {
            async_task::Builder::new()
                .metadata(RunnableMeta {
                    location,
                    app: None,
                })
                .spawn_unchecked(
                    move |_| async {
                        let _notify_guard = NotifyOnDrop(pair);
                        future.await
                    },
                    move |runnable| {
                        dispatcher.dispatch(
                            RunnableVariant::Meta(runnable),
                            None,
                            Priority::default(),
                        )
                    },
                )
        };
        runnable.schedule();
        task.await
    }

    /// Enqueues the given future to be run to completion on a background thread.
    /// The given label can be used to control the priority of the task in tests.
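    ///
    /// A minimal sketch, assuming a test that wants to force this task to run after
    /// everything else via [`Self::deprioritize`] (names are illustrative):
    ///
    /// ```ignore
    /// let label = TaskLabel::new();
    /// executor
    ///     .spawn_labeled(label, async move { flush_pending_writes().await })
    ///     .detach();
    ///
    /// // In a test: make the labeled task run after all other ready tasks.
    /// executor.deprioritize(label);
    /// ```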
    #[track_caller]
    pub fn spawn_labeled<R>(
        &self,
        label: TaskLabel,
        future: impl Future<Output = R> + Send + 'static,
    ) -> Task<R>
    where
        R: Send + 'static,
    {
        self.spawn_internal::<R>(Box::pin(future), Some(label), Priority::default())
    }

    #[track_caller]
    fn spawn_internal<R: Send + 'static>(
        &self,
        future: AnyFuture<R>,
        label: Option<TaskLabel>,
        priority: Priority,
    ) -> Task<R> {
        let dispatcher = self.dispatcher.clone();
        let (runnable, task) = if let Priority::Realtime(realtime) = priority {
            let location = core::panic::Location::caller();
            let (mut tx, rx) = flume::bounded::<Runnable<RunnableMeta>>(1);

            dispatcher.spawn_realtime(
                realtime,
                Box::new(move || {
                    while let Ok(runnable) = rx.recv() {
                        let start = Instant::now();
                        let location = runnable.metadata().location;
                        let mut timing = TaskTiming {
                            location,
                            start,
                            end: None,
                        };
                        profiler::add_task_timing(timing);

                        runnable.run();

                        let end = Instant::now();
                        timing.end = Some(end);
                        profiler::add_task_timing(timing);
                    }
                }),
            );

            async_task::Builder::new()
                .metadata(RunnableMeta {
                    location,
                    app: None,
                })
                .spawn(
                    move |_| future,
                    move |runnable| {
                        let _ = tx.send(runnable);
                    },
                )
        } else {
            let location = core::panic::Location::caller();
            async_task::Builder::new()
                .metadata(RunnableMeta {
                    location,
                    app: None,
                })
                .spawn(
                    move |_| future,
                    move |runnable| {
                        dispatcher.dispatch(RunnableVariant::Meta(runnable), label, priority)
                    },
                )
        };

        runnable.schedule();
        Task(TaskState::Spawned(task))
    }

    /// Used by the test harness to run an async test in a synchronous fashion.
    #[cfg(any(test, feature = "test-support"))]
    #[track_caller]
    pub fn block_test<R>(&self, future: impl Future<Output = R>) -> R {
        if let Ok(value) = self.block_internal(false, future, None) {
            value
        } else {
            unreachable!()
        }
    }

    /// Block the current thread until the given future resolves.
    /// Consider using `block_with_timeout` instead.
    pub fn block<R>(&self, future: impl Future<Output = R>) -> R {
        if let Ok(value) = self.block_internal(true, future, None) {
            value
        } else {
            unreachable!()
        }
    }

    #[cfg(not(any(test, feature = "test-support")))]
    pub(crate) fn block_internal<Fut: Future>(
        &self,
        _background_only: bool,
        future: Fut,
        timeout: Option<Duration>,
    ) -> Result<Fut::Output, impl Future<Output = Fut::Output> + use<Fut>> {
        use std::time::Instant;

        let mut future = Box::pin(future);
        if timeout == Some(Duration::ZERO) {
            return Err(future);
        }
        let deadline = timeout.map(|timeout| Instant::now() + timeout);

        let parker = parking::Parker::new();
        let unparker = parker.unparker();
        let waker = waker_fn(move || {
            unparker.unpark();
        });
        let mut cx = std::task::Context::from_waker(&waker);

        loop {
            match future.as_mut().poll(&mut cx) {
                Poll::Ready(result) => return Ok(result),
                Poll::Pending => {
                    let timeout =
                        deadline.map(|deadline| deadline.saturating_duration_since(Instant::now()));
                    if let Some(timeout) = timeout {
                        if !parker.park_timeout(timeout)
                            && deadline.is_some_and(|deadline| deadline < Instant::now())
                        {
                            return Err(future);
                        }
                    } else {
                        parker.park();
                    }
                }
            }
        }
    }

    #[cfg(any(test, feature = "test-support"))]
    #[track_caller]
    pub(crate) fn block_internal<Fut: Future>(
        &self,
        background_only: bool,
        future: Fut,
        timeout: Option<Duration>,
    ) -> Result<Fut::Output, impl Future<Output = Fut::Output> + use<Fut>> {
        use std::sync::atomic::AtomicBool;

        use parking::Parker;

        let mut future = Box::pin(future);
        if timeout == Some(Duration::ZERO) {
            return Err(future);
        }
        let Some(dispatcher) = self.dispatcher.as_test() else {
            return Err(future);
        };

        let mut max_ticks = if timeout.is_some() {
            dispatcher.gen_block_on_ticks()
        } else {
            usize::MAX
        };

        let parker = Parker::new();
        let unparker = parker.unparker();

        let awoken = Arc::new(AtomicBool::new(false));
        let waker = waker_fn({
            let awoken = awoken.clone();
            let unparker = unparker.clone();
            move || {
                awoken.store(true, Ordering::SeqCst);
                unparker.unpark();
            }
        });
        let mut cx = std::task::Context::from_waker(&waker);

        let duration = Duration::from_secs(
            option_env!("GPUI_TEST_TIMEOUT")
                .and_then(|s| s.parse::<u64>().ok())
                .unwrap_or(180),
        );
        let mut test_should_end_by = Instant::now() + duration;

        loop {
            match future.as_mut().poll(&mut cx) {
                Poll::Ready(result) => return Ok(result),
                Poll::Pending => {
                    if max_ticks == 0 {
                        return Err(future);
                    }
                    max_ticks -= 1;

                    if !dispatcher.tick(background_only) {
                        if awoken.swap(false, Ordering::SeqCst) {
                            continue;
                        }

                        if !dispatcher.parking_allowed() {
                            if dispatcher.advance_clock_to_next_delayed() {
                                continue;
                            }
                            let mut backtrace_message = String::new();
                            let mut waiting_message = String::new();
                            if let Some(backtrace) = dispatcher.waiting_backtrace() {
                                backtrace_message =
                                    format!("\nbacktrace of waiting future:\n{:?}", backtrace);
                            }
                            if let Some(waiting_hint) = dispatcher.waiting_hint() {
                                waiting_message = format!("\n waiting on: {}\n", waiting_hint);
                            }
                            panic!(
                                "parked with nothing left to run{waiting_message}{backtrace_message}",
                            )
                        }
                        dispatcher.push_unparker(unparker.clone());
                        parker.park_timeout(Duration::from_millis(1));
                        if Instant::now() > test_should_end_by {
                            panic!("test timed out after {duration:?} with allow_parking")
                        }
                    }
                }
            }
        }
    }

    /// Block the current thread until the given future resolves
    /// or `duration` has elapsed.
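    ///
    /// A minimal sketch of handling the timeout case (names are illustrative):
    ///
    /// ```ignore
    /// match executor.block_with_timeout(Duration::from_millis(100), fetch_metadata()) {
    ///     Ok(metadata) => use_metadata(metadata),
    ///     // Timed out: the still-pending future is handed back and could be polled later.
    ///     Err(_pending) => handle_timeout(),
    /// }
    /// ```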
    pub fn block_with_timeout<Fut: Future>(
        &self,
        duration: Duration,
        future: Fut,
    ) -> Result<Fut::Output, impl Future<Output = Fut::Output> + use<Fut>> {
        self.block_internal(true, future, Some(duration))
    }

    /// Scoped lets you start a number of tasks and waits
    /// for all of them to complete before returning.
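    ///
    /// Because the scope waits for every spawned future to resolve, the futures may borrow
    /// from the enclosing stack frame. A minimal sketch (names are illustrative):
    ///
    /// ```ignore
    /// let chunks: Vec<&[u8]> = data.chunks(1024).collect();
    /// let mut digests = vec![0u64; chunks.len()];
    /// executor
    ///     .scoped(|scope| {
    ///         for (chunk, digest) in chunks.iter().zip(digests.iter_mut()) {
    ///             // Each spawned future borrows a disjoint part of `digests`.
    ///             scope.spawn(async move { *digest = hash(chunk) });
    ///         }
    ///     })
    ///     .await;
    /// ```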
    pub async fn scoped<'scope, F>(&self, scheduler: F)
    where
        F: FnOnce(&mut Scope<'scope>),
    {
        let mut scope = Scope::new(self.clone(), Priority::default());
        (scheduler)(&mut scope);
        let spawned = mem::take(&mut scope.futures)
            .into_iter()
            .map(|f| self.spawn_with_priority(scope.priority, f))
            .collect::<Vec<_>>();
        for task in spawned {
            task.await;
        }
    }

    /// Scoped lets you start a number of tasks and waits
    /// for all of them to complete before returning. The tasks are spawned
    /// with the given priority.
    pub async fn scoped_priority<'scope, F>(&self, priority: Priority, scheduler: F)
    where
        F: FnOnce(&mut Scope<'scope>),
    {
        let mut scope = Scope::new(self.clone(), priority);
        (scheduler)(&mut scope);
        let spawned = mem::take(&mut scope.futures)
            .into_iter()
            .map(|f| self.spawn_with_priority(scope.priority, f))
            .collect::<Vec<_>>();
        for task in spawned {
            task.await;
        }
    }

    /// Get the current time.
    ///
    /// Calling this instead of `std::time::Instant::now` allows the use
    /// of fake timers in tests.
    pub fn now(&self) -> Instant {
        self.dispatcher.now()
    }

    /// Returns a task that will complete after the given duration.
    /// Depending on other concurrent tasks the elapsed duration may be longer
    /// than requested.
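    ///
    /// A minimal sketch of a debounce-style delay (handle and helper names assumed as above):
    ///
    /// ```ignore
    /// let delay = executor.timer(Duration::from_millis(250));
    /// executor
    ///     .spawn(async move {
    ///         // Wait at least 250ms before doing the work.
    ///         delay.await;
    ///         refresh_search_results().await;
    ///     })
    ///     .detach();
    /// ```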
    pub fn timer(&self, duration: Duration) -> Task<()> {
        if duration.is_zero() {
            return Task::ready(());
        }
        let location = core::panic::Location::caller();
        let (runnable, task) = async_task::Builder::new()
            .metadata(RunnableMeta {
                location,
                app: None,
            })
            .spawn(move |_| async move {}, {
                let dispatcher = self.dispatcher.clone();
                move |runnable| dispatcher.dispatch_after(duration, RunnableVariant::Meta(runnable))
            });
        runnable.schedule();
        Task(TaskState::Spawned(task))
    }

    /// in tests, start_waiting lets you indicate which task is waiting (for debugging only)
    #[cfg(any(test, feature = "test-support"))]
    pub fn start_waiting(&self) {
        self.dispatcher.as_test().unwrap().start_waiting();
    }

    /// in tests, removes the debugging data added by start_waiting
    #[cfg(any(test, feature = "test-support"))]
    pub fn finish_waiting(&self) {
        self.dispatcher.as_test().unwrap().finish_waiting();
    }

    /// in tests, run an arbitrary number of tasks (determined by the SEED environment variable)
    #[cfg(any(test, feature = "test-support"))]
    pub fn simulate_random_delay(&self) -> impl Future<Output = ()> + use<> {
        self.dispatcher.as_test().unwrap().simulate_random_delay()
    }

    /// in tests, indicate that a given task from `spawn_labeled` should run after everything else
    #[cfg(any(test, feature = "test-support"))]
    pub fn deprioritize(&self, task_label: TaskLabel) {
        self.dispatcher.as_test().unwrap().deprioritize(task_label)
    }

    /// in tests, move time forward. This does not run any tasks, but does make `timer`s ready.
    #[cfg(any(test, feature = "test-support"))]
    pub fn advance_clock(&self, duration: Duration) {
        self.dispatcher.as_test().unwrap().advance_clock(duration)
    }

    /// in tests, run one task.
    #[cfg(any(test, feature = "test-support"))]
    pub fn tick(&self) -> bool {
        self.dispatcher.as_test().unwrap().tick(false)
    }

    /// in tests, run all tasks that are ready to run. If after doing so
    /// the test still has outstanding tasks, this will panic. (See also [`Self::allow_parking`])
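    ///
    /// A minimal sketch of the usual test pattern, assuming a test executor (names are
    /// illustrative):
    ///
    /// ```ignore
    /// let task = executor.spawn(async move { compute_value().await });
    /// // Drive every ready task to completion deterministically.
    /// executor.run_until_parked();
    /// assert_eq!(executor.block(task), expected_value);
    /// ```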
    #[cfg(any(test, feature = "test-support"))]
    pub fn run_until_parked(&self) {
        self.dispatcher.as_test().unwrap().run_until_parked()
    }

    /// in tests, prevents `run_until_parked` from panicking if there are outstanding tasks.
    /// This is useful when you are integrating other (non-GPUI) futures, like disk access, that
    /// do take real async time to run.
    #[cfg(any(test, feature = "test-support"))]
    pub fn allow_parking(&self) {
        self.dispatcher.as_test().unwrap().allow_parking();
    }

    /// undoes the effect of [`Self::allow_parking`].
    #[cfg(any(test, feature = "test-support"))]
    pub fn forbid_parking(&self) {
        self.dispatcher.as_test().unwrap().forbid_parking();
    }

    /// adds detail to the "parked with nothing left to run" message.
    #[cfg(any(test, feature = "test-support"))]
    pub fn set_waiting_hint(&self, msg: Option<String>) {
        self.dispatcher.as_test().unwrap().set_waiting_hint(msg);
    }

    /// in tests, returns the rng used by the dispatcher and seeded by the `SEED` environment variable
    #[cfg(any(test, feature = "test-support"))]
    pub fn rng(&self) -> StdRng {
        self.dispatcher.as_test().unwrap().rng()
    }

    /// How many CPUs are available to the dispatcher.
    pub fn num_cpus(&self) -> usize {
        #[cfg(any(test, feature = "test-support"))]
        return 4;

        #[cfg(not(any(test, feature = "test-support")))]
        return num_cpus::get();
    }

    /// Whether we're on the main thread.
    pub fn is_main_thread(&self) -> bool {
        self.dispatcher.is_main_thread()
    }

    #[cfg(any(test, feature = "test-support"))]
    /// in tests, control the number of ticks that `block_with_timeout` will run before timing out.
    pub fn set_block_on_ticks(&self, range: std::ops::RangeInclusive<usize>) {
        self.dispatcher.as_test().unwrap().set_block_on_ticks(range);
    }
}

/// ForegroundExecutor runs things on the main thread.
impl ForegroundExecutor {
    /// Creates a new ForegroundExecutor from the given PlatformDispatcher.
    pub fn new(dispatcher: Arc<dyn PlatformDispatcher>) -> Self {
        Self {
            dispatcher,
            not_send: PhantomData,
        }
    }

    /// Enqueues the given future to run on the main thread at some point in the future.
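    ///
    /// A minimal sketch, assuming a foreground executor obtained from e.g.
    /// `cx.foreground_executor()` (the future does not need to be `Send`):
    ///
    /// ```ignore
    /// let not_send_value = Rc::new(5);
    /// foreground_executor
    ///     .spawn(async move {
    ///         // Runs on the main thread, so holding an `Rc` across this future is fine.
    ///         update_ui(*not_send_value);
    ///     })
    ///     .detach();
    /// ```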
    #[track_caller]
    pub fn spawn<R>(&self, future: impl Future<Output = R> + 'static) -> Task<R>
    where
        R: 'static,
    {
        self.inner_spawn(None, Priority::default(), future)
    }

    /// Enqueues the given future to run on the main thread at some point in the future, with the given priority.
    #[track_caller]
    pub fn spawn_with_priority<R>(
        &self,
        priority: Priority,
        future: impl Future<Output = R> + 'static,
    ) -> Task<R>
    where
        R: 'static,
    {
        self.inner_spawn(None, priority, future)
    }

    #[track_caller]
    pub(crate) fn spawn_context<R>(
        &self,
        app: std::sync::Weak<()>,
        future: impl Future<Output = R> + 'static,
    ) -> Task<R>
    where
        R: 'static,
    {
        self.inner_spawn(Some(app), Priority::default(), future)
    }

    #[track_caller]
    pub(crate) fn inner_spawn<R>(
        &self,
        app: Option<std::sync::Weak<()>>,
        priority: Priority,
        future: impl Future<Output = R> + 'static,
    ) -> Task<R>
    where
        R: 'static,
    {
        let dispatcher = self.dispatcher.clone();
        let location = core::panic::Location::caller();

        #[track_caller]
        fn inner<R: 'static>(
            dispatcher: Arc<dyn PlatformDispatcher>,
            future: AnyLocalFuture<R>,
            location: &'static core::panic::Location<'static>,
            app: Option<std::sync::Weak<()>>,
            priority: Priority,
        ) -> Task<R> {
            let (runnable, task) = spawn_local_with_source_location(
                future,
                move |runnable| {
                    dispatcher.dispatch_on_main_thread(RunnableVariant::Meta(runnable), priority)
                },
                RunnableMeta { location, app },
            );
            runnable.schedule();
            Task(TaskState::Spawned(task))
        }
        inner::<R>(dispatcher, Box::pin(future), location, app, priority)
    }
}

/// Variant of `async_task::spawn_local` that includes the source location of the spawn in panics.
///
/// Copy-modified from:
/// <https://github.com/smol-rs/async-task/blob/ca9dbe1db9c422fd765847fa91306e30a6bb58a9/src/runnable.rs#L405>
#[track_caller]
fn spawn_local_with_source_location<Fut, S, M>(
    future: Fut,
    schedule: S,
    metadata: M,
) -> (Runnable<M>, async_task::Task<Fut::Output, M>)
where
    Fut: Future + 'static,
    Fut::Output: 'static,
    S: async_task::Schedule<M> + Send + Sync + 'static,
    M: 'static,
{
    #[inline]
    fn thread_id() -> ThreadId {
        std::thread_local! {
            static ID: ThreadId = thread::current().id();
        }
        ID.try_with(|id| *id)
            .unwrap_or_else(|_| thread::current().id())
    }

    struct Checked<F> {
        id: ThreadId,
        inner: ManuallyDrop<F>,
        location: &'static Location<'static>,
    }

    impl<F> Drop for Checked<F> {
        fn drop(&mut self) {
            assert!(
                self.id == thread_id(),
                "local task dropped by a thread that didn't spawn it. Task spawned at {}",
                self.location
            );
            unsafe { ManuallyDrop::drop(&mut self.inner) };
        }
    }

    impl<F: Future> Future for Checked<F> {
        type Output = F::Output;

        fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
            assert!(
                self.id == thread_id(),
                "local task polled by a thread that didn't spawn it. Task spawned at {}",
                self.location
            );
            unsafe { self.map_unchecked_mut(|c| &mut *c.inner).poll(cx) }
        }
    }

    // Wrap the future into one that checks which thread it's on.
    let future = Checked {
        id: thread_id(),
        inner: ManuallyDrop::new(future),
        location: Location::caller(),
    };

    unsafe {
        async_task::Builder::new()
            .metadata(metadata)
            .spawn_unchecked(move |_| future, schedule)
    }
}

/// Scope manages a set of tasks that are enqueued and waited on together. See [`BackgroundExecutor::scoped`].
pub struct Scope<'a> {
    executor: BackgroundExecutor,
    priority: Priority,
    futures: Vec<Pin<Box<dyn Future<Output = ()> + Send + 'static>>>,
    tx: Option<mpsc::Sender<()>>,
    rx: mpsc::Receiver<()>,
    lifetime: PhantomData<&'a ()>,
}

impl<'a> Scope<'a> {
    fn new(executor: BackgroundExecutor, priority: Priority) -> Self {
        let (tx, rx) = mpsc::channel(1);
        Self {
            executor,
            priority,
            tx: Some(tx),
            rx,
            futures: Default::default(),
            lifetime: PhantomData,
        }
    }

    /// How many CPUs are available to the dispatcher.
    pub fn num_cpus(&self) -> usize {
        self.executor.num_cpus()
    }

    /// Spawn a future into this scope.
    #[track_caller]
    pub fn spawn<F>(&mut self, f: F)
    where
        F: Future<Output = ()> + Send + 'a,
    {
        let tx = self.tx.clone().unwrap();

        // SAFETY: The 'a lifetime is guaranteed to outlive any of these futures because
        // dropping this `Scope` blocks until all of the futures have resolved.
        let f = unsafe {
            mem::transmute::<
                Pin<Box<dyn Future<Output = ()> + Send + 'a>>,
                Pin<Box<dyn Future<Output = ()> + Send + 'static>>,
            >(Box::pin(async move {
                f.await;
                drop(tx);
            }))
        };
        self.futures.push(f);
    }
}

impl Drop for Scope<'_> {
    fn drop(&mut self) {
        self.tx.take().unwrap();

        // Wait until the channel is closed, which means that all of the spawned
        // futures have resolved.
        self.executor.block(self.rx.next());
    }
}

#[cfg(test)]
mod test {
    use super::*;
    use crate::{App, TestDispatcher, TestPlatform};
    use rand::SeedableRng;
    use std::cell::RefCell;

    #[test]
    fn sanity_test_tasks_run() {
        let dispatcher = TestDispatcher::new(StdRng::seed_from_u64(0));
        let arc_dispatcher = Arc::new(dispatcher.clone());
        let background_executor = BackgroundExecutor::new(arc_dispatcher.clone());
        let foreground_executor = ForegroundExecutor::new(arc_dispatcher);

        let platform = TestPlatform::new(background_executor, foreground_executor.clone());
        let asset_source = Arc::new(());
        let http_client = http_client::FakeHttpClient::with_404_response();

        let app = App::new_app(platform, asset_source, http_client);
        let liveness_token = std::sync::Arc::downgrade(&app.borrow().liveness);

        let task_ran = Rc::new(RefCell::new(false));

        foreground_executor
            .spawn_context(liveness_token, {
                let task_ran = Rc::clone(&task_ran);
                async move {
                    *task_ran.borrow_mut() = true;
                }
            })
            .detach();

        // Run dispatcher while app is still alive
        dispatcher.run_until_parked();

        // Task should have run
        assert!(
            *task_ran.borrow(),
            "Task should run normally when app is alive"
        );
    }

    #[test]
    fn test_task_cancelled_when_app_dropped() {
        let dispatcher = TestDispatcher::new(StdRng::seed_from_u64(0));
        let arc_dispatcher = Arc::new(dispatcher.clone());
        let background_executor = BackgroundExecutor::new(arc_dispatcher.clone());
        let foreground_executor = ForegroundExecutor::new(arc_dispatcher);

        let platform = TestPlatform::new(background_executor, foreground_executor.clone());
        let asset_source = Arc::new(());
        let http_client = http_client::FakeHttpClient::with_404_response();

        let app = App::new_app(platform, asset_source, http_client);
        let liveness_token = std::sync::Arc::downgrade(&app.borrow().liveness);
        let app_weak = Rc::downgrade(&app);

        let task_ran = Rc::new(RefCell::new(false));
        let task_ran_clone = Rc::clone(&task_ran);

        foreground_executor
            .spawn_context(liveness_token, async move {
                *task_ran_clone.borrow_mut() = true;
            })
            .detach();

        drop(app);

        assert!(app_weak.upgrade().is_none(), "App should have been dropped");

        dispatcher.run_until_parked();

        // The task should have been cancelled, not run
        assert!(
            !*task_ran.borrow(),
            "Task should have been cancelled when app was dropped, but it ran!"
        );
    }

    #[test]
    fn test_nested_tasks_both_cancel() {
        let dispatcher = TestDispatcher::new(StdRng::seed_from_u64(0));
        let arc_dispatcher = Arc::new(dispatcher.clone());
        let background_executor = BackgroundExecutor::new(arc_dispatcher.clone());
        let foreground_executor = ForegroundExecutor::new(arc_dispatcher);

        let platform = TestPlatform::new(background_executor, foreground_executor.clone());
        let asset_source = Arc::new(());
        let http_client = http_client::FakeHttpClient::with_404_response();

        let app = App::new_app(platform, asset_source, http_client);
        let liveness_token = std::sync::Arc::downgrade(&app.borrow().liveness);
        let app_weak = Rc::downgrade(&app);

        let outer_completed = Rc::new(RefCell::new(false));
        let inner_completed = Rc::new(RefCell::new(false));
        let reached_await = Rc::new(RefCell::new(false));

        let outer_flag = Rc::clone(&outer_completed);
        let inner_flag = Rc::clone(&inner_completed);
        let await_flag = Rc::clone(&reached_await);

        // Channel to block the inner task until we're ready
        let (tx, rx) = futures::channel::oneshot::channel::<()>();

        // We need clones of executor and liveness_token for the inner spawn
        let inner_executor = foreground_executor.clone();
        let inner_liveness_token = liveness_token.clone();

        foreground_executor
            .spawn_context(liveness_token, async move {
                let inner_task = inner_executor.spawn_context(inner_liveness_token, {
                    let inner_flag = Rc::clone(&inner_flag);
                    async move {
                        rx.await.ok();
                        *inner_flag.borrow_mut() = true;
                    }
                });

                *await_flag.borrow_mut() = true;

                inner_task.await;

                *outer_flag.borrow_mut() = true;
            })
            .detach();

        // Run dispatcher until outer task reaches the await point
        // The inner task will be blocked on the channel
        dispatcher.run_until_parked();

        // Verify we actually reached the await point before dropping the app
        assert!(
            *reached_await.borrow(),
            "Outer task should have reached the await point"
        );

        // Neither task should have completed yet
        assert!(
            !*outer_completed.borrow(),
            "Outer task should not have completed yet"
        );
        assert!(
            !*inner_completed.borrow(),
            "Inner task should not have completed yet"
        );

        // Drop the channel sender and app while outer is awaiting inner
        drop(tx);
        drop(app);
        assert!(app_weak.upgrade().is_none(), "App should have been dropped");

        // Run dispatcher - both tasks should be cancelled
        dispatcher.run_until_parked();

        // Neither task should have completed (both were cancelled)
        assert!(
            !*outer_completed.borrow(),
            "Outer task should have been cancelled, not completed"
        );
        assert!(
            !*inner_completed.borrow(),
            "Inner task should have been cancelled, not completed"
        );
    }

    #[test]
    fn test_task_without_app_tracking_still_runs() {
        let dispatcher = TestDispatcher::new(StdRng::seed_from_u64(0));
        let arc_dispatcher = Arc::new(dispatcher.clone());
        let background_executor = BackgroundExecutor::new(arc_dispatcher.clone());
        let foreground_executor = ForegroundExecutor::new(arc_dispatcher);

        let platform = TestPlatform::new(background_executor, foreground_executor.clone());
        let asset_source = Arc::new(());
        let http_client = http_client::FakeHttpClient::with_404_response();

        let app = App::new_app(platform, asset_source, http_client);
        let app_weak = Rc::downgrade(&app);

        let task_ran = Rc::new(RefCell::new(false));
        let task_ran_clone = Rc::clone(&task_ran);

        let _task = foreground_executor.spawn(async move {
            *task_ran_clone.borrow_mut() = true;
        });

        drop(app);

        assert!(app_weak.upgrade().is_none(), "App should have been dropped");

        dispatcher.run_until_parked();

        assert!(
            *task_ran.borrow(),
            "Task without app tracking should still run after app is dropped"
        );
    }

    #[test]
    #[should_panic]
    fn test_polling_cancelled_task_panics() {
        let dispatcher = TestDispatcher::new(StdRng::seed_from_u64(0));
        let arc_dispatcher = Arc::new(dispatcher.clone());
        let background_executor = BackgroundExecutor::new(arc_dispatcher.clone());
        let foreground_executor = ForegroundExecutor::new(arc_dispatcher);

        let platform = TestPlatform::new(background_executor.clone(), foreground_executor.clone());
        let asset_source = Arc::new(());
        let http_client = http_client::FakeHttpClient::with_404_response();

        let app = App::new_app(platform, asset_source, http_client);
        let liveness_token = std::sync::Arc::downgrade(&app.borrow().liveness);
        let app_weak = Rc::downgrade(&app);

        let task = foreground_executor.spawn_context(liveness_token, async move { 42 });

        drop(app);

        assert!(app_weak.upgrade().is_none(), "App should have been dropped");

        dispatcher.run_until_parked();

        background_executor.block(task);
    }

    #[test]
    fn test_polling_cancelled_task_returns_none_with_fallible() {
        let dispatcher = TestDispatcher::new(StdRng::seed_from_u64(0));
        let arc_dispatcher = Arc::new(dispatcher.clone());
        let background_executor = BackgroundExecutor::new(arc_dispatcher.clone());
        let foreground_executor = ForegroundExecutor::new(arc_dispatcher);

        let platform = TestPlatform::new(background_executor.clone(), foreground_executor.clone());
        let asset_source = Arc::new(());
        let http_client = http_client::FakeHttpClient::with_404_response();

        let app = App::new_app(platform, asset_source, http_client);
        let liveness_token = std::sync::Arc::downgrade(&app.borrow().liveness);
        let app_weak = Rc::downgrade(&app);

        let task = foreground_executor
            .spawn_context(liveness_token, async move { 42 })
            .fallible();

        drop(app);

        assert!(app_weak.upgrade().is_none(), "App should have been dropped");

        dispatcher.run_until_parked();

        let result = background_executor.block(task);
        assert_eq!(result, None, "Cancelled task should return None");
    }
}