1use crate::{App, GpuiRunnable, PlatformDispatcher, RunnableMeta, TaskTiming, profiler};
2use async_task::Runnable;
3use futures::channel::mpsc;
4use parking_lot::{Condvar, Mutex};
5use smol::prelude::*;
6use std::{
7 cell::Cell,
8 fmt::Debug,
9 marker::PhantomData,
10 mem::{self, ManuallyDrop},
11 num::NonZeroUsize,
12 panic::Location,
13 pin::Pin,
14 rc::Rc,
15 sync::{
16 Arc,
17 atomic::{AtomicUsize, Ordering},
18 },
19 task::{Context, Poll},
20 thread::{self, ThreadId},
21 time::{Duration, Instant},
22};
23use util::TryFutureExt as _;
24use waker_fn::waker_fn;
25
26#[cfg(any(test, feature = "test-support"))]
27use rand::rngs::StdRng;
28
/// A pointer to the executor that is currently running,
/// for spawning background tasks.
///
/// The executor is a thin handle around a shared [`PlatformDispatcher`]; cloning it
/// only clones the inner `Arc`.
#[derive(Clone)]
pub struct BackgroundExecutor {
    /// The platform dispatcher that every spawned runnable is handed to.
    #[doc(hidden)]
    pub dispatcher: Arc<dyn PlatformDispatcher>,
}
36
/// A pointer to the executor that is currently running,
/// for spawning tasks on the main thread.
///
/// This is intentionally `!Send` via the `not_send` marker field. This is because
/// `ForegroundExecutor::spawn` does not require `Send` but checks at runtime that the future is
/// only polled from the same thread it was spawned from. These checks would fail when spawning
/// foreground tasks from background threads.
#[derive(Clone)]
pub struct ForegroundExecutor {
    /// The platform dispatcher that every spawned runnable is handed to.
    #[doc(hidden)]
    pub dispatcher: Arc<dyn PlatformDispatcher>,
    /// Weak handle that is attached to every spawned task's metadata (as `app`).
    liveness: std::sync::Weak<()>,
    /// `Rc` is `!Send`, so this zero-sized marker makes the whole struct `!Send`.
    not_send: PhantomData<Rc<()>>,
}
51
/// Realtime task priority.
///
/// Carried by [`Priority::Realtime`] to tell the dispatcher what kind of realtime
/// work is being spawned.
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
#[repr(u8)]
pub enum RealtimePriority {
    /// Audio task
    Audio,
    /// Other realtime task (the default).
    #[default]
    Other,
}
62
/// Task priority.
///
/// The default priority is [`Priority::Medium`].
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
#[repr(u8)]
pub enum Priority {
    /// Realtime priority
    ///
    /// Spawning a task with this priority will spin it off on a separate thread dedicated just to that task.
    Realtime(RealtimePriority),
    /// High priority
    ///
    /// Only use for tasks that are critical to the user experience / responsiveness of the editor.
    High,
    /// Medium priority, probably suits most of your use cases.
    #[default]
    Medium,
    /// Low priority
    ///
    /// Prefer this for background work that can come in large quantities,
    /// so that it does not starve the executor of resources for high priority tasks.
    Low,
}
84
thread_local! {
    /// Per-thread priority written by `Priority::set_as_default_for_spawns` and read back
    /// by `Priority::inherit`. Every thread starts out at `Priority::Medium`.
    static CURRENT_TASKS_PRIORITY: Cell<Priority> = const { Cell::new(Priority::Medium) };
}
87
88impl Priority {
89 /// Sets the priority any spawn call from the runnable about
90 /// to be run will use
91 pub(crate) fn set_as_default_for_spawns(&self) {
92 CURRENT_TASKS_PRIORITY.set(*self);
93 }
94
95 /// Returns the priority from the currently running task
96 pub fn inherit() -> Self {
97 CURRENT_TASKS_PRIORITY.get()
98 }
99
100 #[allow(dead_code)]
101 pub(crate) const fn probability(&self) -> u32 {
102 match self {
103 // realtime priorities are not considered for probability scheduling
104 Priority::Realtime(_) => 0,
105 Priority::High => 60,
106 Priority::Medium => 30,
107 Priority::Low => 10,
108 }
109 }
110}
111
/// Task is a primitive that allows work to happen in the background.
///
/// It implements [`Future`] so you can `.await` on it.
///
/// If you drop a task it will be cancelled immediately. Calling [`Task::detach`] allows
/// the task to continue running, but with no way to return a value.
///
/// Use [`Task::ready`] to wrap a value that is already available.
#[must_use]
#[derive(Debug)]
pub struct Task<T>(TaskState<T>);
121
/// Internal representation of a [`Task`].
#[derive(Debug)]
enum TaskState<T> {
    /// A task that is ready to return a value.
    ///
    /// The value is wrapped in an `Option` so it can be moved out when the task is polled.
    Ready(Option<T>),

