1use crate::{App, PlatformDispatcher, RunnableMeta, RunnableVariant, TaskTiming, profiler};
2use async_task::Runnable;
3use futures::channel::mpsc;
4use parking_lot::{Condvar, Mutex};
5use smol::prelude::*;
6use std::{
7 fmt::Debug,
8 marker::PhantomData,
9 mem::{self, ManuallyDrop},
10 num::NonZeroUsize,
11 panic::Location,
12 pin::Pin,
13 rc::Rc,
14 sync::{
15 Arc,
16 atomic::{AtomicUsize, Ordering},
17 },
18 task::{Context, Poll},
19 thread::{self, ThreadId},
20 time::{Duration, Instant},
21};
22use util::TryFutureExt;
23use waker_fn::waker_fn;
24
25#[cfg(any(test, feature = "test-support"))]
26use rand::rngs::StdRng;
27
28/// A pointer to the executor that is currently running,
29/// for spawning background tasks.
30#[derive(Clone)]
31pub struct BackgroundExecutor {
32 #[doc(hidden)]
33 pub dispatcher: Arc<dyn PlatformDispatcher>,
34}
35
36/// A pointer to the executor that is currently running,
37/// for spawning tasks on the main thread.
38///
39/// This is intentionally `!Send` via the `not_send` marker field. This is because
40/// `ForegroundExecutor::spawn` does not require `Send` but checks at runtime that the future is
41/// only polled from the same thread it was spawned from. These checks would fail when spawning
42/// foreground tasks from background threads.
43#[derive(Clone)]
44pub struct ForegroundExecutor {
45 #[doc(hidden)]
46 pub dispatcher: Arc<dyn PlatformDispatcher>,
47 not_send: PhantomData<Rc<()>>,
48}
49
/// Flavor of a realtime task.
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
#[repr(u8)]
pub enum RealtimePriority {
    /// The task performs audio work.
    Audio,
    /// Any other kind of realtime task; this is the default.
    #[default]
    Other,
}
60
61/// Task priority
62#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
63#[repr(u8)]
64pub enum Priority {
65 /// Realtime priority
66 ///
67 /// Spawning a task with this priority will spin it off on a separate thread dedicated just to that task.
68 Realtime(RealtimePriority),
69 /// High priority
70 ///
71 /// Only use for tasks that are critical to the user experience / responsiveness of the editor.
72 High,
73 /// Medium priority, probably suits most of your use cases.
74 #[default]
75 Medium,
76 /// Low priority
77 ///
78 /// Prioritize this for background work that can come in large quantities
79 /// to not starve the executor of resources for high priority tasks
80 Low,
81}
82
83impl Priority {
84 #[allow(dead_code)]
85 pub(crate) const fn probability(&self) -> u32 {
86 match self {
87 // realtime priorities are not considered for probability scheduling
88 Priority::Realtime(_) => 0,
89 Priority::High => 60,
90 Priority::Medium => 30,
91 Priority::Low => 10,
92 }
93 }
94}
95
96/// Task is a primitive that allows work to happen in the background.
97///
98/// It implements [`Future`] so you can `.await` on it.
99///
100/// If you drop a task it will be cancelled immediately. Calling [`Task::detach`] allows
101/// the task to continue running, but with no way to return a value.
102#[must_use]
103#[derive(Debug)]
104pub struct Task<T>(TaskState<T>);
105
106#[derive(Debug)]
107enum TaskState<T> {
108 /// A task that is ready to return a value
109 Ready(Option<T>),
110
111 /// A task that is currently running.
112 Spawned(async_task::Task<T, RunnableMeta>),
113}
114
115impl<T> Task<T> {
116 /// Creates a new task that will resolve with the value
117 pub fn ready(val: T) -> Self {
118 Task(TaskState::Ready(Some(val)))
119 }
120
121 /// Detaching a task runs it to completion in the background
122 pub fn detach(self) {
123 match self {
124 Task(TaskState::Ready(_)) => {}
125 Task(TaskState::Spawned(task)) => task.detach(),
126 }
127 }
128
129 /// Converts this task into a fallible task that returns `Option<T>`.
130 ///
131 /// Unlike the standard `Task<T>`, a [`FallibleTask`] will return `None`
132 /// if the app was dropped while the task is executing.
133 ///
134 /// # Example
135 ///
136 /// ```ignore
137 /// // Background task that gracefully handles app shutdown:
138 /// cx.background_spawn(async move {
139 /// let result = foreground_task.fallible().await;
140 /// if let Some(value) = result {
141 /// // Process the value
142 /// }
143 /// // If None, app was shut down - just exit gracefully
144 /// }).detach();
145 /// ```
146 pub fn fallible(self) -> FallibleTask<T> {
147 FallibleTask(match self.0 {
148 TaskState::Ready(val) => FallibleTaskState::Ready(val),
149 TaskState::Spawned(task) => FallibleTaskState::Spawned(task.fallible()),
150 })
151 }
152}
153
154impl<E, T> Task<Result<T, E>>
155where
156 T: 'static,
157 E: 'static + Debug,
158{
159 /// Run the task to completion in the background and log any
160 /// errors that occur.
161 #[track_caller]
162 pub fn detach_and_log_err(self, cx: &App) {
163 let location = core::panic::Location::caller();
164 cx.foreground_executor()
165 .spawn(self.log_tracked_err(*location))
166 .detach();
167 }
168}
169
170impl<T> Future for Task<T> {
171 type Output = T;
172
173 fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
174 match unsafe { self.get_unchecked_mut() } {
175 Task(TaskState::Ready(val)) => Poll::Ready(val.take().unwrap()),
176 Task(TaskState::Spawned(task)) => task.poll(cx),
177 }
178 }
179}
180
181/// A task that returns `Option<T>` instead of panicking when cancelled.
182#[must_use]
183pub struct FallibleTask<T>(FallibleTaskState<T>);
184
185enum FallibleTaskState<T> {
186 /// A task that is ready to return a value
187 Ready(Option<T>),
188
189 /// A task that is currently running (wraps async_task::FallibleTask).
190 Spawned(async_task::FallibleTask<T, RunnableMeta>),
191}
192
193impl<T> FallibleTask<T> {
194 /// Creates a new fallible task that will resolve with the value.
195 pub fn ready(val: T) -> Self {
196 FallibleTask(FallibleTaskState::Ready(Some(val)))
197 }
198
199 /// Detaching a task runs it to completion in the background.
200 pub fn detach(self) {
201 match self.0 {
202 FallibleTaskState::Ready(_) => {}
203 FallibleTaskState::Spawned(task) => task.detach(),
204 }
205 }
206}
207
208impl<T> Future for FallibleTask<T> {
209 type Output = Option<T>;
210
211 fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
212 match unsafe { self.get_unchecked_mut() } {
213 FallibleTask(FallibleTaskState::Ready(val)) => Poll::Ready(val.take()),
214 FallibleTask(FallibleTaskState::Spawned(task)) => Pin::new(task).poll(cx),
215 }
216 }
217}
218
219impl<T> std::fmt::Debug for FallibleTask<T> {
220 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
221 match &self.0 {
222 FallibleTaskState::Ready(_) => f.debug_tuple("FallibleTask::Ready").finish(),
223 FallibleTaskState::Spawned(task) => {
224 f.debug_tuple("FallibleTask::Spawned").field(task).finish()
225 }
226 }
227 }
228}
229
/// An opaque identifier that tests can use to refer to a task.
#[derive(Clone, Copy, PartialEq, Eq, Hash, Debug)]
pub struct TaskLabel(NonZeroUsize);

