// executor.rs

  1use crate::{App, PlatformDispatcher, PlatformScheduler};
  2use futures::channel::mpsc;
  3use futures::prelude::*;
  4use gpui_util::TryFutureExt;
  5use scheduler::Instant;
  6use scheduler::Scheduler;
  7use std::{
  8    fmt::Debug, future::Future, marker::PhantomData, mem, pin::Pin, rc::Rc, sync::Arc,
  9    time::Duration,
 10};
 11
 12pub use scheduler::{FallibleTask, ForegroundExecutor as SchedulerForegroundExecutor, Priority};
 13
/// A pointer to the executor that is currently running,
/// for spawning background tasks.
#[derive(Clone)]
pub struct BackgroundExecutor {
    // Scheduler-backed executor that actually spawns and drives tasks.
    inner: scheduler::BackgroundExecutor,
    // Platform dispatch layer; in test builds it also exposes test hooks
    // via `as_test()`.
    dispatcher: Arc<dyn PlatformDispatcher>,
}
 21
/// A pointer to the executor that is currently running,
/// for spawning tasks on the main thread.
#[derive(Clone)]
pub struct ForegroundExecutor {
    // Session-scoped scheduler executor for main-thread tasks.
    inner: scheduler::ForegroundExecutor,
    // Platform dispatch layer backing this executor.
    dispatcher: Arc<dyn PlatformDispatcher>,
    // `Rc` is !Send, so this marker makes the executor !Send as well,
    // pinning it to the thread on which it was created.
    not_send: PhantomData<Rc<()>>,
}
 30
 31/// Task is a primitive that allows work to happen in the background.
 32///
 33/// It implements [`Future`] so you can `.await` on it.
 34///
 35/// If you drop a task it will be cancelled immediately. Calling [`Task::detach`] allows
 36/// the task to continue running, but with no way to return a value.
 37#[must_use]
 38#[derive(Debug)]
 39pub struct Task<T>(scheduler::Task<T>);
 40
 41impl<T> Task<T> {
 42    /// Creates a new task that will resolve with the value.
 43    pub fn ready(val: T) -> Self {
 44        Task(scheduler::Task::ready(val))
 45    }
 46
 47    /// Returns true if the task has completed or was created with `Task::ready`.
 48    pub fn is_ready(&self) -> bool {
 49        self.0.is_ready()
 50    }
 51
 52    /// Detaching a task runs it to completion in the background.
 53    pub fn detach(self) {
 54        self.0.detach()
 55    }
 56
 57    /// Wraps a scheduler::Task.
 58    pub fn from_scheduler(task: scheduler::Task<T>) -> Self {
 59        Task(task)
 60    }
 61
 62    /// Converts this task into a fallible task that returns `Option<T>`.
 63    ///
 64    /// Unlike the standard `Task<T>`, a [`FallibleTask`] will return `None`
 65    /// if the task was cancelled.
 66    ///
 67    /// # Example
 68    ///
 69    /// ```ignore
 70    /// // Background task that gracefully handles cancellation:
 71    /// cx.background_spawn(async move {
 72    ///     let result = foreground_task.fallible().await;
 73    ///     if let Some(value) = result {
 74    ///         // Process the value
 75    ///     }
 76    ///     // If None, task was cancelled - just exit gracefully
 77    /// }).detach();
 78    /// ```
 79    pub fn fallible(self) -> FallibleTask<T> {
 80        self.0.fallible()
 81    }
 82}
 83
 84impl<T, E> Task<Result<T, E>>
 85where
 86    T: 'static,
 87    E: 'static + Debug,
 88{
 89    /// Run the task to completion in the background and log any errors that occur.
 90    #[track_caller]
 91    pub fn detach_and_log_err(self, cx: &App) {
 92        let location = core::panic::Location::caller();
 93        cx.foreground_executor()
 94            .spawn(self.log_tracked_err(*location))
 95            .detach();
 96    }
 97}
 98