    /// A task that is currently running.
    Spawned(async_task::Task<T, RunnableMeta>),
}
130
131impl<T> Task<T> {
132 /// Creates a new task that will resolve with the value
133 pub fn ready(val: T) -> Self {
134 Task(TaskState::Ready(Some(val)))
135 }
136
137 /// Detaching a task runs it to completion in the background
138 pub fn detach(self) {
139 match self {
140 Task(TaskState::Ready(_)) => {}
141 Task(TaskState::Spawned(task)) => task.detach(),
142 }
143 }
144
145 /// Converts this task into a fallible task that returns `Option<T>`.
146 ///
147 /// Unlike the standard `Task<T>`, a [`FallibleTask`] will return `None`
148 /// if the app was dropped while the task is executing.
149 ///
150 /// # Example
151 ///
152 /// ```ignore
153 /// // Background task that gracefully handles app shutdown:
154 /// cx.background_spawn(async move {
155 /// let result = foreground_task.fallible().await;
156 /// if let Some(value) = result {
157 /// // Process the value
158 /// }
159 /// // If None, app was shut down - just exit gracefully
160 /// }).detach();
161 /// ```
162 pub fn fallible(self) -> FallibleTask<T> {
163 FallibleTask(match self.0 {
164 TaskState::Ready(val) => FallibleTaskState::Ready(val),
165 TaskState::Spawned(task) => FallibleTaskState::Spawned(task.fallible()),
166 })
167 }
168}
169
170impl<E, T> Task<Result<T, E>>
171where
172 T: 'static,
173 E: 'static + Debug,
174{
175 /// Run the task to completion in the background and log any
176 /// errors that occur.
177 #[track_caller]
178 pub fn detach_and_log_err(self, cx: &App) {
179 let location = core::panic::Location::caller();
180 cx.foreground_executor()
181 .spawn(self.log_tracked_err(*location))
182 .detach();
183 }
184}
185
186impl<T> Future for Task<T> {
187 type Output = T;
188
189 fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
190 match unsafe { self.get_unchecked_mut() } {
191 Task(TaskState::Ready(val)) => Poll::Ready(val.take().unwrap()),
192 Task(TaskState::Spawned(task)) => task.poll(cx),
193 }
194 }
195}
196
/// A task that returns `Option<T>` instead of panicking when cancelled.
///
/// Obtained from [`Task::fallible`] or [`FallibleTask::ready`].
#[must_use]
pub struct FallibleTask<T>(FallibleTaskState<T>);
200
/// Internal representation of a [`FallibleTask`].
enum FallibleTaskState<T> {
    /// A task that is ready to return a value.
    ///
    /// Polling takes the value out, so later polls yield `None`.
    Ready(Option<T>),

    /// A task that is currently running (wraps async_task::FallibleTask).
    Spawned(async_task::FallibleTask<T, RunnableMeta>),
}
208
209impl<T> FallibleTask<T> {
210 /// Creates a new fallible task that will resolve with the value.
211 pub fn ready(val: T) -> Self {
212 FallibleTask(FallibleTaskState::Ready(Some(val)))
213 }
214
215 /// Detaching a task runs it to completion in the background.
216 pub fn detach(self) {
217 match self.0 {
218 FallibleTaskState::Ready(_) => {}
219 FallibleTaskState::Spawned(task) => task.detach(),
220 }
221 }
222}
223
224impl<T> Future for FallibleTask<T> {
225 type Output = Option<T>;
226
227 fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
228 match unsafe { self.get_unchecked_mut() } {
229 FallibleTask(FallibleTaskState::Ready(val)) => Poll::Ready(val.take()),
230 FallibleTask(FallibleTaskState::Spawned(task)) => Pin::new(task).poll(cx),
231 }
232 }
233}
234
235impl<T> std::fmt::Debug for FallibleTask<T> {
236 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
237 match &self.0 {
238 FallibleTaskState::Ready(_) => f.debug_tuple("FallibleTask::Ready").finish(),
239 FallibleTaskState::Spawned(task) => {
240 f.debug_tuple("FallibleTask::Spawned").field(task).finish()
241 }
242 }
243 }
244}
245
/// A task label is an opaque identifier that you can use to
/// refer to a task in tests.
///
/// Every label handed out by [`TaskLabel::new`] is distinct from the ones before it.
#[derive(Clone, Copy, PartialEq, Eq, Hash, Debug)]
pub struct TaskLabel(NonZeroUsize);

impl Default for TaskLabel {
    /// Equivalent to [`TaskLabel::new`]: every default label is a fresh one.
    fn default() -> Self {
        TaskLabel::new()
    }
}

impl TaskLabel {
    /// Construct a new task label.
    ///
    /// Labels are drawn from a process-wide atomic counter that starts at 1.
    pub fn new() -> Self {
        static NEXT_TASK_LABEL: AtomicUsize = AtomicUsize::new(1);
        let id = NEXT_TASK_LABEL.fetch_add(1, Ordering::SeqCst);
        // The counter starts at 1, so the conversion only fails if it wraps around to 0.
        Self(NonZeroUsize::try_from(id).unwrap())
    }
}
269
/// A boxed, pinned, type-erased future that does not have to be `Send`
/// (used for foreground spawns).
type AnyLocalFuture<R> = Pin<Box<dyn 'static + Future<Output = R>>>;

