executor.rs

  1use crate::{App, PlatformDispatcher, PlatformScheduler};
  2use futures::channel::mpsc;
  3use scheduler::Scheduler;
  4use smol::prelude::*;
  5use std::{
  6    fmt::Debug,
  7    future::Future,
  8    marker::PhantomData,
  9    mem,
 10    pin::Pin,
 11    rc::Rc,
 12    sync::Arc,
 13    time::{Duration, Instant},
 14};
 15use util::TryFutureExt;
 16
 17pub use scheduler::{FallibleTask, ForegroundExecutor as SchedulerForegroundExecutor, Priority};
 18
/// A pointer to the executor that is currently running,
/// for spawning background tasks.
///
/// The handle is `Clone`; a clone is built from clones of the two fields below.
#[derive(Clone)]
pub struct BackgroundExecutor {
    /// Scheduler-level executor that `spawn*`, `close`, `now` and `timer` go through.
    inner: scheduler::BackgroundExecutor,
    /// Platform dispatcher this handle was constructed from. It is used directly by
    /// `await_on_background`, `is_main_thread`, `num_cpus` and the test-support helpers.
    dispatcher: Arc<dyn PlatformDispatcher>,
}
 26
/// A pointer to the executor that is currently running,
/// for spawning tasks on the main thread.
///
/// The handle is `Clone`, but it is deliberately neither `Send` nor `Sync` (see `not_send`).
#[derive(Clone)]
pub struct ForegroundExecutor {
    /// Scheduler-level foreground executor that spawning and blocking delegate to.
    inner: scheduler::ForegroundExecutor,
    /// Platform dispatcher this handle was constructed from; exposed through `dispatcher()`.
    dispatcher: Arc<dyn PlatformDispatcher>,
    /// Zero-sized marker: `Rc<()>` is neither `Send` nor `Sync`, so this struct is not either,
    /// which prevents the handle from being moved to or shared with another thread.
    not_send: PhantomData<Rc<()>>,
}
 35
/// Task is a primitive that allows work to happen in the background.
///
/// It implements [`Future`] so you can `.await` on it.
///
/// If you drop a task it will be cancelled immediately. Calling [`Task::detach`] allows
/// the task to continue running, but with no way to return a value.
///
/// This is a newtype around [`scheduler::Task`]; every method on it forwards to the
/// wrapped task.
#[must_use]
#[derive(Debug)]
pub struct Task<T>(scheduler::Task<T>);
 45
 46impl<T> Task<T> {
 47    /// Creates a new task that will resolve with the value.
 48    pub fn ready(val: T) -> Self {
 49        Task(scheduler::Task::ready(val))
 50    }
 51
 52    /// Returns true if the task has completed or was created with `Task::ready`.
 53    pub fn is_ready(&self) -> bool {
 54        self.0.is_ready()
 55    }
 56
 57    /// Detaching a task runs it to completion in the background.
 58    pub fn detach(self) {
 59        self.0.detach()
 60    }
 61
 62    /// Wraps a scheduler::Task.
 63    pub fn from_scheduler(task: scheduler::Task<T>) -> Self {
 64        Task(task)
 65    }
 66
 67    /// Converts this task into a fallible task that returns `Option<T>`.
 68    ///
 69    /// Unlike the standard `Task<T>`, a [`FallibleTask`] will return `None`
 70    /// if the task was cancelled.
 71    ///
 72    /// # Example
 73    ///
 74    /// ```ignore
 75    /// // Background task that gracefully handles cancellation:
 76    /// cx.background_spawn(async move {
 77    ///     let result = foreground_task.fallible().await;
 78    ///     if let Some(value) = result {
 79    ///         // Process the value
 80    ///     }
 81    ///     // If None, task was cancelled - just exit gracefully
 82    /// }).detach();
 83    /// ```
 84    pub fn fallible(self) -> FallibleTask<T> {
 85        self.0.fallible()
 86    }
 87}
 88
 89impl<T, E> Task<Result<T, E>>
 90where
 91    T: 'static,
 92    E: 'static + Debug,
 93{
 94    /// Run the task to completion in the background and log any errors that occur.
 95    #[track_caller]
 96    pub fn detach_and_log_err(self, cx: &App) {
 97        let location = core::panic::Location::caller();
 98        cx.foreground_executor()
 99            .spawn(self.log_tracked_err(*location))
100            .detach();
101    }
102}
103
/// Polling a [`Task`] polls the wrapped [`scheduler::Task`] and yields its output.
impl<T> std::future::Future for Task<T> {
    type Output = T;