impl Default for TaskLabel {
    /// Equivalent to [`TaskLabel::new`]: every default label is a fresh one.
    fn default() -> Self {
        TaskLabel::new()
    }
}

impl TaskLabel {
    /// Allocates a new label from a process-wide counter that starts at 1.
    pub fn new() -> Self {
        static NEXT_TASK_LABEL: AtomicUsize = AtomicUsize::new(1);
        let id = NEXT_TASK_LABEL.fetch_add(1, Ordering::SeqCst);
        // The conversion only fails for 0, which the counter does not hand out
        // unless it wraps around.
        Self(NonZeroUsize::try_from(id).unwrap())
    }
}
253
/// Type-erased, pinned future that is not required to be `Send`.
type AnyLocalFuture<R> = Pin<Box<dyn Future<Output = R> + 'static>>;

/// Type-erased, pinned future that can be sent to another thread.
type AnyFuture<R> = Pin<Box<dyn Future<Output = R> + Send + 'static>>;
257
258/// BackgroundExecutor lets you run things on background threads.
259/// In production this is a thread pool with no ordering guarantees.
260/// In tests this is simulated by running tasks one by one in a deterministic
261/// (but arbitrary) order controlled by the `SEED` environment variable.
262impl BackgroundExecutor {
263 #[doc(hidden)]
264 pub fn new(dispatcher: Arc<dyn PlatformDispatcher>) -> Self {
265 Self { dispatcher }
266 }
267
268 /// Enqueues the given future to be run to completion on a background thread.
269 #[track_caller]
270 pub fn spawn<R>(&self, future: impl Future<Output = R> + Send + 'static) -> Task<R>
271 where
272 R: Send + 'static,
273 {
274 self.spawn_with_priority(Priority::default(), future)
275 }
276
277 /// Enqueues the given future to be run to completion on a background thread.
278 #[track_caller]
279 pub fn spawn_with_priority<R>(
280 &self,
281 priority: Priority,
282 future: impl Future<Output = R> + Send + 'static,
283 ) -> Task<R>
284 where
285 R: Send + 'static,
286 {
287 self.spawn_internal::<R>(Box::pin(future), None, priority)
288 }
289
290 /// Enqueues the given future to be run to completion on a background thread and blocking the current task on it.
291 ///
292 /// This allows to spawn background work that borrows from its scope. Note that the supplied future will run to
293 /// completion before the current task is resumed, even if the current task is slated for cancellation.
294 pub async fn await_on_background<R>(&self, future: impl Future<Output = R> + Send) -> R
295 where
296 R: Send,
297 {
298 // We need to ensure that cancellation of the parent task does not drop the environment
299 // before the our own task has completed or got cancelled.
300 struct NotifyOnDrop<'a>(&'a (Condvar, Mutex<bool>));
301
302 impl Drop for NotifyOnDrop<'_> {
303 fn drop(&mut self) {
304 *self.0.1.lock() = true;
305 self.0.0.notify_all();
306 }
307 }
308
309 struct WaitOnDrop<'a>(&'a (Condvar, Mutex<bool>));
310
311 impl Drop for WaitOnDrop<'_> {
312 fn drop(&mut self) {
313 let mut done = self.0.1.lock();
314 if !*done {
315 self.0.0.wait(&mut done);
316 }
317 }
318 }
319
320 let dispatcher = self.dispatcher.clone();
321 let location = core::panic::Location::caller();
322
323 let pair = &(Condvar::new(), Mutex::new(false));
324 let _wait_guard = WaitOnDrop(pair);
325
326 let (runnable, task) = unsafe {
327 async_task::Builder::new()
328 .metadata(RunnableMeta {
329 location,
330 app: None,
331 })
332 .spawn_unchecked(
333 move |_| async {
334 let _notify_guard = NotifyOnDrop(pair);
335 future.await
336 },
337 move |runnable| {
338 dispatcher.dispatch(
339 RunnableVariant::Meta(runnable),
340 None,
341 Priority::default(),
342 )
343 },
344 )
345 };
346 runnable.schedule();
347 task.await
348 }
349
350 /// Enqueues the given future to be run to completion on a background thread.
351 /// The given label can be used to control the priority of the task in tests.
352 #[track_caller]
353 pub fn spawn_labeled<R>(
354 &self,
355 label: TaskLabel,
356 future: impl Future<Output = R> + Send + 'static,
357 ) -> Task<R>
358 where
359 R: Send + 'static,
360 {
361 self.spawn_internal::<R>(Box::pin(future), Some(label), Priority::default())
362 }
363
364 #[track_caller]
365 fn spawn_internal<R: Send + 'static>(
366 &self,
367 future: AnyFuture<R>,
368 label: Option<TaskLabel>,
369 #[cfg_attr(
370 target_os = "windows",
371 expect(
372 unused_variables,
373 reason = "Multi priority scheduler is broken on windows"
374 )
375 )]
376 priority: Priority,
377 ) -> Task<R> {
378 let dispatcher = self.dispatcher.clone();
379 #[cfg(target_os = "windows")]
380 let priority = Priority::Medium; // multi-prio scheduler is broken on windows
381
382 let (runnable, task) = if let Priority::Realtime(realtime) = priority {
383 let location = core::panic::Location::caller();
384 let (mut tx, rx) = flume::bounded::<Runnable<RunnableMeta>>(1);
385
386 dispatcher.spawn_realtime(
387 realtime,
388 Box::new(move || {
389 while let Ok(runnable) = rx.recv() {
390 let start = Instant::now();
391 let location = runnable.metadata().location;
392 let mut timing = TaskTiming {
393 location,
394 start,
395 end: None,
396 };
397 profiler::add_task_timing(timing);
398
399 runnable.run();
400
401 let end = Instant::now();
402 timing.end = Some(end);
403 profiler::add_task_timing(timing);
404 }
405 }),
406 );
407
408 async_task::Builder::new()
409 .metadata(RunnableMeta {
410 location,
411 app: None,
412 })
413 .spawn(
414 move |_| future,
415 move |runnable| {
416 let _ = tx.send(runnable);
417 },
418 )
419 } else {
420 let location = core::panic::Location::caller();
421 async_task::Builder::new()
422 .metadata(RunnableMeta {
423 location,
424 app: None,
425 })
426 .spawn(
427 move |_| future,
428 move |runnable| {
429 dispatcher.dispatch(RunnableVariant::Meta(runnable), label, priority)
430 },
431 )
432 };
433
434 runnable.schedule();
435 Task(TaskState::Spawned(task))
436 }
437
438 /// Used by the test harness to run an async test in a synchronous fashion.
439 #[cfg(any(test, feature = "test-support"))]
440 #[track_caller]
441 pub fn block_test<R>(&self, future: impl Future<Output = R>) -> R {
442 if let Ok(value) = self.block_internal(false, future, None) {
443 value
444 } else {
445 unreachable!()
446 }
447 }
448
449 /// Block the current thread until the given future resolves.
450 /// Consider using `block_with_timeout` instead.
451 pub fn block<R>(&self, future: impl Future<Output = R>) -> R {
452 if let Ok(value) = self.block_internal(true, future, None) {
453 value
454 } else {
455 unreachable!()
456 }
457 }
458
459 #[cfg(not(any(test, feature = "test-support")))]
460 pub(crate) fn block_internal<Fut: Future>(
461 &self,
462 _background_only: bool,
463 future: Fut,
464 timeout: Option<Duration>,
465 ) -> Result<Fut::Output, impl Future<Output = Fut::Output> + use<Fut>> {
466 use std::time::Instant;
467
468 let mut future = Box::pin(future);
469 if timeout == Some(Duration::ZERO) {
470 return Err(future);
471 }
472 let deadline = timeout.map(|timeout| Instant::now() + timeout);
473
474 let parker = parking::Parker::new();
475 let unparker = parker.unparker();
476 let waker = waker_fn(move || {
477 unparker.unpark();
478 });
479 let mut cx = std::task::Context::from_waker(&waker);
480
481 loop {
482 match future.as_mut().poll(&mut cx) {
483 Poll::Ready(result) => return Ok(result),
484 Poll::Pending => {
485 let timeout =
486 deadline.map(|deadline| deadline.saturating_duration_since(Instant::now()));
487 if let Some(timeout) = timeout {
488 if !parker.park_timeout(timeout)
489 && deadline.is_some_and(|deadline| deadline < Instant::now())
490 {
491 return Err(future);
492 }
493 } else {
494 parker.park();
495 }
496 }
497 }
498 }
499 }
500
501 #[cfg(any(test, feature = "test-support"))]
502 #[track_caller]
503 pub(crate) fn block_internal<Fut: Future>(
504 &self,
505 background_only: bool,
506 future: Fut,
507 timeout: Option<Duration>,
508 ) -> Result<Fut::Output, impl Future<Output = Fut::Output> + use<Fut>> {
509 use std::sync::atomic::AtomicBool;
510
511 use parking::Parker;
512
513 let mut future = Box::pin(future);
514 if timeout == Some(Duration::ZERO) {
515 return Err(future);
516 }
517 let Some(dispatcher) = self.dispatcher.as_test() else {
518 return Err(future);
519 };
520
521 let mut max_ticks = if timeout.is_some() {
522 dispatcher.gen_block_on_ticks()
523 } else {
524 usize::MAX
525 };
526
527 let parker = Parker::new();
528 let unparker = parker.unparker();
529
530 let awoken = Arc::new(AtomicBool::new(false));
531 let waker = waker_fn({
532 let awoken = awoken.clone();
533 let unparker = unparker.clone();
534 move || {
535 awoken.store(true, Ordering::SeqCst);
536 unparker.unpark();
537 }
538 });
539 let mut cx = std::task::Context::from_waker(&waker);
540
541 let duration = Duration::from_secs(
542 option_env!("GPUI_TEST_TIMEOUT")
543 .and_then(|s| s.parse::<u64>().ok())
544 .unwrap_or(180),
545 );
546 let mut test_should_end_by = Instant::now() + duration;
547
548 loop {
549 match future.as_mut().poll(&mut cx) {
550 Poll::Ready(result) => return Ok(result),
551 Poll::Pending => {
552 if max_ticks == 0 {
553 return Err(future);
554 }
555 max_ticks -= 1;
556
557 if !dispatcher.tick(background_only) {
558 if awoken.swap(false, Ordering::SeqCst) {
559 continue;
560 }
561
562 if !dispatcher.parking_allowed() {
563 if dispatcher.advance_clock_to_next_delayed() {
564 continue;
565 }
566 let mut backtrace_message = String::new();
567 let mut waiting_message = String::new();
568 if let Some(backtrace) = dispatcher.waiting_backtrace() {
569 backtrace_message =
570 format!("\nbacktrace of waiting future:\n{:?}", backtrace);
571 }
572 if let Some(waiting_hint) = dispatcher.waiting_hint() {
573 waiting_message = format!("\n waiting on: {}\n", waiting_hint);
574 }
575 panic!(
576 "parked with nothing left to run{waiting_message}{backtrace_message}",
577 )
578 }
579 dispatcher.push_unparker(unparker.clone());
580 parker.park_timeout(Duration::from_millis(1));
581 if Instant::now() > test_should_end_by {
582 panic!("test timed out after {duration:?} with allow_parking")
583 }
584 }
585 }
586 }
587 }
588 }
589
590 /// Block the current thread until the given future resolves
591 /// or `duration` has elapsed.
592 pub fn block_with_timeout<Fut: Future>(
593 &self,
594 duration: Duration,
595 future: Fut,
596 ) -> Result<Fut::Output, impl Future<Output = Fut::Output> + use<Fut>> {
597 self.block_internal(true, future, Some(duration))
598 }
599
600 /// Scoped lets you start a number of tasks and waits
601 /// for all of them to complete before returning.
602 pub async fn scoped<'scope, F>(&self, scheduler: F)
603 where
604 F: FnOnce(&mut Scope<'scope>),
605 {
606 let mut scope = Scope::new(self.clone(), Priority::default());
607 (scheduler)(&mut scope);
608 let spawned = mem::take(&mut scope.futures)
609 .into_iter()
610 .map(|f| self.spawn_with_priority(scope.priority, f))
611 .collect::<Vec<_>>();
612 for task in spawned {
613 task.await;
614 }
615 }
616
617 /// Scoped lets you start a number of tasks and waits
618 /// for all of them to complete before returning.
619 pub async fn scoped_priority<'scope, F>(&self, priority: Priority, scheduler: F)
620 where
621 F: FnOnce(&mut Scope<'scope>),
622 {
623 let mut scope = Scope::new(self.clone(), priority);
624 (scheduler)(&mut scope);
625 let spawned = mem::take(&mut scope.futures)
626 .into_iter()
627 .map(|f| self.spawn_with_priority(scope.priority, f))
628 .collect::<Vec<_>>();
629 for task in spawned {
630 task.await;
631 }
632 }
633
634 /// Get the current time.
635 ///
636 /// Calling this instead of `std::time::Instant::now` allows the use
637 /// of fake timers in tests.
638 pub fn now(&self) -> Instant {
639 self.dispatcher.now()
640 }
641
642 /// Returns a task that will complete after the given duration.
643 /// Depending on other concurrent tasks the elapsed duration may be longer
644 /// than requested.
645 pub fn timer(&self, duration: Duration) -> Task<()> {
646 if duration.is_zero() {
647 return Task::ready(());
648 }
649 let location = core::panic::Location::caller();
650 let (runnable, task) = async_task::Builder::new()
651 .metadata(RunnableMeta {
652 location,
653 app: None,
654 })
655 .spawn(move |_| async move {}, {
656 let dispatcher = self.dispatcher.clone();
657 move |runnable| dispatcher.dispatch_after(duration, RunnableVariant::Meta(runnable))
658 });
659 runnable.schedule();
660 Task(TaskState::Spawned(task))
661 }
662
663 /// in tests, start_waiting lets you indicate which task is waiting (for debugging only)
664 #[cfg(any(test, feature = "test-support"))]
665 pub fn start_waiting(&self) {
666 self.dispatcher.as_test().unwrap().start_waiting();
667 }
668
669 /// in tests, removes the debugging data added by start_waiting
670 #[cfg(any(test, feature = "test-support"))]
671 pub fn finish_waiting(&self) {
672 self.dispatcher.as_test().unwrap().finish_waiting();
673 }
674
675 /// in tests, run an arbitrary number of tasks (determined by the SEED environment variable)
676 #[cfg(any(test, feature = "test-support"))]
677 pub fn simulate_random_delay(&self) -> impl Future<Output = ()> + use<> {
678 self.dispatcher.as_test().unwrap().simulate_random_delay()
679 }
680
681 /// in tests, indicate that a given task from `spawn_labeled` should run after everything else
682 #[cfg(any(test, feature = "test-support"))]
683 pub fn deprioritize(&self, task_label: TaskLabel) {
684 self.dispatcher.as_test().unwrap().deprioritize(task_label)
685 }
686
687 /// in tests, move time forward. This does not run any tasks, but does make `timer`s ready.
688 #[cfg(any(test, feature = "test-support"))]
689 pub fn advance_clock(&self, duration: Duration) {
690 self.dispatcher.as_test().unwrap().advance_clock(duration)
691 }
692
693 /// in tests, run one task.
694 #[cfg(any(test, feature = "test-support"))]
695 pub fn tick(&self) -> bool {
696 self.dispatcher.as_test().unwrap().tick(false)
697 }
698
699 /// in tests, run all tasks that are ready to run. If after doing so
700 /// the test still has outstanding tasks, this will panic. (See also [`Self::allow_parking`])
701 #[cfg(any(test, feature = "test-support"))]
702 pub fn run_until_parked(&self) {
703 self.dispatcher.as_test().unwrap().run_until_parked()
704 }
705
706 /// in tests, prevents `run_until_parked` from panicking if there are outstanding tasks.
707 /// This is useful when you are integrating other (non-GPUI) futures, like disk access, that
708 /// do take real async time to run.
709 #[cfg(any(test, feature = "test-support"))]
710 pub fn allow_parking(&self) {
711 self.dispatcher.as_test().unwrap().allow_parking();
712 }
713
714 /// undoes the effect of [`Self::allow_parking`].
715 #[cfg(any(test, feature = "test-support"))]
716 pub fn forbid_parking(&self) {
717 self.dispatcher.as_test().unwrap().forbid_parking();
718 }
719
720 /// adds detail to the "parked with nothing let to run" message.
721 #[cfg(any(test, feature = "test-support"))]
722 pub fn set_waiting_hint(&self, msg: Option<String>) {
723 self.dispatcher.as_test().unwrap().set_waiting_hint(msg);
724 }
725
726 /// in tests, returns the rng used by the dispatcher and seeded by the `SEED` environment variable
727 #[cfg(any(test, feature = "test-support"))]
728 pub fn rng(&self) -> StdRng {
729 self.dispatcher.as_test().unwrap().rng()
730 }
731
732 /// How many CPUs are available to the dispatcher.
733 pub fn num_cpus(&self) -> usize {
734 #[cfg(any(test, feature = "test-support"))]
735 return 4;
736
737 #[cfg(not(any(test, feature = "test-support")))]
738 return num_cpus::get();
739 }
740
741 /// Whether we're on the main thread.
742 pub fn is_main_thread(&self) -> bool {
743 self.dispatcher.is_main_thread()
744 }
745
746 #[cfg(any(test, feature = "test-support"))]
747 /// in tests, control the number of ticks that `block_with_timeout` will run before timing out.
748 pub fn set_block_on_ticks(&self, range: std::ops::RangeInclusive<usize>) {
749 self.dispatcher.as_test().unwrap().set_block_on_ticks(range);
750 }
751}
752
753/// ForegroundExecutor runs things on the main thread.
754impl ForegroundExecutor {
755 /// Creates a new ForegroundExecutor from the given PlatformDispatcher.
756 pub fn new(dispatcher: Arc<dyn PlatformDispatcher>) -> Self {
757 Self {
758 dispatcher,
759 not_send: PhantomData,
760 }
761 }
762
763 /// Enqueues the given Task to run on the main thread at some point in the future.
764 #[track_caller]
765 pub fn spawn<R>(&self, future: impl Future<Output = R> + 'static) -> Task<R>
766 where
767 R: 'static,
768 {
769 self.spawn_with_app_and_priority(None, Priority::default(), future)
770 }
771
772 /// Enqueues the given Task to run on the main thread at some point in the future.
773 #[track_caller]
774 pub fn spawn_with_priority<R>(
775 &self,
776 priority: Priority,
777 future: impl Future<Output = R> + 'static,
778 ) -> Task<R>
779 where
780 R: 'static,
781 {
782 self.spawn_with_app_and_priority(None, priority, future)
783 }
784
785 /// Enqueues the given Task to run on the main thread at some point in the future,
786 /// with a weak reference to the app for cancellation checking.
787 ///
788 /// When the app is dropped, pending tasks spawned with this method will be cancelled
789 /// before they run, rather than panicking when they try to access the dropped app.
790 #[track_caller]
791 pub fn spawn_with_app<R>(
792 &self,
793 app: std::rc::Weak<crate::AppCell>,
794 future: impl Future<Output = R> + 'static,
795 ) -> Task<R>
796 where
797 R: 'static,
798 {
799 self.spawn_with_app_and_priority(Some(app), Priority::default(), future)
800 }
801
802 /// Enqueues the given Task to run on the main thread at some point in the future,
803 /// with an optional weak reference to the app for cancellation checking and a specific priority.
804 #[track_caller]
805 pub fn spawn_with_app_and_priority<R>(
806 &self,
807 app: Option<std::rc::Weak<crate::AppCell>>,
808 priority: Priority,
809 future: impl Future<Output = R> + 'static,
810 ) -> Task<R>
811 where
812 R: 'static,
813 {
814 let dispatcher = self.dispatcher.clone();
815 let location = core::panic::Location::caller();
816
817 #[track_caller]
818 fn inner<R: 'static>(
819 dispatcher: Arc<dyn PlatformDispatcher>,
820 future: AnyLocalFuture<R>,
821 location: &'static core::panic::Location<'static>,
822 app: Option<std::rc::Weak<crate::AppCell>>,
823 priority: Priority,
824 ) -> Task<R> {
825 // SAFETY: We are on the main thread (ForegroundExecutor is !Send), and the
826 // MainThreadWeak will only be accessed on the main thread in the trampoline.
827 let app_weak = app.map(|weak| unsafe { crate::MainThreadWeak::new(weak) });
828 let (runnable, task) = spawn_local_with_source_location(
829 future,
830 move |runnable| {
831 dispatcher.dispatch_on_main_thread(RunnableVariant::Meta(runnable), priority)
832 },
833 RunnableMeta {
834 location,
835 app: app_weak,
836 },
837 );
838 runnable.schedule();
839 Task(TaskState::Spawned(task))
840 }
841 inner::<R>(dispatcher, Box::pin(future), location, app, priority)
842 }
843}
844
845/// Variant of `async_task::spawn_local` that includes the source location of the spawn in panics.
846///
847/// Copy-modified from:
848/// <https://github.com/smol-rs/async-task/blob/ca9dbe1db9c422fd765847fa91306e30a6bb58a9/src/runnable.rs#L405>
849#[track_caller]
850fn spawn_local_with_source_location<Fut, S, M>(
851 future: Fut,
852 schedule: S,
853 metadata: M,
854) -> (Runnable<M>, async_task::Task<Fut::Output, M>)
855where
856 Fut: Future + 'static,
857 Fut::Output: 'static,
858 S: async_task::Schedule<M> + Send + Sync + 'static,
859 M: 'static,
860{
861 #[inline]
862 fn thread_id() -> ThreadId {
863 std::thread_local! {
864 static ID: ThreadId = thread::current().id();
865 }
866 ID.try_with(|id| *id)
867 .unwrap_or_else(|_| thread::current().id())
868 }
869
870 struct Checked<F> {
871 id: ThreadId,
872 inner: ManuallyDrop<F>,
873 location: &'static Location<'static>,
874 }
875
876 impl<F> Drop for Checked<F> {
877 fn drop(&mut self) {
878 assert!(
879 self.id == thread_id(),
880 "local task dropped by a thread that didn't spawn it. Task spawned at {}",
881 self.location
882 );
883 unsafe { ManuallyDrop::drop(&mut self.inner) };
884 }
885 }
886
887 impl<F: Future> Future for Checked<F> {
888 type Output = F::Output;
889
890 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
891 assert!(
892 self.id == thread_id(),
893 "local task polled by a thread that didn't spawn it. Task spawned at {}",
894 self.location
895 );
896 unsafe { self.map_unchecked_mut(|c| &mut *c.inner).poll(cx) }
897 }
898 }
899
900 // Wrap the future into one that checks which thread it's on.
901 let future = Checked {
902 id: thread_id(),
903 inner: ManuallyDrop::new(future),
904 location: Location::caller(),
905 };
906
907 unsafe {
908 async_task::Builder::new()
909 .metadata(metadata)
910 .spawn_unchecked(move |_| future, schedule)
911 }
912}
913
914/// Scope manages a set of tasks that are enqueued and waited on together. See [`BackgroundExecutor::scoped`].
915pub struct Scope<'a> {
916 executor: BackgroundExecutor,
917 priority: Priority,
918 futures: Vec<Pin<Box<dyn Future<Output = ()> + Send + 'static>>>,
919 tx: Option<mpsc::Sender<()>>,
920 rx: mpsc::Receiver<()>,
921 lifetime: PhantomData<&'a ()>,
922}
923
924impl<'a> Scope<'a> {
925 fn new(executor: BackgroundExecutor, priority: Priority) -> Self {
926 let (tx, rx) = mpsc::channel(1);
927 Self {
928 executor,
929 priority,
930 tx: Some(tx),
931 rx,
932 futures: Default::default(),
933 lifetime: PhantomData,
934 }
935 }
936
937 /// How many CPUs are available to the dispatcher.
938 pub fn num_cpus(&self) -> usize {
939 self.executor.num_cpus()
940 }
941
942 /// Spawn a future into this scope.
943 #[track_caller]
944 pub fn spawn<F>(&mut self, f: F)
945 where
946 F: Future<Output = ()> + Send + 'a,
947 {
948 let tx = self.tx.clone().unwrap();
949
950 // SAFETY: The 'a lifetime is guaranteed to outlive any of these futures because
951 // dropping this `Scope` blocks until all of the futures have resolved.
952 let f = unsafe {
953 mem::transmute::<
954 Pin<Box<dyn Future<Output = ()> + Send + 'a>>,
955 Pin<Box<dyn Future<Output = ()> + Send + 'static>>,
956 >(Box::pin(async move {
957 f.await;
958 drop(tx);
959 }))
960 };
961 self.futures.push(f);
962 }
963}
964
965impl Drop for Scope<'_> {
966 fn drop(&mut self) {
967 self.tx.take().unwrap();
968
969 // Wait until the channel is closed, which means that all of the spawned
970 // futures have resolved.
971 self.executor.block(self.rx.next());
972 }
973}
974
975#[cfg(test)]
976mod test {
977 use super::*;
978 use crate::{App, TestDispatcher, TestPlatform};
979 use rand::SeedableRng;
980 use std::{cell::RefCell, rc::Weak};
981
982 #[test]
983 fn sanity_test_tasks_run() {
984 let dispatcher = TestDispatcher::new(StdRng::seed_from_u64(0));
985 let arc_dispatcher = Arc::new(dispatcher.clone());
986 let background_executor = BackgroundExecutor::new(arc_dispatcher.clone());
987 let foreground_executor = ForegroundExecutor::new(arc_dispatcher);
988
989 let platform = TestPlatform::new(background_executor, foreground_executor.clone());
990 let asset_source = Arc::new(());
991 let http_client = http_client::FakeHttpClient::with_404_response();
992
993 let app = App::new_app(platform, asset_source, http_client);
994
995 let task_ran = Rc::new(RefCell::new(false));
996
997 foreground_executor
998 .spawn_with_app(Rc::downgrade(&app), {
999 let task_ran = Rc::clone(&task_ran);
1000 async move {
1001 *task_ran.borrow_mut() = true;
1002 }
1003 })
1004 .detach();
1005
1006 // Run dispatcher while app is still alive
1007 dispatcher.run_until_parked();
1008
1009 // Task should have run
1010 assert!(
1011 *task_ran.borrow(),
1012 "Task should run normally when app is alive"
1013 );
1014 }
1015
1016 #[test]
1017 fn test_task_cancelled_when_app_dropped() {
1018 let dispatcher = TestDispatcher::new(StdRng::seed_from_u64(0));
1019 let arc_dispatcher = Arc::new(dispatcher.clone());
1020 let background_executor = BackgroundExecutor::new(arc_dispatcher.clone());
1021 let foreground_executor = ForegroundExecutor::new(arc_dispatcher);
1022
1023 let platform = TestPlatform::new(background_executor, foreground_executor.clone());
1024 let asset_source = Arc::new(());
1025 let http_client = http_client::FakeHttpClient::with_404_response();
1026
1027 let app = App::new_app(platform, asset_source, http_client);
1028 let app_weak = Rc::downgrade(&app);
1029
1030 let task_ran = Rc::new(RefCell::new(false));
1031 let task_ran_clone = Rc::clone(&task_ran);
1032
1033 foreground_executor
1034 .spawn_with_app(Weak::clone(&app_weak), async move {
1035 *task_ran_clone.borrow_mut() = true;
1036 })
1037 .detach();
1038
1039 assert!(
1040 Rc::weak_count(&app) > 0,
1041 "Task should hold a weak reference"
1042 );
1043
1044 drop(app);
1045
1046 assert!(app_weak.upgrade().is_none(), "App should have been dropped");
1047
1048 dispatcher.run_until_parked();
1049
1050 // The task should have been cancelled, not run
1051 assert!(
1052 !*task_ran.borrow(),
1053 "Task should have been cancelled when app was dropped, but it ran!"
1054 );
1055 }
1056
/// Spawns an app-bound outer task that itself spawns and awaits an app-bound inner task, then
/// drops the app while the outer task is parked on the inner one. Neither task may set its
/// completion flag afterwards.
#[test]
fn test_nested_tasks_both_cancel() {
    // Both executors share one deterministic test dispatcher.
    let dispatcher = TestDispatcher::new(StdRng::seed_from_u64(0));
    let shared_dispatcher = Arc::new(dispatcher.clone());
    let background_executor = BackgroundExecutor::new(shared_dispatcher.clone());
    let foreground_executor = ForegroundExecutor::new(shared_dispatcher);

    let app = App::new_app(
        TestPlatform::new(background_executor, foreground_executor.clone()),
        Arc::new(()),
        http_client::FakeHttpClient::with_404_response(),
    );
    let weak_app = Rc::downgrade(&app);

    // Progress markers observed by the test and written by the tasks.
    let outer_done = Rc::new(RefCell::new(false));
    let inner_done = Rc::new(RefCell::new(false));
    let at_await = Rc::new(RefCell::new(false));

    // The receiver keeps the inner task pending; the sender stays with the test body.
    let (signal_tx, signal_rx) = futures::channel::oneshot::channel::<()>();

    // Outer future: spawns the inner task, records that it got as far as the await, then
    // waits on the inner task.
    let outer_future = {
        let outer_done = Rc::clone(&outer_done);
        let inner_done = Rc::clone(&inner_done);
        let at_await = Rc::clone(&at_await);
        let executor = foreground_executor.clone();
        let weak_app = weak_app.clone();

        async move {
            let inner_done_handle = Rc::clone(&inner_done);

            // The inner task waits on a signal that is never sent; the app is dropped instead.
            let inner = executor.spawn_with_app(weak_app, async move {
                signal_rx.await.ok();
                *inner_done_handle.borrow_mut() = true;
            });

            *at_await.borrow_mut() = true;

            // Awaiting the inner task should not panic when both tasks are cancelled.
            inner.await;

            // Should never be reached.
            *outer_done.borrow_mut() = true;
        }
    };

    foreground_executor
        .spawn_with_app(Weak::clone(&weak_app), outer_future)
        .detach();

    // First pass: the outer task runs up to its await while the inner one waits on the channel.
    dispatcher.run_until_parked();

    let reached = *at_await.borrow();
    assert!(reached, "Outer task should have reached the await point");

    let outer_finished_early = *outer_done.borrow();
    assert!(
        !outer_finished_early,
        "Outer task should not have completed yet"
    );
    let inner_finished_early = *inner_done.borrow();
    assert!(
        !inner_finished_early,
        "Inner task should not have completed yet"
    );

    // Release the sender and the app while the outer task is still awaiting the inner one.
    drop(signal_tx);
    drop(app);
    assert!(weak_app.upgrade().is_none(), "App should have been dropped");

    // Second pass: both tasks are expected to be cancelled rather than resumed.
    dispatcher.run_until_parked();

    let outer_finished = *outer_done.borrow();
    assert!(
        !outer_finished,
        "Outer task should have been cancelled, not completed"
    );
    let inner_finished = *inner_done.borrow();
    assert!(
        !inner_finished,
        "Inner task should have been cancelled, not completed"
    );
}
1147
/// A task spawned with plain `spawn` (no app handle attached) must still run to completion
/// even though the app has been dropped before the dispatcher is driven.
#[test]
fn test_task_without_app_tracking_still_runs() {
    let dispatcher = TestDispatcher::new(StdRng::seed_from_u64(0));
    let shared_dispatcher = Arc::new(dispatcher.clone());
    let background_executor = BackgroundExecutor::new(shared_dispatcher.clone());
    let foreground_executor = ForegroundExecutor::new(shared_dispatcher);

    let app = App::new_app(
        TestPlatform::new(background_executor, foreground_executor.clone()),
        Arc::new(()),
        http_client::FakeHttpClient::with_404_response(),
    );
    let weak_app = Rc::downgrade(&app);

    // Set by the task body; read back once the dispatcher has parked.
    let ran = Rc::new(RefCell::new(false));

    // Keep the handle bound to a name so it lives until the end of the test.
    let _untracked_task = {
        let ran = Rc::clone(&ran);
        foreground_executor.spawn(async move {
            *ran.borrow_mut() = true;
        })
    };

    drop(app);
    assert!(weak_app.upgrade().is_none(), "App should have been dropped");

    dispatcher.run_until_parked();

    let did_run = *ran.borrow();
    assert!(
        did_run,
        "Task without app tracking should still run after app is dropped"
    );
}
1180
/// Blocking on an app-bound task after its app has been dropped is expected to panic.
///
/// The `should_panic` attribute carries no expected message, so any panic raised inside this
/// function satisfies the test.
#[test]
#[should_panic]
fn test_polling_cancelled_task_panics() {
    let dispatcher = TestDispatcher::new(StdRng::seed_from_u64(0));
    let shared_dispatcher = Arc::new(dispatcher.clone());
    let background_executor = BackgroundExecutor::new(shared_dispatcher.clone());
    let foreground_executor = ForegroundExecutor::new(shared_dispatcher);

    let app = App::new_app(
        TestPlatform::new(background_executor.clone(), foreground_executor.clone()),
        Arc::new(()),
        http_client::FakeHttpClient::with_404_response(),
    );
    let weak_app = Rc::downgrade(&app);

    // Keep the task handle (not detached) so it can be polled after the app is gone.
    let pending = foreground_executor.spawn_with_app(Weak::clone(&weak_app), async move { 42 });

    drop(app);
    assert!(weak_app.upgrade().is_none(), "App should have been dropped");

    dispatcher.run_until_parked();

    // Polling the task to completion is the step expected to panic.
    background_executor.block(pending);
}
1206
/// Blocking on the `fallible()` form of an app-bound task after the app has been dropped must
/// yield `None`.
#[test]
fn test_polling_cancelled_task_returns_none_with_fallible() {
    let dispatcher = TestDispatcher::new(StdRng::seed_from_u64(0));
    let shared_dispatcher = Arc::new(dispatcher.clone());
    let background_executor = BackgroundExecutor::new(shared_dispatcher.clone());
    let foreground_executor = ForegroundExecutor::new(shared_dispatcher);

    let app = App::new_app(
        TestPlatform::new(background_executor.clone(), foreground_executor.clone()),
        Arc::new(()),
        http_client::FakeHttpClient::with_404_response(),
    );
    let weak_app = Rc::downgrade(&app);

    // Spawn while the app is alive, then convert the handle to its fallible form.
    let tracked = foreground_executor.spawn_with_app(Weak::clone(&weak_app), async move { 42 });
    let fallible_task = tracked.fallible();

    drop(app);
    assert!(weak_app.upgrade().is_none(), "App should have been dropped");

    dispatcher.run_until_parked();

    let outcome = background_executor.block(fallible_task);
    assert_eq!(outcome, None, "Cancelled task should return None");
}
1234}