/// A boxed, pinned, type-erased `Send` future (used for background spawns).
type AnyFuture<R> = Pin<Box<dyn 'static + Send + Future<Output = R>>>;
273
274/// BackgroundExecutor lets you run things on background threads.
275/// In production this is a thread pool with no ordering guarantees.
276/// In tests this is simulated by running tasks one by one in a deterministic
277/// (but arbitrary) order controlled by the `SEED` environment variable.
278impl BackgroundExecutor {
    /// Creates a background executor that hands its runnables to `dispatcher`.
    #[doc(hidden)]
    pub fn new(dispatcher: Arc<dyn PlatformDispatcher>) -> Self {
        Self { dispatcher }
    }
283
    /// Enqueues the given future to be run to completion on a background thread.
    ///
    /// The task is spawned with `Priority::default()`; use
    /// [`spawn_with_priority`](Self::spawn_with_priority) to pick a different one.
    #[track_caller]
    pub fn spawn<R>(&self, future: impl Future<Output = R> + Send + 'static) -> Task<R>
    where
        R: Send + 'static,
    {
        self.spawn_with_priority(Priority::default(), future)
    }
292
    /// Enqueues the given future to be run to completion on a background thread,
    /// using the given `priority`.
    ///
    /// The future is boxed and spawned without a [`TaskLabel`].
    #[track_caller]
    pub fn spawn_with_priority<R>(
        &self,
        priority: Priority,
        future: impl Future<Output = R> + Send + 'static,
    ) -> Task<R>
    where
        R: Send + 'static,
    {
        self.spawn_internal::<R>(Box::pin(future), None, priority)
    }
305
306 /// Enqueues the given future to be run to completion on a background thread and blocking the current task on it.
307 ///
308 /// This allows to spawn background work that borrows from its scope. Note that the supplied future will run to
309 /// completion before the current task is resumed, even if the current task is slated for cancellation.
310 pub async fn await_on_background<R>(&self, future: impl Future<Output = R> + Send) -> R
311 where
312 R: Send,
313 {
314 // We need to ensure that cancellation of the parent task does not drop the environment
315 // before the our own task has completed or got cancelled.
316 struct NotifyOnDrop<'a>(&'a (Condvar, Mutex<bool>));
317
318 impl Drop for NotifyOnDrop<'_> {
319 fn drop(&mut self) {
320 *self.0.1.lock() = true;
321 self.0.0.notify_all();
322 }
323 }
324
325 struct WaitOnDrop<'a>(&'a (Condvar, Mutex<bool>));
326
327 impl Drop for WaitOnDrop<'_> {
328 fn drop(&mut self) {
329 let mut done = self.0.1.lock();
330 if !*done {
331 self.0.0.wait(&mut done);
332 }
333 }
334 }
335
336 let dispatcher = self.dispatcher.clone();
337 let location = core::panic::Location::caller();
338
339 let pair = &(Condvar::new(), Mutex::new(false));
340 let _wait_guard = WaitOnDrop(pair);
341
342 let (runnable, task) = unsafe {
343 async_task::Builder::new()
344 .metadata(RunnableMeta {
345 location,
346 app: None,
347 priority: Priority::inherit(),
348 })
349 .spawn_unchecked(
350 move |_| async {
351 let _notify_guard = NotifyOnDrop(pair);
352 future.await
353 },
354 move |runnable| dispatcher.dispatch(GpuiRunnable::GpuiSpawned(runnable), None),
355 )
356 };
357 runnable.schedule();
358 task.await
359 }
360
    /// Enqueues the given future to be run to completion on a background thread.
    /// The given label can be used to control the priority of the task in tests.
    ///
    /// The task is spawned with `Priority::default()`.
    #[track_caller]
    pub fn spawn_labeled<R>(
        &self,
        label: TaskLabel,
        future: impl Future<Output = R> + Send + 'static,
    ) -> Task<R>
    where
        R: Send + 'static,
    {
        self.spawn_internal::<R>(Box::pin(future), Some(label), Priority::default())
    }
374
375 #[track_caller]
376 fn spawn_internal<R: Send + 'static>(
377 &self,
378 future: AnyFuture<R>,
379 label: Option<TaskLabel>,
380 priority: Priority,
381 ) -> Task<R> {
382 let dispatcher = self.dispatcher.clone();
383 let (runnable, task) = if let Priority::Realtime(realtime) = priority {
384 let location = core::panic::Location::caller();
385 let (mut tx, rx) = flume::bounded::<Runnable<RunnableMeta>>(1);
386
387 dispatcher.spawn_realtime(
388 realtime,
389 Box::new(move || {
390 while let Ok(runnable) = rx.recv() {
391 let start = Instant::now();
392 let location = runnable.metadata().location;
393 let mut timing = TaskTiming {
394 location,
395 start,
396 end: None,
397 };
398 profiler::add_task_timing(timing);
399
400 Priority::Realtime(realtime).set_as_default_for_spawns();
401 runnable.run();
402
403 let end = Instant::now();
404 timing.end = Some(end);
405 profiler::add_task_timing(timing);
406 }
407 }),
408 );
409
410 async_task::Builder::new()
411 .metadata(RunnableMeta {
412 location,
413 priority,
414 app: None,
415 })
416 .spawn(
417 move |_| future,
418 move |runnable| {
419 let _ = tx.send(runnable);
420 },
421 )
422 } else {
423 let location = core::panic::Location::caller();
424 async_task::Builder::new()
425 .metadata(RunnableMeta {
426 location,
427 priority,
428 app: None,
429 })
430 .spawn(
431 move |_| future,
432 move |runnable| dispatcher.dispatch(GpuiRunnable::GpuiSpawned(runnable), label),
433 )
434 };
435
436 runnable.schedule();
437 Task(TaskState::Spawned(task))
438 }
439
440 /// Used by the test harness to run an async test in a synchronous fashion.
441 #[cfg(any(test, feature = "test-support"))]
442 #[track_caller]
443 pub fn block_test<R>(&self, future: impl Future<Output = R>) -> R {
444 if let Ok(value) = self.block_internal(false, future, None) {
445 value
446 } else {
447 unreachable!()
448 }
449 }
450
451 /// Block the current thread until the given future resolves.
452 /// Consider using `block_with_timeout` instead.
453 pub fn block<R>(&self, future: impl Future<Output = R>) -> R {
454 if let Ok(value) = self.block_internal(true, future, None) {
455 value
456 } else {
457 unreachable!()
458 }
459 }
460
    /// Production implementation of blocking: polls `future` on the current thread and
    /// parks the thread between polls until the future's waker unparks it.
    ///
    /// Returns `Ok(output)` once the future resolves. If `timeout` is given and runs out
    /// first, the (boxed) future is handed back as `Err` so the caller can keep awaiting it.
    /// A zero timeout returns `Err` without polling at all. `_background_only` is unused here.
    #[cfg(not(any(test, feature = "test-support")))]
    pub(crate) fn block_internal<Fut: Future>(
        &self,
        _background_only: bool,
        future: Fut,
        timeout: Option<Duration>,
    ) -> Result<Fut::Output, impl Future<Output = Fut::Output> + use<Fut>> {
        use std::time::Instant;

        let mut future = Box::pin(future);
        if timeout == Some(Duration::ZERO) {
            return Err(future);
        }
        let deadline = timeout.map(|timeout| Instant::now() + timeout);

        // The waker handed to the future unparks this thread.
        let parker = parking::Parker::new();
        let unparker = parker.unparker();
        let waker = waker_fn(move || {
            unparker.unpark();
        });
        let mut cx = std::task::Context::from_waker(&waker);

        loop {
            match future.as_mut().poll(&mut cx) {
                Poll::Ready(result) => return Ok(result),
                Poll::Pending => {
                    // Time left until the deadline, or `None` when blocking indefinitely.
                    let timeout =
                        deadline.map(|deadline| deadline.saturating_duration_since(Instant::now()));
                    if let Some(timeout) = timeout {
                        // Give up only if the park was not ended by an unpark (presumably what
                        // a `false` return means — confirm against `parking`) and the deadline
                        // has passed; otherwise poll again.
                        if !parker.park_timeout(timeout)
                            && deadline.is_some_and(|deadline| deadline < Instant::now())
                        {
                            return Err(future);
                        }
                    } else {
                        parker.park();
                    }
                }
            }
        }
    }