    fn poll(
        self: std::pin::Pin<&mut Self>,
        cx: &mut std::task::Context<'_>,
    ) -> std::task::Poll<Self::Output> {
        // SAFETY: this is a structural pin projection onto the struct's only field.
        // The closure merely reborrows `t.0` and never moves it, and no method in this
        // file moves the inner task out from behind a `Pin<&mut Task<T>>` (the consuming
        // methods take `self` by value). Note that `Task` is not declared
        // `#[repr(transparent)]`; the projection does not depend on layout.
        let inner = unsafe { self.map_unchecked_mut(|t| &mut t.0) };
        inner.poll(cx)
    }
}
117
118impl BackgroundExecutor {
119    /// Creates a new BackgroundExecutor from the given PlatformDispatcher.
120    pub fn new(dispatcher: Arc<dyn PlatformDispatcher>) -> Self {
121        #[cfg(any(test, feature = "test-support"))]
122        let scheduler: Arc<dyn Scheduler> = if let Some(test_dispatcher) = dispatcher.as_test() {
123            test_dispatcher.scheduler().clone()
124        } else {
125            Arc::new(PlatformScheduler::new(dispatcher.clone()))
126        };
127
128        #[cfg(not(any(test, feature = "test-support")))]
129        let scheduler: Arc<dyn Scheduler> = Arc::new(PlatformScheduler::new(dispatcher.clone()));
130
131        Self {
132            inner: scheduler::BackgroundExecutor::new(scheduler),
133            dispatcher,
134        }
135    }
136
137    /// Close this executor. Tasks will not run after this is called.
138    pub fn close(&self) {
139        self.inner.close();
140    }
141
142    /// Enqueues the given future to be run to completion on a background thread.
143    #[track_caller]
144    pub fn spawn<R>(&self, future: impl Future<Output = R> + Send + 'static) -> Task<R>
145    where
146        R: Send + 'static,
147    {
148        self.spawn_with_priority(Priority::default(), future.boxed())
149    }
150
151    /// Enqueues the given future to be run to completion on a background thread with the given priority.
152    ///
153    /// When `Priority::RealtimeAudio` is used, the task runs on a dedicated thread with
154    /// realtime scheduling priority, suitable for audio processing.
155    #[track_caller]
156    pub fn spawn_with_priority<R>(
157        &self,
158        priority: Priority,
159        future: impl Future<Output = R> + Send + 'static,
160    ) -> Task<R>
161    where
162        R: Send + 'static,
163    {
164        if priority == Priority::RealtimeAudio {
165            Task::from_scheduler(self.inner.spawn_realtime(future))
166        } else {
167            Task::from_scheduler(self.inner.spawn_with_priority(priority, future))
168        }
169    }
170
    /// Enqueues the given future to run to completion on a background thread and awaits its
    /// result from the current task.
    ///
    /// Unlike [`Self::spawn`], the future does not have to be `'static`, which makes it possible
    /// to run background work that borrows from the caller's scope. Note that the supplied future
    /// will run to completion before the current task is resumed, even if the current task is
    /// slated for cancellation.
    ///
    /// The work is dispatched directly through the platform dispatcher at the default priority.
    pub async fn await_on_background<R>(&self, future: impl Future<Output = R> + Send) -> R
    where
        R: Send,
    {
        use crate::RunnableMeta;
        use parking_lot::{Condvar, Mutex};
        use std::sync::{Arc, atomic::AtomicBool};

        // Sets the shared "done" flag and wakes all waiters when dropped. An instance lives
        // inside the spawned future, so it fires when that future finishes or is dropped
        // after having been polled at least once.
        struct NotifyOnDrop<'a>(&'a (Condvar, Mutex<bool>));

        impl Drop for NotifyOnDrop<'_> {
            fn drop(&mut self) {
                *self.0.1.lock() = true;
                self.0.0.notify_all();
            }
        }

        // Blocks the dropping thread until the shared "done" flag has been set.
        struct WaitOnDrop<'a>(&'a (Condvar, Mutex<bool>));

        impl Drop for WaitOnDrop<'_> {
            fn drop(&mut self) {
                let mut done = self.0.1.lock();
                // A single `wait` rather than a loop: this relies on the condvar never waking
                // spuriously. NOTE(review): parking_lot documents that guarantee — confirm.
                if !*done {
                    self.0.0.wait(&mut done);
                }
            }
        }

        let dispatcher = self.dispatcher.clone();
        // NOTE(review): this `async fn` is not `#[track_caller]`, so this presumably records a
        // location inside this function rather than the caller's — confirm that is intended.
        let location = core::panic::Location::caller();
        // Fresh flag for the runnable's metadata; nothing in this function ever sets it.
        let closed = Arc::new(AtomicBool::new(false));

        let pair = &(Condvar::new(), Mutex::new(false));
        // Declared before the task so that it is dropped after it: if this function's future is
        // dropped mid-await, the task is dropped first and this guard then waits for the
        // spawned future to be gone.
        let _wait_guard = WaitOnDrop(pair);

        // SAFETY (as far as this function shows): `spawn_unchecked` is needed because the
        // future borrows `pair` and the caller's non-`'static` data. `_wait_guard` keeps this
        // function's state alive until `_notify_guard` has been dropped, so those borrows do
        // not outlive what they point to.
        // NOTE(review): `_notify_guard` is only created on the future's first poll. If the
        // runnable were dropped without ever being polled, nothing would set the flag and
        // `_wait_guard` would presumably block forever — confirm the dispatcher never drops
        // queued runnables. The remaining `spawn_unchecked` requirements are not established
        // here either.
        let (runnable, task) = unsafe {
            async_task::Builder::new()
                .metadata(RunnableMeta { location, closed })
                .spawn_unchecked(
                    move |_| async {
                        let _notify_guard = NotifyOnDrop(pair);
                        future.await
                    },
                    move |runnable| {
                        dispatcher.dispatch(runnable, Priority::default());
                    },
                )
        };
        runnable.schedule();
        task.await
    }