impl<T> std::future::Future for Task<T> {
    type Output = T;

    fn poll(
        self: std::pin::Pin<&mut Self>,
        cx: &mut std::task::Context<'_>,
    ) -> std::task::Poll<Self::Output> {
        // SAFETY: This is a structural pin projection onto the single inner
        // field. The inner `scheduler::Task` is treated as pinned whenever
        // `Task` is pinned and is never moved out of a pinned `Task`, which
        // is the contract `map_unchecked_mut` requires.
        let inner = unsafe { self.map_unchecked_mut(|t| &mut t.0) };
        inner.poll(cx)
    }
}
112
113impl BackgroundExecutor {
114    /// Creates a new BackgroundExecutor from the given PlatformDispatcher.
115    pub fn new(dispatcher: Arc<dyn PlatformDispatcher>) -> Self {
116        #[cfg(any(test, feature = "test-support"))]
117        let scheduler: Arc<dyn Scheduler> = if let Some(test_dispatcher) = dispatcher.as_test() {
118            test_dispatcher.scheduler().clone()
119        } else {
120            Arc::new(PlatformScheduler::new(dispatcher.clone()))
121        };
122
123        #[cfg(not(any(test, feature = "test-support")))]
124        let scheduler: Arc<dyn Scheduler> = Arc::new(PlatformScheduler::new(dispatcher.clone()));
125
126        Self {
127            inner: scheduler::BackgroundExecutor::new(scheduler),
128            dispatcher,
129        }
130    }
131
132    /// Close this executor. Tasks will not run after this is called.
133    pub fn close(&self) {
134        self.inner.close();
135    }
136
137    /// Enqueues the given future to be run to completion on a background thread.
138    #[track_caller]
139    pub fn spawn<R>(&self, future: impl Future<Output = R> + Send + 'static) -> Task<R>
140    where
141        R: Send + 'static,
142    {
143        self.spawn_with_priority(Priority::default(), future.boxed())
144    }
145
146    /// Enqueues the given future to be run to completion on a background thread with the given priority.
147    ///
148    /// When `Priority::RealtimeAudio` is used, the task runs on a dedicated thread with
149    /// realtime scheduling priority, suitable for audio processing.
150    #[track_caller]
151    pub fn spawn_with_priority<R>(
152        &self,
153        priority: Priority,
154        future: impl Future<Output = R> + Send + 'static,
155    ) -> Task<R>
156    where
157        R: Send + 'static,
158    {
159        if priority == Priority::RealtimeAudio {
160            Task::from_scheduler(self.inner.spawn_realtime(future))
161        } else {
162            Task::from_scheduler(self.inner.spawn_with_priority(priority, future))
163        }
164    }
165
166    /// Enqueues the given future to be run to completion on a background thread and blocking the current task on it.
167    ///
168    /// This allows to spawn background work that borrows from its scope. Note that the supplied future will run to
169    /// completion before the current task is resumed, even if the current task is slated for cancellation.
170    pub async fn await_on_background<R>(&self, future: impl Future<Output = R> + Send) -> R
171    where
172        R: Send,
173    {
174        use crate::RunnableMeta;
175        use parking_lot::{Condvar, Mutex};
176        use std::sync::{Arc, atomic::AtomicBool};
177
178        struct NotifyOnDrop<'a>(&'a (Condvar, Mutex<bool>));
179
180        impl Drop for NotifyOnDrop<'_> {
181            fn drop(&mut self) {
182                *self.0.1.lock() = true;
183                self.0.0.notify_all();
184            }
185        }
186
187        struct WaitOnDrop<'a>(&'a (Condvar, Mutex<bool>));
188
189        impl Drop for WaitOnDrop<'_> {
190            fn drop(&mut self) {
191                let mut done = self.0.1.lock();
192                if !*done {
193                    self.0.0.wait(&mut done);
194                }
195            }
196        }
197
198        let dispatcher = self.dispatcher.clone();
199        let location = core::panic::Location::caller();
200        let closed = Arc::new(AtomicBool::new(false));
201
202        let pair = &(Condvar::new(), Mutex::new(false));
203        let _wait_guard = WaitOnDrop(pair);
204
205        let (runnable, task) = unsafe {
206            async_task::Builder::new()
207                .metadata(RunnableMeta { location, closed })
208                .spawn_unchecked(
209                    move |_| async {
210                        let _notify_guard = NotifyOnDrop(pair);
211                        future.await
212                    },
213                    move |runnable| {
214                        dispatcher.dispatch(runnable, Priority::default());
215                    },
216                )
217        };
218        runnable.schedule();
219        task.await
220    }
221
222    /// Scoped lets you start a number of tasks and waits
223    /// for all of them to complete before returning.
224    pub async fn scoped<'scope, F>(&self, scheduler: F)
225    where
226        F: FnOnce(&mut Scope<'scope>),
227    {
228        let mut scope = Scope::new(self.clone(), Priority::default());
229        (scheduler)(&mut scope);
230        let spawned = mem::take(&mut scope.futures)
231            .into_iter()
232            .map(|f| self.spawn_with_priority(scope.priority, f))
233            .collect::<Vec<_>>();
234        for task in spawned {
235            task.await;
236        }
237    }
238
239    /// Scoped lets you start a number of tasks and waits
240    /// for all of them to complete before returning.
241    pub async fn scoped_priority<'scope, F>(&self, priority: Priority, scheduler: F)
242    where
243        F: FnOnce(&mut Scope<'scope>),
244    {
245        let mut scope = Scope::new(self.clone(), priority);
246        (scheduler)(&mut scope);
247        let spawned = mem::take(&mut scope.futures)
248            .into_iter()
249            .map(|f| self.spawn_with_priority(scope.priority, f))
250            .collect::<Vec<_>>();
251        for task in spawned {
252            task.await;
253        }
254    }
255
256    /// Get the current time.
257    ///
258    /// Calling this instead of `std::time::Instant::now` allows the use
259    /// of fake timers in tests.
260    pub fn now(&self) -> Instant {
261        self.inner.scheduler().clock().now()
262    }
263
264    /// Returns a task that will complete after the given duration.
265    /// Depending on other concurrent tasks the elapsed duration may be longer
266    /// than requested.
267    #[track_caller]
268    pub fn timer(&self, duration: Duration) -> Task<()> {
269        if duration.is_zero() {
270            return Task::ready(());
271        }
272        self.spawn(self.inner.scheduler().timer(duration))
273    }
274
275    /// In tests, run an arbitrary number of tasks (determined by the SEED environment variable)
276    #[cfg(any(test, feature = "test-support"))]
277    pub fn simulate_random_delay(&self) -> impl Future<Output = ()> + use<> {
278        self.dispatcher.as_test().unwrap().simulate_random_delay()
279    }
280
281    /// In tests, move time forward. This does not run any tasks, but does make `timer`s ready.
282    #[cfg(any(test, feature = "test-support"))]
283    pub fn advance_clock(&self, duration: Duration) {
284        self.dispatcher.as_test().unwrap().advance_clock(duration)
285    }
286
287    /// In tests, run one task.
288    #[cfg(any(test, feature = "test-support"))]
289    pub fn tick(&self) -> bool {
290        self.dispatcher.as_test().unwrap().scheduler().tick()
291    }
292
293    /// In tests, run tasks until the scheduler would park.
294    ///
295    /// Under the scheduler-backed test dispatcher, `tick()` will not advance the clock, so a pending
296    /// timer can keep `has_pending_tasks()` true even after all currently-runnable tasks have been
297    /// drained. To preserve the historical semantics that tests relied on (drain all work that can
298    /// make progress), we advance the clock to the next timer when no runnable tasks remain.
299    #[cfg(any(test, feature = "test-support"))]
300    pub fn run_until_parked(&self) {
301        let scheduler = self.dispatcher.as_test().unwrap().scheduler();
302        scheduler.run();
303    }
304
305    /// In tests, prevents `run_until_parked` from panicking if there are outstanding tasks.
306    #[cfg(any(test, feature = "test-support"))]
307    pub fn allow_parking(&self) {
308        self.dispatcher
309            .as_test()
310            .unwrap()
311            .scheduler()
312            .allow_parking();
313
314        if std::env::var("GPUI_RUN_UNTIL_PARKED_LOG").ok().as_deref() == Some("1") {
315            log::warn!("[gpui::executor] allow_parking: enabled");
316        }
317    }
318
319    /// Sets the range of ticks to run before timing out in block_on.
320    #[cfg(any(test, feature = "test-support"))]
321    pub fn set_block_on_ticks(&self, range: std::ops::RangeInclusive<usize>) {
322        self.dispatcher
323            .as_test()
324            .unwrap()
325            .scheduler()
326            .set_timeout_ticks(range);
327    }
328
329    /// Undoes the effect of [`Self::allow_parking`].
330    #[cfg(any(test, feature = "test-support"))]
331    pub fn forbid_parking(&self) {
332        self.dispatcher
333            .as_test()
334            .unwrap()
335            .scheduler()
336            .forbid_parking();
337    }
338
339    /// In tests, returns the rng used by the dispatcher.
340    #[cfg(any(test, feature = "test-support"))]
341    pub fn rng(&self) -> scheduler::SharedRng {
342        self.dispatcher.as_test().unwrap().scheduler().rng()
343    }
344
345    /// How many CPUs are available to the dispatcher.
346    pub fn num_cpus(&self) -> usize {
347        #[cfg(any(test, feature = "test-support"))]
348        if let Some(test) = self.dispatcher.as_test() {
349            return test.num_cpus_override().unwrap_or(4);
350        }
351        num_cpus::get()
352    }
353
354    /// Override the number of CPUs reported by this executor in tests.
355    /// Panics if not called on a test executor.
356    #[cfg(any(test, feature = "test-support"))]
357    pub fn set_num_cpus(&self, count: usize) {
358        self.dispatcher
359            .as_test()
360            .expect("set_num_cpus can only be called on a test executor")
361            .set_num_cpus(count);
362    }
363
364    /// Whether we're on the main thread.
365    pub fn is_main_thread(&self) -> bool {
366        self.dispatcher.is_main_thread()
367    }
368
369    #[doc(hidden)]
370    pub fn dispatcher(&self) -> &Arc<dyn PlatformDispatcher> {
371        &self.dispatcher
372    }
373}
374
impl ForegroundExecutor {
    /// Creates a new ForegroundExecutor from the given PlatformDispatcher.
    pub fn new(dispatcher: Arc<dyn PlatformDispatcher>) -> Self {
        // In test builds, reuse the deterministic test scheduler and its
        // session id when available; otherwise build a platform scheduler and
        // allocate a fresh session id for this executor.
        #[cfg(any(test, feature = "test-support"))]
        let (scheduler, session_id): (Arc<dyn Scheduler>, _) =
            if let Some(test_dispatcher) = dispatcher.as_test() {
                (
                    test_dispatcher.scheduler().clone(),
                    test_dispatcher.session_id(),
                )
            } else {
                let platform_scheduler = Arc::new(PlatformScheduler::new(dispatcher.clone()));
                let session_id = platform_scheduler.allocate_session_id();
                (platform_scheduler, session_id)
            };

        #[cfg(not(any(test, feature = "test-support")))]
        let (scheduler, session_id): (Arc<dyn Scheduler>, _) = {
            let platform_scheduler = Arc::new(PlatformScheduler::new(dispatcher.clone()));
            let session_id = platform_scheduler.allocate_session_id();
            (platform_scheduler, session_id)
        };

        let inner = scheduler::ForegroundExecutor::new(session_id, scheduler);

        Self {
            inner,
            dispatcher,
            // Marker that keeps this type !Send; see the struct definition.
            not_send: PhantomData,
        }
    }

    /// Close this executor. Tasks will not run after this is called.
    pub fn close(&self) {
        self.inner.close();
    }

    /// Enqueues the given Task to run on the main thread.
    #[track_caller]
    pub fn spawn<R>(&self, future: impl Future<Output = R> + 'static) -> Task<R>
    where
        R: 'static,
    {
        // `boxed_local` because foreground futures need not be Send.
        Task::from_scheduler(self.inner.spawn(future.boxed_local()))
    }

    /// Enqueues the given Task to run on the main thread with the given priority.
    #[track_caller]
    pub fn spawn_with_priority<R>(
        &self,
        _priority: Priority,
        future: impl Future<Output = R> + 'static,
    ) -> Task<R>
    where
        R: 'static,
    {
        // Priority is ignored for foreground tasks - they run in order on the main thread
        Task::from_scheduler(self.inner.spawn(future))
    }

    /// Used by the test harness to run an async test in a synchronous fashion.
    #[cfg(any(test, feature = "test-support"))]
    #[track_caller]
    pub fn block_test<R>(&self, future: impl Future<Output = R>) -> R {
        use std::cell::Cell;

        let scheduler = self.inner.scheduler();

        // The test future is not 'static, so its output is smuggled out
        // through a stack-local Cell instead of through the scheduler.
        let output = Cell::new(None);
        let future = async {
            output.set(Some(future.await));
        };
        let mut future = std::pin::pin!(future);

        // In async GPUI tests, we must allow foreground tasks scheduled by the test itself
        // (which are associated with the test session) to make progress while we block.
        // Otherwise, awaiting futures that depend on same-session foreground work can deadlock.
        scheduler.block(None, future.as_mut(), None);

        output.take().expect("block_test future did not complete")
    }

    /// Block the current thread until the given future resolves.
    /// Consider using `block_with_timeout` instead.
    pub fn block_on<R>(&self, future: impl Future<Output = R>) -> R {
        self.inner.block_on(future)
    }

    /// Block the current thread until the given future resolves or the timeout elapses.
    ///
    /// On timeout, returns `Err` containing a future that can be polled to
    /// completion later.
    pub fn block_with_timeout<R, Fut: Future<Output = R>>(
        &self,
        duration: Duration,
        future: Fut,
    ) -> Result<R, impl Future<Output = R> + use<R, Fut>> {
        self.inner.block_with_timeout(duration, future)
    }

    #[doc(hidden)]
    pub fn dispatcher(&self) -> &Arc<dyn PlatformDispatcher> {
        &self.dispatcher
    }

    #[doc(hidden)]
    pub fn scheduler_executor(&self) -> SchedulerForegroundExecutor {
        self.inner.clone()
    }
}
482
/// Scope manages a set of tasks that are enqueued and waited on together. See [`BackgroundExecutor::scoped`].
pub struct Scope<'a> {
    // Executor whose scheduler is used to block on completion in `Drop`.
    executor: BackgroundExecutor,
    // Priority applied to every future spawned into this scope.
    priority: Priority,
    // Futures registered via `Scope::spawn`; drained and spawned by the
    // `scoped*` methods on `BackgroundExecutor`.
    futures: Vec<Pin<Box<dyn Future<Output = ()> + Send + 'static>>>,
    // A clone of this sender is moved into each spawned future; the channel
    // closes only once every clone has been dropped, signalling that all
    // futures have resolved.
    tx: Option<mpsc::Sender<()>>,
    // Awaited in `Drop` to block until the channel closes.
    rx: mpsc::Receiver<()>,
    // Ties the scope to the `'a` lifetime borrowed by the spawned futures.
    lifetime: PhantomData<&'a ()>,
}
492
impl<'a> Scope<'a> {
    /// Creates an empty scope that will spawn on `executor` at `priority`.
    fn new(executor: BackgroundExecutor, priority: Priority) -> Self {
        let (tx, rx) = mpsc::channel(1);
        Self {
            executor,
            priority,
            tx: Some(tx),
            rx,
            futures: Default::default(),
            lifetime: PhantomData,
        }
    }

    /// How many CPUs are available to the dispatcher.
    pub fn num_cpus(&self) -> usize {
        self.executor.num_cpus()
    }

    /// Spawn a future into this scope.
    #[track_caller]
    pub fn spawn<F>(&mut self, f: F)
    where
        F: Future<Output = ()> + Send + 'a,
    {
        // Each spawned future holds a clone of the sender; the channel closes
        // only after every clone (and thus every future) has been dropped,
        // which `Scope`'s Drop impl waits on.
        let tx = self.tx.clone().unwrap();

        // SAFETY: The 'a lifetime is guaranteed to outlive any of these futures because
        // dropping this `Scope` blocks until all of the futures have resolved.
        let f = unsafe {
            mem::transmute::<
                Pin<Box<dyn Future<Output = ()> + Send + 'a>>,
                Pin<Box<dyn Future<Output = ()> + Send + 'static>>,
            >(Box::pin(async move {
                f.await;
                drop(tx);
            }))
        };
        self.futures.push(f);
    }
}
533
impl Drop for Scope<'_> {
    fn drop(&mut self) {
        // Drop our own sender so the channel can close once every spawned
        // future (each holding a clone) has been dropped.
        self.tx.take().unwrap();

        // Wait until the channel is closed, which means that all of the spawned
        // futures have resolved.
        let future = async {
            self.rx.next().await;
        };
        let mut future = std::pin::pin!(future);
        // Blocking here is what upholds the safety contract of the lifetime
        // transmute in `Scope::spawn`: no future can outlive this frame.
        self.executor
            .inner
            .scheduler()
            .block(None, future.as_mut(), None);
    }
}
550
#[cfg(test)]
mod test {
    use super::*;
    use crate::{App, TestDispatcher, TestPlatform};
    use std::cell::RefCell;