502
    /// Test implementation of blocking.
    ///
    /// With a test dispatcher, the future is polled and the dispatcher is ticked in between
    /// polls, so other queued tasks make progress while this one is pending. A timeout is
    /// not measured in wall-clock time here: when `timeout` is `Some`, the number of ticks
    /// is bounded by `dispatcher.gen_block_on_ticks()` instead. Without a test dispatcher,
    /// the thread is parked between polls like in production.
    ///
    /// Returns `Ok(output)` once the future resolves, or hands the (boxed) future back as
    /// `Err` when the timeout / tick budget is exhausted. A zero timeout returns `Err`
    /// without polling at all.
    ///
    /// # Panics
    ///
    /// Panics with "parked with nothing left to run" if nothing can make progress and
    /// parking is not allowed, and with "test timed out" if parking is allowed but the
    /// overall test deadline has passed.
    #[cfg(any(test, feature = "test-support"))]
    #[track_caller]
    pub(crate) fn block_internal<Fut: Future>(
        &self,
        background_only: bool,
        future: Fut,
        timeout: Option<Duration>,
    ) -> Result<Fut::Output, impl Future<Output = Fut::Output> + use<Fut>> {
        use std::sync::atomic::AtomicBool;
        use std::time::Instant;

        use parking::Parker;

        let mut future = Box::pin(future);
        if timeout == Some(Duration::ZERO) {
            return Err(future);
        }

        // When using a real platform (e.g., MacPlatform for visual tests that need actual
        // Metal rendering), there's no test dispatcher. In this case, we block the thread
        // directly by polling the future and parking until woken. This is required for
        // VisualTestAppContext which uses real platform rendering but still needs blocking
        // behavior for code paths like editor initialization that call block_with_timeout.
        let Some(dispatcher) = self.dispatcher.as_test() else {
            let deadline = timeout.map(|timeout| Instant::now() + timeout);

            let parker = Parker::new();
            let unparker = parker.unparker();
            let waker = waker_fn(move || {
                unparker.unpark();
            });
            let mut cx = std::task::Context::from_waker(&waker);

            loop {
                match future.as_mut().poll(&mut cx) {
                    Poll::Ready(result) => return Ok(result),
                    Poll::Pending => {
                        let timeout = deadline
                            .map(|deadline| deadline.saturating_duration_since(Instant::now()));
                        if let Some(timeout) = timeout {
                            if !parker.park_timeout(timeout)
                                && deadline.is_some_and(|deadline| deadline < Instant::now())
                            {
                                return Err(future);
                            }
                        } else {
                            parker.park();
                        }
                    }
                }
            }
        };

        // With a timeout, the dispatcher decides how many ticks we may spend before
        // giving up; without one the budget is effectively unlimited.
        let mut max_ticks = if timeout.is_some() {
            dispatcher.gen_block_on_ticks()
        } else {
            usize::MAX
        };

        let parker = Parker::new();
        let unparker = parker.unparker();

        // `awoken` records that the waker fired, so that a wake-up arriving while nothing
        // was runnable leads to another poll instead of parking.
        let awoken = Arc::new(AtomicBool::new(false));
        let waker = waker_fn({
            let awoken = awoken.clone();
            let unparker = unparker.clone();
            move || {
                awoken.store(true, Ordering::SeqCst);
                unparker.unpark();
            }
        });
        let mut cx = std::task::Context::from_waker(&waker);

        // Overall wall-clock limit, only enforced while parking is allowed.
        // `option_env!` reads GPUI_TEST_TIMEOUT (in seconds) at compile time; default is 180.
        let duration = Duration::from_secs(
            option_env!("GPUI_TEST_TIMEOUT")
                .and_then(|s| s.parse::<u64>().ok())
                .unwrap_or(180),
        );
        let mut test_should_end_by = Instant::now() + duration;

        loop {
            match future.as_mut().poll(&mut cx) {
                Poll::Ready(result) => return Ok(result),
                Poll::Pending => {
                    if max_ticks == 0 {
                        return Err(future);
                    }
                    max_ticks -= 1;

                    // `tick` returning false is treated as "nothing was run".
                    if !dispatcher.tick(background_only) {
                        if awoken.swap(false, Ordering::SeqCst) {
                            continue;
                        }

                        if !dispatcher.parking_allowed() {
                            // Before declaring a deadlock, try advancing the clock to the
                            // next delayed task.
                            if dispatcher.advance_clock_to_next_delayed() {
                                continue;
                            }
                            let mut backtrace_message = String::new();
                            let mut waiting_message = String::new();
                            if let Some(backtrace) = dispatcher.waiting_backtrace() {
                                backtrace_message =
                                    format!("\nbacktrace of waiting future:\n{:?}", backtrace);
                            }
                            if let Some(waiting_hint) = dispatcher.waiting_hint() {
                                waiting_message = format!("\n  waiting on: {}\n", waiting_hint);
                            }
                            panic!(
                                "parked with nothing left to run{waiting_message}{backtrace_message}",
                            )
                        }
                        // Parking is allowed: hand our unparker to the dispatcher and sleep
                        // briefly before polling again.
                        dispatcher.push_unparker(unparker.clone());
                        parker.park_timeout(Duration::from_millis(1));
                        if Instant::now() > test_should_end_by {
                            panic!("test timed out after {duration:?} with allow_parking")
                        }
                    }
                }
            }
        }
    }