226
227    /// Scoped lets you start a number of tasks and waits
228    /// for all of them to complete before returning.
229    pub async fn scoped<'scope, F>(&self, scheduler: F)
230    where
231        F: FnOnce(&mut Scope<'scope>),
232    {
233        let mut scope = Scope::new(self.clone(), Priority::default());
234        (scheduler)(&mut scope);
235        let spawned = mem::take(&mut scope.futures)
236            .into_iter()
237            .map(|f| self.spawn_with_priority(scope.priority, f))
238            .collect::<Vec<_>>();
239        for task in spawned {
240            task.await;
241        }
242    }
243
244    /// Scoped lets you start a number of tasks and waits
245    /// for all of them to complete before returning.
246    pub async fn scoped_priority<'scope, F>(&self, priority: Priority, scheduler: F)
247    where
248        F: FnOnce(&mut Scope<'scope>),
249    {
250        let mut scope = Scope::new(self.clone(), priority);
251        (scheduler)(&mut scope);
252        let spawned = mem::take(&mut scope.futures)
253            .into_iter()
254            .map(|f| self.spawn_with_priority(scope.priority, f))
255            .collect::<Vec<_>>();
256        for task in spawned {
257            task.await;
258        }
259    }
260
261    /// Get the current time.
262    ///
263    /// Calling this instead of `std::time::Instant::now` allows the use
264    /// of fake timers in tests.
265    pub fn now(&self) -> Instant {
266        self.inner.scheduler().clock().now()
267    }
268
269    /// Returns a task that will complete after the given duration.
270    /// Depending on other concurrent tasks the elapsed duration may be longer
271    /// than requested.
272    pub fn timer(&self, duration: Duration) -> Task<()> {
273        if duration.is_zero() {
274            return Task::ready(());
275        }
276        self.spawn(self.inner.scheduler().timer(duration))
277    }
278
279    /// In tests, run an arbitrary number of tasks (determined by the SEED environment variable)
280    #[cfg(any(test, feature = "test-support"))]
281    pub fn simulate_random_delay(&self) -> impl Future<Output = ()> + use<> {
282        self.dispatcher.as_test().unwrap().simulate_random_delay()
283    }
284
285    /// In tests, move time forward. This does not run any tasks, but does make `timer`s ready.
286    #[cfg(any(test, feature = "test-support"))]
287    pub fn advance_clock(&self, duration: Duration) {
288        self.dispatcher.as_test().unwrap().advance_clock(duration)
289    }
290
291    /// In tests, run one task.
292    #[cfg(any(test, feature = "test-support"))]
293    pub fn tick(&self) -> bool {
294        self.dispatcher.as_test().unwrap().scheduler().tick()
295    }
296
297    /// In tests, run tasks until the scheduler would park.
298    ///
299    /// Under the scheduler-backed test dispatcher, `tick()` will not advance the clock, so a pending
300    /// timer can keep `has_pending_tasks()` true even after all currently-runnable tasks have been
301    /// drained. To preserve the historical semantics that tests relied on (drain all work that can
302    /// make progress), we advance the clock to the next timer when no runnable tasks remain.
303    #[cfg(any(test, feature = "test-support"))]
304    pub fn run_until_parked(&self) {
305        let scheduler = self.dispatcher.as_test().unwrap().scheduler();
306        scheduler.run();
307    }
308
309    /// In tests, prevents `run_until_parked` from panicking if there are outstanding tasks.
310    #[cfg(any(test, feature = "test-support"))]
311    pub fn allow_parking(&self) {
312        self.dispatcher
313            .as_test()
314            .unwrap()
315            .scheduler()
316            .allow_parking();
317
318        if std::env::var("GPUI_RUN_UNTIL_PARKED_LOG").ok().as_deref() == Some("1") {
319            log::warn!("[gpui::executor] allow_parking: enabled");
320        }
321    }
322
323    /// Sets the range of ticks to run before timing out in block_on.
324    #[cfg(any(test, feature = "test-support"))]
325    pub fn set_block_on_ticks(&self, range: std::ops::RangeInclusive<usize>) {
326        self.dispatcher
327            .as_test()
328            .unwrap()
329            .scheduler()
330            .set_timeout_ticks(range);
331    }
332
333    /// Undoes the effect of [`Self::allow_parking`].
334    #[cfg(any(test, feature = "test-support"))]
335    pub fn forbid_parking(&self) {
336        self.dispatcher
337            .as_test()
338            .unwrap()
339            .scheduler()
340            .forbid_parking();
341    }
342
343    /// In tests, returns the rng used by the dispatcher.
344    #[cfg(any(test, feature = "test-support"))]
345    pub fn rng(&self) -> scheduler::SharedRng {
346        self.dispatcher.as_test().unwrap().scheduler().rng()
347    }
348
349    /// How many CPUs are available to the dispatcher.
350    pub fn num_cpus(&self) -> usize {
351        #[cfg(any(test, feature = "test-support"))]
352        if let Some(test) = self.dispatcher.as_test() {
353            return test.num_cpus_override().unwrap_or(4);
354        }
355        num_cpus::get()
356    }
357
358    /// Override the number of CPUs reported by this executor in tests.
359    /// Panics if not called on a test executor.
360    #[cfg(any(test, feature = "test-support"))]
361    pub fn set_num_cpus(&self, count: usize) {
362        self.dispatcher
363            .as_test()
364            .expect("set_num_cpus can only be called on a test executor")
365            .set_num_cpus(count);
366    }
367
368    /// Whether we're on the main thread.
369    pub fn is_main_thread(&self) -> bool {
370        self.dispatcher.is_main_thread()
371    }
372
373    #[doc(hidden)]
374    pub fn dispatcher(&self) -> &Arc<dyn PlatformDispatcher> {
375        &self.dispatcher
376    }
377}
378
379impl ForegroundExecutor {
380    /// Creates a new ForegroundExecutor from the given PlatformDispatcher.
381    pub fn new(dispatcher: Arc<dyn PlatformDispatcher>) -> Self {
382        #[cfg(any(test, feature = "test-support"))]
383        let (scheduler, session_id): (Arc<dyn Scheduler>, _) =
384            if let Some(test_dispatcher) = dispatcher.as_test() {
385                (
386                    test_dispatcher.scheduler().clone(),
387                    test_dispatcher.session_id(),
388                )
389            } else {
390                let platform_scheduler = Arc::new(PlatformScheduler::new(dispatcher.clone()));
391                let session_id = platform_scheduler.allocate_session_id();
392                (platform_scheduler, session_id)
393            };
394
395        #[cfg(not(any(test, feature = "test-support")))]
396        let (scheduler, session_id): (Arc<dyn Scheduler>, _) = {
397            let platform_scheduler = Arc::new(PlatformScheduler::new(dispatcher.clone()));
398            let session_id = platform_scheduler.allocate_session_id();
399            (platform_scheduler, session_id)
400        };
401
402        let inner = scheduler::ForegroundExecutor::new(session_id, scheduler);
403
404        Self {
405            inner,
406            dispatcher,
407            not_send: PhantomData,
408        }
409    }
410
411    /// Close this executor. Tasks will not run after this is called.
412    pub fn close(&self) {
413        self.inner.close();
414    }
415
416    /// Enqueues the given Task to run on the main thread.
417    #[track_caller]
418    pub fn spawn<R>(&self, future: impl Future<Output = R> + 'static) -> Task<R>
419    where
420        R: 'static,
421    {
422        Task::from_scheduler(self.inner.spawn(future.boxed_local()))
423    }
424
425    /// Enqueues the given Task to run on the main thread with the given priority.
426    #[track_caller]
427    pub fn spawn_with_priority<R>(
428        &self,
429        _priority: Priority,
430        future: impl Future<Output = R> + 'static,
431    ) -> Task<R>
432    where
433        R: 'static,
434    {
435        // Priority is ignored for foreground tasks - they run in order on the main thread
436        Task::from_scheduler(self.inner.spawn(future))
437    }
438
439    /// Used by the test harness to run an async test in a synchronous fashion.
440    #[cfg(any(test, feature = "test-support"))]
441    #[track_caller]
442    pub fn block_test<R>(&self, future: impl Future<Output = R>) -> R {
443        use std::cell::Cell;
444
445        let scheduler = self.inner.scheduler();
446
447        let output = Cell::new(None);
448        let future = async {
449            output.set(Some(future.await));
450        };
451        let mut future = std::pin::pin!(future);
452
453        // In async GPUI tests, we must allow foreground tasks scheduled by the test itself
454        // (which are associated with the test session) to make progress while we block.
455        // Otherwise, awaiting futures that depend on same-session foreground work can deadlock.
456        scheduler.block(None, future.as_mut(), None);
457
458        output.take().expect("block_test future did not complete")
459    }
460
461    /// Block the current thread until the given future resolves.
462    /// Consider using `block_with_timeout` instead.
463    pub fn block_on<R>(&self, future: impl Future<Output = R>) -> R {
464        self.inner.block_on(future)
465    }
466
467    /// Block the current thread until the given future resolves or the timeout elapses.
468    pub fn block_with_timeout<R, Fut: Future<Output = R>>(
469        &self,
470        duration: Duration,
471        future: Fut,
472    ) -> Result<R, impl Future<Output = R> + use<R, Fut>> {
473        self.inner.block_with_timeout(duration, future)
474    }
475
476    #[doc(hidden)]
477    pub fn dispatcher(&self) -> &Arc<dyn PlatformDispatcher> {
478        &self.dispatcher
479    }
480
481    #[doc(hidden)]
482    pub fn scheduler_executor(&self) -> SchedulerForegroundExecutor {
483        self.inner.clone()
484    }
485}
486
/// Scope manages a set of tasks that are enqueued and waited on together. See [`BackgroundExecutor::scoped`].
///
/// Futures handed to [`Scope::spawn`] may borrow data that lives for `'a`. Dropping the scope
/// blocks until every such future has been dropped, which is what the lifetime erasure in
/// `spawn` relies on.
pub struct Scope<'a> {
    /// Executor used to report the CPU count and to block in `Drop`.
    executor: BackgroundExecutor,
    /// Priority that `scoped`/`scoped_priority` pass along when spawning the queued futures.
    priority: Priority,
    /// Futures queued by `spawn`, with their `'a` lifetime erased to `'static`.
    futures: Vec<Pin<Box<dyn Future<Output = ()> + Send + 'static>>>,
    /// Sender that is cloned into every queued future; taken (set to `None`) in `Drop`.
    tx: Option<mpsc::Sender<()>>,
    /// Receiver that `Drop` waits on; nothing in this file sends on the channel, so it is
    /// only used to observe that all senders are gone.
    rx: mpsc::Receiver<()>,
    /// Ties the scope to the `'a` lifetime without storing a reference.
    lifetime: PhantomData<&'a ()>,
}
496
497impl<'a> Scope<'a> {
498    fn new(executor: BackgroundExecutor, priority: Priority) -> Self {
499        let (tx, rx) = mpsc::channel(1);
500        Self {
501            executor,
502            priority,
503            tx: Some(tx),
504            rx,
505            futures: Default::default(),
506            lifetime: PhantomData,
507        }
508    }
509
510    /// How many CPUs are available to the dispatcher.
511    pub fn num_cpus(&self) -> usize {
512        self.executor.num_cpus()
513    }
514
515    /// Spawn a future into this scope.
516    #[track_caller]
517    pub fn spawn<F>(&mut self, f: F)
518    where
519        F: Future<Output = ()> + Send + 'a,
520    {
521        let tx = self.tx.clone().unwrap();
522
523        // SAFETY: The 'a lifetime is guaranteed to outlive any of these futures because
524        // dropping this `Scope` blocks until all of the futures have resolved.
525        let f = unsafe {
526            mem::transmute::<
527                Pin<Box<dyn Future<Output = ()> + Send + 'a>>,
528                Pin<Box<dyn Future<Output = ()> + Send + 'static>>,
529            >(Box::pin(async move {
530                f.await;
531                drop(tx);
532            }))
533        };
534        self.futures.push(f);
535    }
536}
537
538impl Drop for Scope<'_> {
539    fn drop(&mut self) {
540        self.tx.take().unwrap();
541
542        // Wait until the channel is closed, which means that all of the spawned
543        // futures have resolved.
544        let future = async {
545            self.rx.next().await;
546        };
547        let mut future = std::pin::pin!(future);
548        self.executor
549            .inner
550            .scheduler()
551            .block(None, future.as_mut(), None);
552    }
553}
554
555#[cfg(test)]
556mod test {
557    use super::*;
558    use crate::{App, TestDispatcher, TestPlatform};
559    use std::cell::RefCell;
560
561    /// Helper to create test infrastructure.
562    /// Returns (dispatcher, background_executor, app).
563    fn create_test_app() -> (TestDispatcher, BackgroundExecutor, Rc<crate::AppCell>) {
564        let dispatcher = TestDispatcher::new(0);
565        let arc_dispatcher = Arc::new(dispatcher.clone());
566        let background_executor = BackgroundExecutor::new(arc_dispatcher.clone());
567        let foreground_executor = ForegroundExecutor::new(arc_dispatcher);
568
569        let platform = TestPlatform::new(background_executor.clone(), foreground_executor);
570        let asset_source = Arc::new(());
571        let http_client = http_client::FakeHttpClient::with_404_response();
572
573        let app = App::new_app(platform, asset_source, http_client);
574        (dispatcher, background_executor, app)
575    }
576
577    #[test]
578    fn sanity_test_tasks_run() {
579        let (dispatcher, _background_executor, app) = create_test_app();
580        let foreground_executor = app.borrow().foreground_executor.clone();
581
582        let task_ran = Rc::new(RefCell::new(false));
583
584        foreground_executor
585            .spawn({
586                let task_ran = Rc::clone(&task_ran);
587                async move {
588                    *task_ran.borrow_mut() = true;
589                }
590            })
591            .detach();
592
593        // Run dispatcher while app is still alive
594        dispatcher.run_until_parked();
595
596        // Task should have run
597        assert!(
598            *task_ran.borrow(),
599            "Task should run normally when app is alive"
600        );
601    }
602
603    #[test]
604    fn test_task_cancelled_when_app_dropped() {
605        let (dispatcher, _background_executor, app) = create_test_app();
606        let foreground_executor = app.borrow().foreground_executor.clone();
607        let app_weak = Rc::downgrade(&app);
608
609        let task_ran = Rc::new(RefCell::new(false));
610        let task_ran_clone = Rc::clone(&task_ran);
611
612        foreground_executor
613            .spawn(async move {
614                *task_ran_clone.borrow_mut() = true;
615            })
616            .detach();
617
618        drop(app);
619
620        assert!(app_weak.upgrade().is_none(), "App should have been dropped");
621
622        dispatcher.run_until_parked();
623
624        // The task should have been cancelled, not run
625        assert!(
626            !*task_ran.borrow(),
627            "Task should have been cancelled when app was dropped, but it ran!"
628        );
629    }
630
    /// An outer foreground task awaits an inner foreground task that is parked on a oneshot
    /// channel. Once both are suspended, the channel sender and the app are dropped; neither
    /// task may complete afterwards.
    #[test]
    fn test_nested_tasks_both_cancel() {
        let (dispatcher, _background_executor, app) = create_test_app();
        let foreground_executor = app.borrow().foreground_executor.clone();
        let app_weak = Rc::downgrade(&app);

        // Flags observed by the test; each one has a clone that is moved into the tasks.
        let outer_completed = Rc::new(RefCell::new(false));
        let inner_completed = Rc::new(RefCell::new(false));
        let reached_await = Rc::new(RefCell::new(false));

        let outer_flag = Rc::clone(&outer_completed);
        let inner_flag = Rc::clone(&inner_completed);
        let await_flag = Rc::clone(&reached_await);

        // Channel to block the inner task until we're ready
        let (tx, rx) = futures::channel::oneshot::channel::<()>();

        let inner_executor = foreground_executor.clone();

        foreground_executor
            .spawn(async move {
                let inner_task = inner_executor.spawn({
                    let inner_flag = Rc::clone(&inner_flag);
                    async move {
                        // Resolves (with an error) once the sender is dropped; if this task
                        // were still allowed to run, it would then set the flag below.
                        rx.await.ok();
                        *inner_flag.borrow_mut() = true;
                    }
                });

                // Record that the outer task got as far as awaiting the inner one.
                *await_flag.borrow_mut() = true;

                inner_task.await;

                *outer_flag.borrow_mut() = true;
            })
            .detach();

        // Run dispatcher until outer task reaches the await point
        // The inner task will be blocked on the channel
        dispatcher.run_until_parked();

        // Verify we actually reached the await point before dropping the app
        assert!(
            *reached_await.borrow(),
            "Outer task should have reached the await point"
        );

        // Neither task should have completed yet
        assert!(
            !*outer_completed.borrow(),
            "Outer task should not have completed yet"
        );
        assert!(
            !*inner_completed.borrow(),
            "Inner task should not have completed yet"
        );

        // Drop the channel sender and app while outer is awaiting inner
        drop(tx);
        drop(app);
        assert!(app_weak.upgrade().is_none(), "App should have been dropped");

        // Run dispatcher - both tasks should be cancelled
        dispatcher.run_until_parked();

        // Neither task should have completed (both were cancelled)
        assert!(
            !*outer_completed.borrow(),
            "Outer task should have been cancelled, not completed"
        );
        assert!(
            !*inner_completed.borrow(),
            "Inner task should have been cancelled, not completed"
        );
    }
706
707    #[test]
708    #[should_panic]
709    fn test_polling_cancelled_task_panics() {
710        let (dispatcher, _background_executor, app) = create_test_app();
711        let foreground_executor = app.borrow().foreground_executor.clone();
712        let app_weak = Rc::downgrade(&app);
713
714        let task = foreground_executor.spawn(async move { 42 });
715
716        drop(app);
717
718        assert!(app_weak.upgrade().is_none(), "App should have been dropped");
719
720        dispatcher.run_until_parked();
721
722        foreground_executor.block_on(task);
723    }
724
725    #[test]
726    fn test_polling_cancelled_task_returns_none_with_fallible() {
727        let (dispatcher, _background_executor, app) = create_test_app();
728        let foreground_executor = app.borrow().foreground_executor.clone();
729        let app_weak = Rc::downgrade(&app);
730
731        let task = foreground_executor.spawn(async move { 42 }).fallible();
732
733        drop(app);
734
735        assert!(app_weak.upgrade().is_none(), "App should have been dropped");
736
737        dispatcher.run_until_parked();
738
739        let result = foreground_executor.block_on(task);
740        assert_eq!(result, None, "Cancelled task should return None");
741    }
742}