    /// Helper to create test infrastructure.
    /// Returns (dispatcher, background_executor, app).
    fn create_test_app() -> (TestDispatcher, BackgroundExecutor, Rc<crate::AppCell>) {
        let dispatcher = TestDispatcher::new(0);
        let arc_dispatcher = Arc::new(dispatcher.clone());
        let background_executor = BackgroundExecutor::new(arc_dispatcher.clone());
        let foreground_executor = ForegroundExecutor::new(arc_dispatcher);

        let platform = TestPlatform::new(background_executor.clone(), foreground_executor);
        let asset_source = Arc::new(());
        let http_client = http_client::FakeHttpClient::with_404_response();

        let app = App::new_app(platform, asset_source, http_client);
        (dispatcher, background_executor, app)
    }

    /// Baseline: a foreground task spawned while the app is alive runs.
    #[test]
    fn sanity_test_tasks_run() {
        let (dispatcher, _background_executor, app) = create_test_app();
        let foreground_executor = app.borrow().foreground_executor.clone();

        let task_ran = Rc::new(RefCell::new(false));

        foreground_executor
            .spawn({
                let task_ran = Rc::clone(&task_ran);
                async move {
                    *task_ran.borrow_mut() = true;
                }
            })
            .detach();

        // Run dispatcher while app is still alive
        dispatcher.run_until_parked();

        // Task should have run
        assert!(
            *task_ran.borrow(),
            "Task should run normally when app is alive"
        );
    }