624
    /// Block the current thread until the given future resolves
    /// or `duration` has elapsed.
    ///
    /// On success the future's output is returned as `Ok`. On timeout the still-pending
    /// future is handed back as `Err`, so the caller can continue awaiting it elsewhere.
    /// A zero `duration` returns `Err` without polling the future.
    ///
    /// With a test dispatcher the timeout is simulated by a bounded number of
    /// dispatcher ticks rather than by wall-clock time.
    pub fn block_with_timeout<Fut: Future>(
        &self,
        duration: Duration,
        future: Fut,
    ) -> Result<Fut::Output, impl Future<Output = Fut::Output> + use<Fut>> {
        self.block_internal(true, future, Some(duration))
    }
634
635 /// Scoped lets you start a number of tasks and waits
636 /// for all of them to complete before returning.
637 pub async fn scoped<'scope, F>(&self, scheduler: F)
638 where
639 F: FnOnce(&mut Scope<'scope>),
640 {
641 let mut scope = Scope::new(self.clone(), Priority::default());
642 (scheduler)(&mut scope);
643 let spawned = mem::take(&mut scope.futures)
644 .into_iter()
645 .map(|f| self.spawn_with_priority(scope.priority, f))
646 .collect::<Vec<_>>();
647 for task in spawned {
648 task.await;
649 }
650 }
651
652 /// Scoped lets you start a number of tasks and waits
653 /// for all of them to complete before returning.
654 pub async fn scoped_priority<'scope, F>(&self, priority: Priority, scheduler: F)
655 where
656 F: FnOnce(&mut Scope<'scope>),
657 {
658 let mut scope = Scope::new(self.clone(), priority);
659 (scheduler)(&mut scope);
660 let spawned = mem::take(&mut scope.futures)
661 .into_iter()
662 .map(|f| self.spawn_with_priority(scope.priority, f))
663 .collect::<Vec<_>>();
664 for task in spawned {
665 task.await;
666 }
667 }
668
    /// Get the current time, as reported by the dispatcher.
    ///
    /// Calling this instead of `std::time::Instant::now` allows the use
    /// of fake timers in tests.
    pub fn now(&self) -> Instant {
        self.dispatcher.now()
    }
676
677 /// Returns a task that will complete after the given duration.
678 /// Depending on other concurrent tasks the elapsed duration may be longer
679 /// than requested.
680 pub fn timer(&self, duration: Duration) -> Task<()> {
681 if duration.is_zero() {
682 return Task::ready(());
683 }
684 let location = core::panic::Location::caller();
685 let (runnable, task) = async_task::Builder::new()
686 .metadata(RunnableMeta {
687 location,
688 priority: Priority::inherit(),
689 app: None,
690 })
691 .spawn(move |_| async move {}, {
692 let dispatcher = self.dispatcher.clone();
693 move |runnable| {
694 dispatcher.dispatch_after(duration, GpuiRunnable::GpuiSpawned(runnable))
695 }
696 });
697 runnable.schedule();
698 Task(TaskState::Spawned(task))
699 }
700
    /// In tests, `start_waiting` lets you indicate which task is waiting (for debugging only).
    ///
    /// # Panics
    ///
    /// Panics if the dispatcher is not a test dispatcher.
    #[cfg(any(test, feature = "test-support"))]
    pub fn start_waiting(&self) {
        self.dispatcher.as_test().unwrap().start_waiting();
    }
706
    /// In tests, removes the debugging data added by `start_waiting`.
    ///
    /// # Panics
    ///
    /// Panics if the dispatcher is not a test dispatcher.
    #[cfg(any(test, feature = "test-support"))]
    pub fn finish_waiting(&self) {
        self.dispatcher.as_test().unwrap().finish_waiting();
    }
712
    /// In tests, run an arbitrary number of tasks (determined by the SEED environment variable).
    ///
    /// The returned future does not borrow from `self`.
    ///
    /// # Panics
    ///
    /// Panics if the dispatcher is not a test dispatcher.
    #[cfg(any(test, feature = "test-support"))]
    pub fn simulate_random_delay(&self) -> impl Future<Output = ()> + use<> {
        self.dispatcher.as_test().unwrap().simulate_random_delay()
    }
718
    /// In tests, indicate that a given task from `spawn_labeled` should run after everything else.
    ///
    /// # Panics
    ///
    /// Panics if the dispatcher is not a test dispatcher.
    #[cfg(any(test, feature = "test-support"))]
    pub fn deprioritize(&self, task_label: TaskLabel) {
        self.dispatcher.as_test().unwrap().deprioritize(task_label)
    }
724
    /// In tests, move time forward. This does not run any tasks, but does make `timer`s ready.
    ///
    /// # Panics
    ///
    /// Panics if the dispatcher is not a test dispatcher.
    #[cfg(any(test, feature = "test-support"))]
    pub fn advance_clock(&self, duration: Duration) {
        self.dispatcher.as_test().unwrap().advance_clock(duration)
    }
730
    /// In tests, run one task.
    ///
    /// Returns the dispatcher's `tick(false)` result (presumably `true` if a task was
    /// run — confirm against the test dispatcher).
    ///
    /// # Panics
    ///
    /// Panics if the dispatcher is not a test dispatcher.
    #[cfg(any(test, feature = "test-support"))]
    pub fn tick(&self) -> bool {
        self.dispatcher.as_test().unwrap().tick(false)
    }
736
    /// In tests, run all tasks that are ready to run. If after doing so
    /// the test still has outstanding tasks, this will panic. (See also [`Self::allow_parking`])
    ///
    /// # Panics
    ///
    /// Also panics if the dispatcher is not a test dispatcher.
    #[cfg(any(test, feature = "test-support"))]
    pub fn run_until_parked(&self) {
        self.dispatcher.as_test().unwrap().run_until_parked()
    }
743
    /// In tests, prevents `run_until_parked` from panicking if there are outstanding tasks.
    /// This is useful when you are integrating other (non-GPUI) futures, like disk access, that
    /// do take real async time to run.
    ///
    /// # Panics
    ///
    /// Panics if the dispatcher is not a test dispatcher.
    #[cfg(any(test, feature = "test-support"))]
    pub fn allow_parking(&self) {
        self.dispatcher.as_test().unwrap().allow_parking();
    }
751
    /// In tests, undoes the effect of [`Self::allow_parking`].
    ///
    /// # Panics
    ///
    /// Panics if the dispatcher is not a test dispatcher.
    #[cfg(any(test, feature = "test-support"))]
    pub fn forbid_parking(&self) {
        self.dispatcher.as_test().unwrap().forbid_parking();
    }
757
    /// In tests, adds detail to the "parked with nothing left to run" message.
    ///
    /// # Panics
    ///
    /// Panics if the dispatcher is not a test dispatcher.
    #[cfg(any(test, feature = "test-support"))]
    pub fn set_waiting_hint(&self, msg: Option<String>) {
        self.dispatcher.as_test().unwrap().set_waiting_hint(msg);
    }
763
    /// In tests, returns the rng used by the dispatcher and seeded by the `SEED` environment variable.
    ///
    /// # Panics
    ///
    /// Panics if the dispatcher is not a test dispatcher.
    #[cfg(any(test, feature = "test-support"))]
    pub fn rng(&self) -> StdRng {
        self.dispatcher.as_test().unwrap().rng()
    }
769
    /// How many CPUs are available to the dispatcher.
    ///
    /// Test builds always report 4; other builds ask `num_cpus::get()`.
    pub fn num_cpus(&self) -> usize {
        #[cfg(any(test, feature = "test-support"))]
        return 4;

        #[cfg(not(any(test, feature = "test-support")))]
        return num_cpus::get();
    }
778
    /// Whether we're on the main thread, according to the dispatcher.
    pub fn is_main_thread(&self) -> bool {
        self.dispatcher.is_main_thread()
    }
783
    #[cfg(any(test, feature = "test-support"))]
    /// In tests, control the number of ticks that `block_with_timeout` will run before timing out.
    ///
    /// # Panics
    ///
    /// Panics if the dispatcher is not a test dispatcher.
    pub fn set_block_on_ticks(&self, range: std::ops::RangeInclusive<usize>) {
        self.dispatcher.as_test().unwrap().set_block_on_ticks(range);
    }
789}
790
791/// ForegroundExecutor runs things on the main thread.
792impl ForegroundExecutor {
    /// Creates a new ForegroundExecutor from the given PlatformDispatcher.
    ///
    /// `liveness` is attached to the metadata of every task spawned through
    /// [`spawn`](Self::spawn) and [`spawn_with_priority`](Self::spawn_with_priority).
    pub fn new(dispatcher: Arc<dyn PlatformDispatcher>, liveness: std::sync::Weak<()>) -> Self {
        Self {
            dispatcher,
            liveness,
            not_send: PhantomData,
        }
    }
801
    /// Enqueues the given Task to run on the main thread at some point in the
    /// future. The task is spawned with `Priority::default()`. Use
    /// [`spawn_with_priority`](Self::spawn_with_priority) if you want to
    /// choose a different priority.
    // NOTE(review): this doc comment previously said the task inherits the caller's
    // priority, but the code passes `Priority::default()`, not `Priority::inherit()`.
    // Confirm which behavior is intended.
    #[track_caller]
    pub fn spawn<R>(&self, future: impl Future<Output = R> + 'static) -> Task<R>
    where
        R: 'static,
    {
        self.inner_spawn(self.liveness.clone(), Priority::default(), future)
    }
813
    /// Enqueues the given Task to run on the main thread at some point in the future,
    /// using the given `priority`.
    #[track_caller]
    pub fn spawn_with_priority<R>(
        &self,
        priority: Priority,
        future: impl Future<Output = R> + 'static,
    ) -> Task<R>
    where
        R: 'static,
    {
        self.inner_spawn(self.liveness.clone(), priority, future)
    }
826
    /// Shared implementation behind the foreground `spawn*` methods.
    ///
    /// Boxes the future, records the caller's location together with `priority` and the
    /// `app` liveness handle in the task's metadata, and schedules the task through
    /// `dispatch_on_main_thread`.
    #[track_caller]
    pub(crate) fn inner_spawn<R>(
        &self,
        app: std::sync::Weak<()>,
        priority: Priority,
        future: impl Future<Output = R> + 'static,
    ) -> Task<R>
    where
        R: 'static,
    {
        let dispatcher = self.dispatcher.clone();
        let location = core::panic::Location::caller();

        // The non-generic-over-the-future part lives in a nested function that takes the
        // future already boxed, so it is only generic over the output type `R`.
        #[track_caller]
        fn inner<R: 'static>(
            dispatcher: Arc<dyn PlatformDispatcher>,
            future: AnyLocalFuture<R>,
            location: &'static core::panic::Location<'static>,
            app: std::sync::Weak<()>,
            priority: Priority,
        ) -> Task<R> {
            let (runnable, task) = spawn_local_with_source_location(
                future,
                move |runnable| {
                    dispatcher.dispatch_on_main_thread(GpuiRunnable::GpuiSpawned(runnable))
                },
                RunnableMeta {
                    location,
                    priority,
                    app: Some(app),
                },
            );
            runnable.schedule();
            Task(TaskState::Spawned(task))
        }
        inner::<R>(dispatcher, Box::pin(future), location, app, priority)
    }
864}
865
/// Variant of `async_task::spawn_local` that includes the source location of the spawn in panics.
///
/// The future is wrapped in a checker that remembers the spawning thread and asserts, on
/// every poll and on drop, that it is still on that thread. The assertion messages name
/// the location this function was called from.
///
/// Copy-modified from:
/// <https://github.com/smol-rs/async-task/blob/ca9dbe1db9c422fd765847fa91306e30a6bb58a9/src/runnable.rs#L405>
#[track_caller]
fn spawn_local_with_source_location<Fut, S, M>(
    future: Fut,
    schedule: S,
    metadata: M,
) -> (Runnable<M>, async_task::Task<Fut::Output, M>)
where
    Fut: Future + 'static,
    Fut::Output: 'static,
    S: async_task::Schedule<M> + Send + Sync + 'static,
    M: 'static,
{
    /// Id of the current thread, cached in a thread-local. Falls back to asking the
    /// runtime directly when the thread-local can no longer be accessed.
    #[inline]
    fn thread_id() -> ThreadId {
        std::thread_local! {
            static ID: ThreadId = thread::current().id();
        }
        ID.try_with(|id| *id)
            .unwrap_or_else(|_| thread::current().id())
    }

    /// Wrapper that ties a future to the thread it was created on.
    struct Checked<F> {
        /// Thread the future was spawned from.
        id: ThreadId,
        /// The wrapped future; dropped manually so the thread check can run first.
        inner: ManuallyDrop<F>,
        /// Where the task was spawned, for the assertion messages.
        location: &'static Location<'static>,
    }

    impl<F> Drop for Checked<F> {
        fn drop(&mut self) {
            // If this assertion fails, the inner future is never dropped (it is leaked)
            // rather than being dropped on the wrong thread.
            assert!(
                self.id == thread_id(),
                "local task dropped by a thread that didn't spawn it. Task spawned at {}",
                self.location
            );
            // SAFETY: `inner` is only dropped here, and `drop` runs at most once.
            unsafe { ManuallyDrop::drop(&mut self.inner) };
        }
    }

    impl<F: Future> Future for Checked<F> {
        type Output = F::Output;

        fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
            assert!(
                self.id == thread_id(),
                "local task polled by a thread that didn't spawn it. Task spawned at {}",
                self.location
            );
            // SAFETY: this is a pin projection onto `inner`, which is never moved out of
            // `Checked` and is only dropped in place.
            unsafe { self.map_unchecked_mut(|c| &mut *c.inner).poll(cx) }
        }
    }

    // Wrap the future into one that checks which thread it's on.
    let future = Checked {
        id: thread_id(),
        inner: ManuallyDrop::new(future),
        location: Location::caller(),
    };

    // SAFETY: `spawn_unchecked` does not require the future to be `Send`; the `Checked`
    // wrapper enforces at runtime that the future is only polled and dropped on the thread
    // that spawned it.
    unsafe {
        async_task::Builder::new()
            .metadata(metadata)
            .spawn_unchecked(move |_| future, schedule)
    }
}
934
/// Scope manages a set of tasks that are enqueued and waited on together. See [`BackgroundExecutor::scoped`].
pub struct Scope<'a> {
    /// Executor used to block in `Drop` and to answer `num_cpus`.
    executor: BackgroundExecutor,
    /// Priority the scope's futures are spawned with.
    priority: Priority,
    /// Futures registered via `spawn`, with their lifetime erased to `'static`.
    futures: Vec<Pin<Box<dyn Future<Output = ()> + Send + 'static>>>,
    /// Sender that is cloned into every spawned future; taken (and dropped) in `Drop`.
    tx: Option<mpsc::Sender<()>>,
    /// Receiver that `Drop` blocks on; it yields once every sender clone is gone.
    rx: mpsc::Receiver<()>,
    /// Ties the scope to the lifetime that spawned futures may borrow from.
    lifetime: PhantomData<&'a ()>,
}
944
945impl<'a> Scope<'a> {
946 fn new(executor: BackgroundExecutor, priority: Priority) -> Self {
947 let (tx, rx) = mpsc::channel(1);
948 Self {
949 executor,
950 priority,
951 tx: Some(tx),
952 rx,
953 futures: Default::default(),
954 lifetime: PhantomData,
955 }
956 }
957
958 /// How many CPUs are available to the dispatcher.
959 pub fn num_cpus(&self) -> usize {
960 self.executor.num_cpus()
961 }
962
963 /// Spawn a future into this scope.
964 #[track_caller]
965 pub fn spawn<F>(&mut self, f: F)
966 where
967 F: Future<Output = ()> + Send + 'a,
968 {
969 let tx = self.tx.clone().unwrap();
970
971 // SAFETY: The 'a lifetime is guaranteed to outlive any of these futures because
972 // dropping this `Scope` blocks until all of the futures have resolved.
973 let f = unsafe {
974 mem::transmute::<
975 Pin<Box<dyn Future<Output = ()> + Send + 'a>>,
976 Pin<Box<dyn Future<Output = ()> + Send + 'static>>,
977 >(Box::pin(async move {
978 f.await;
979 drop(tx);
980 }))
981 };
982 self.futures.push(f);
983 }
984}
985
impl Drop for Scope<'_> {
    /// Blocks the current thread until every future spawned into this scope has
    /// resolved or been dropped.
    fn drop(&mut self) {
        // Drop our own sender so that only the clones held by spawned futures remain.
        self.tx.take().unwrap();

        // Wait until the channel is closed, which means that all of the spawned
        // futures have resolved.
        self.executor.block(self.rx.next());
    }
}
995
996#[cfg(test)]
997mod test {
998 use super::*;
999 use crate::{App, TestDispatcher, TestPlatform};
1000 use rand::SeedableRng;
1001 use std::cell::RefCell;
1002
    /// Helper to create test infrastructure.
    /// Returns (dispatcher, background_executor, app) where app's foreground_executor has liveness.
    ///
    /// The dispatcher is seeded with 0. The strong `liveness` handle is moved into the
    /// app, while the foreground executor only holds a weak reference to it.
    fn create_test_app() -> (TestDispatcher, BackgroundExecutor, Rc<crate::AppCell>) {
        let dispatcher = TestDispatcher::new(StdRng::seed_from_u64(0));
        let arc_dispatcher = Arc::new(dispatcher.clone());
        // Create liveness for task cancellation
        let liveness = std::sync::Arc::new(());
        let liveness_weak = std::sync::Arc::downgrade(&liveness);
        let background_executor = BackgroundExecutor::new(arc_dispatcher.clone());
        let foreground_executor = ForegroundExecutor::new(arc_dispatcher, liveness_weak);

        let platform = TestPlatform::new(background_executor.clone(), foreground_executor);
        let asset_source = Arc::new(());
        let http_client = http_client::FakeHttpClient::with_404_response();

        let app = App::new_app(platform, liveness, asset_source, http_client);
        (dispatcher, background_executor, app)
    }
1021
    /// Baseline: a detached foreground task runs while the app is alive.
    #[test]
    fn sanity_test_tasks_run() {
        let (dispatcher, _background_executor, app) = create_test_app();
        let foreground_executor = app.borrow().foreground_executor.clone();

        let task_ran = Rc::new(RefCell::new(false));

        foreground_executor
            .spawn({
                let task_ran = Rc::clone(&task_ran);
                async move {
                    *task_ran.borrow_mut() = true;
                }
            })
            .detach();

        // Run dispatcher while app is still alive
        dispatcher.run_until_parked();

        // Task should have run
        assert!(
            *task_ran.borrow(),
            "Task should run normally when app is alive"
        );
    }
1047
    /// A foreground task that has not run yet must not run once the app is dropped.
    #[test]
    fn test_task_cancelled_when_app_dropped() {
        let (dispatcher, _background_executor, app) = create_test_app();
        let foreground_executor = app.borrow().foreground_executor.clone();
        let app_weak = Rc::downgrade(&app);

        let task_ran = Rc::new(RefCell::new(false));
        let task_ran_clone = Rc::clone(&task_ran);

        foreground_executor
            .spawn(async move {
                *task_ran_clone.borrow_mut() = true;
            })
            .detach();

        // Drop the app before the dispatcher gets a chance to run the task.
        drop(app);

        assert!(app_weak.upgrade().is_none(), "App should have been dropped");

        dispatcher.run_until_parked();

        // The task should have been cancelled, not run
        assert!(
            !*task_ran.borrow(),
            "Task should have been cancelled when app was dropped, but it ran!"
        );
    }
1075
    /// When the app is dropped while an outer task awaits an inner task, neither of
    /// them may run to completion.
    #[test]
    fn test_nested_tasks_both_cancel() {
        let (dispatcher, _background_executor, app) = create_test_app();
        let foreground_executor = app.borrow().foreground_executor.clone();
        let app_weak = Rc::downgrade(&app);

        let outer_completed = Rc::new(RefCell::new(false));
        let inner_completed = Rc::new(RefCell::new(false));
        let reached_await = Rc::new(RefCell::new(false));

        let outer_flag = Rc::clone(&outer_completed);
        let inner_flag = Rc::clone(&inner_completed);
        let await_flag = Rc::clone(&reached_await);

        // Channel to block the inner task until we're ready
        let (tx, rx) = futures::channel::oneshot::channel::<()>();

        let inner_executor = foreground_executor.clone();

        foreground_executor
            .spawn(async move {
                let inner_task = inner_executor.spawn({
                    let inner_flag = Rc::clone(&inner_flag);
                    async move {
                        rx.await.ok();
                        *inner_flag.borrow_mut() = true;
                    }
                });

                *await_flag.borrow_mut() = true;

                inner_task.await;

                *outer_flag.borrow_mut() = true;
            })
            .detach();

        // Run dispatcher until outer task reaches the await point
        // The inner task will be blocked on the channel
        dispatcher.run_until_parked();

        // Verify we actually reached the await point before dropping the app
        assert!(
            *reached_await.borrow(),
            "Outer task should have reached the await point"
        );

        // Neither task should have completed yet
        assert!(
            !*outer_completed.borrow(),
            "Outer task should not have completed yet"
        );
        assert!(
            !*inner_completed.borrow(),
            "Inner task should not have completed yet"
        );

        // Drop the channel sender and app while outer is awaiting inner
        drop(tx);
        drop(app);
        assert!(app_weak.upgrade().is_none(), "App should have been dropped");

        // Run dispatcher - both tasks should be cancelled
        dispatcher.run_until_parked();

        // Neither task should have completed (both were cancelled)
        assert!(
            !*outer_completed.borrow(),
            "Outer task should have been cancelled, not completed"
        );
        assert!(
            !*inner_completed.borrow(),
            "Inner task should have been cancelled, not completed"
        );
    }
1151
    /// Blocking on a plain `Task` whose app was dropped is expected to panic.
    // NOTE(review): `#[should_panic]` has no `expected` message, so a failure of the
    // `app_weak` assertion below would also make this test pass.
    #[test]
    #[should_panic]
    fn test_polling_cancelled_task_panics() {
        let (dispatcher, background_executor, app) = create_test_app();
        let foreground_executor = app.borrow().foreground_executor.clone();
        let app_weak = Rc::downgrade(&app);

        let task = foreground_executor.spawn(async move { 42 });

        drop(app);

        assert!(app_weak.upgrade().is_none(), "App should have been dropped");

        dispatcher.run_until_parked();

        // Polling the cancelled task is what is expected to panic.
        background_executor.block(task);
    }
1169
    /// Blocking on a `FallibleTask` whose app was dropped yields `None` instead of panicking.
    #[test]
    fn test_polling_cancelled_task_returns_none_with_fallible() {
        let (dispatcher, background_executor, app) = create_test_app();
        let foreground_executor = app.borrow().foreground_executor.clone();
        let app_weak = Rc::downgrade(&app);

        let task = foreground_executor.spawn(async move { 42 }).fallible();

        drop(app);

        assert!(app_weak.upgrade().is_none(), "App should have been dropped");

        dispatcher.run_until_parked();

        let result = background_executor.block(task);
        assert_eq!(result, None, "Cancelled task should return None");
    }
1187}