    /// Dropping the app before the dispatcher runs should cancel a detached
    /// foreground task rather than letting it execute.
    #[test]
    fn test_task_cancelled_when_app_dropped() {
        let (dispatcher, _background_executor, app) = create_test_app();
        let foreground_executor = app.borrow().foreground_executor.clone();
        let app_weak = Rc::downgrade(&app);

        let task_ran = Rc::new(RefCell::new(false));
        let task_ran_clone = Rc::clone(&task_ran);

        foreground_executor
            .spawn(async move {
                *task_ran_clone.borrow_mut() = true;
            })
            .detach();

        drop(app);

        assert!(app_weak.upgrade().is_none(), "App should have been dropped");

        dispatcher.run_until_parked();

        // The task should have been cancelled, not run
        assert!(
            !*task_ran.borrow(),
            "Task should have been cancelled when app was dropped, but it ran!"
        );
    }

    /// Cancellation must propagate through nesting: when the app is dropped
    /// while an outer task awaits an inner task, neither should complete.
    #[test]
    fn test_nested_tasks_both_cancel() {
        let (dispatcher, _background_executor, app) = create_test_app();
        let foreground_executor = app.borrow().foreground_executor.clone();
        let app_weak = Rc::downgrade(&app);

        let outer_completed = Rc::new(RefCell::new(false));
        let inner_completed = Rc::new(RefCell::new(false));
        let reached_await = Rc::new(RefCell::new(false));

        let outer_flag = Rc::clone(&outer_completed);
        let inner_flag = Rc::clone(&inner_completed);
        let await_flag = Rc::clone(&reached_await);

        // Channel to block the inner task until we're ready
        let (tx, rx) = futures::channel::oneshot::channel::<()>();

        let inner_executor = foreground_executor.clone();

        foreground_executor
            .spawn(async move {
                let inner_task = inner_executor.spawn({
                    let inner_flag = Rc::clone(&inner_flag);
                    async move {
                        rx.await.ok();
                        *inner_flag.borrow_mut() = true;
                    }
                });

                *await_flag.borrow_mut() = true;

                inner_task.await;

                *outer_flag.borrow_mut() = true;
            })
            .detach();

        // Run dispatcher until outer task reaches the await point
        // The inner task will be blocked on the channel
        dispatcher.run_until_parked();

        // Verify we actually reached the await point before dropping the app
        assert!(
            *reached_await.borrow(),
            "Outer task should have reached the await point"
        );

        // Neither task should have completed yet
        assert!(
            !*outer_completed.borrow(),
            "Outer task should not have completed yet"
        );
        assert!(
            !*inner_completed.borrow(),
            "Inner task should not have completed yet"
        );

        // Drop the channel sender and app while outer is awaiting inner
        drop(tx);
        drop(app);
        assert!(app_weak.upgrade().is_none(), "App should have been dropped");

        // Run dispatcher - both tasks should be cancelled
        dispatcher.run_until_parked();

        // Neither task should have completed (both were cancelled)
        assert!(
            !*outer_completed.borrow(),
            "Outer task should have been cancelled, not completed"
        );
        assert!(
            !*inner_completed.borrow(),
            "Inner task should have been cancelled, not completed"
        );
    }

    /// Blocking on a task that was cancelled (by dropping the app) panics;
    /// `Task::fallible` (next test) is the graceful alternative.
    #[test]
    #[should_panic]
    fn test_polling_cancelled_task_panics() {
        let (dispatcher, _background_executor, app) = create_test_app();
        let foreground_executor = app.borrow().foreground_executor.clone();
        let app_weak = Rc::downgrade(&app);

        let task = foreground_executor.spawn(async move { 42 });

        drop(app);

        assert!(app_weak.upgrade().is_none(), "App should have been dropped");

        dispatcher.run_until_parked();

        foreground_executor.block_on(task);
    }

    /// A cancelled task converted via `fallible()` resolves to `None`
    /// instead of panicking.
    #[test]
    fn test_polling_cancelled_task_returns_none_with_fallible() {
        let (dispatcher, _background_executor, app) = create_test_app();
        let foreground_executor = app.borrow().foreground_executor.clone();
        let app_weak = Rc::downgrade(&app);

        let task = foreground_executor.spawn(async move { 42 }).fallible();

        drop(app);

        assert!(app_weak.upgrade().is_none(), "App should have been dropped");

        dispatcher.run_until_parked();

        let result = foreground_executor.block_on(task);
        assert_eq!(result, None, "Cancelled task should return None");
    }
}