1use crate::{App, PlatformDispatcher, PlatformScheduler};
2use futures::channel::mpsc;
3use futures::prelude::*;
4use gpui_util::TryFutureExt;
5use scheduler::Instant;
6use scheduler::Scheduler;
7use std::{
8 fmt::Debug, future::Future, marker::PhantomData, mem, pin::Pin, rc::Rc, sync::Arc,
9 time::Duration,
10};
11
12pub use scheduler::{FallibleTask, ForegroundExecutor as SchedulerForegroundExecutor, Priority};
13
14/// A pointer to the executor that is currently running,
15/// for spawning background tasks.
16#[derive(Clone)]
17pub struct BackgroundExecutor {
18 inner: scheduler::BackgroundExecutor,
19 dispatcher: Arc<dyn PlatformDispatcher>,
20}
21
22/// A pointer to the executor that is currently running,
23/// for spawning tasks on the main thread.
24#[derive(Clone)]
25pub struct ForegroundExecutor {
26 inner: scheduler::ForegroundExecutor,
27 dispatcher: Arc<dyn PlatformDispatcher>,
28 not_send: PhantomData<Rc<()>>,
29}
30
31/// Task is a primitive that allows work to happen in the background.
32///
33/// It implements [`Future`] so you can `.await` on it.
34///
35/// If you drop a task it will be cancelled immediately. Calling [`Task::detach`] allows
36/// the task to continue running, but with no way to return a value.
37#[must_use]
38#[derive(Debug)]
39pub struct Task<T>(scheduler::Task<T>);
40
41impl<T> Task<T> {
42 /// Creates a new task that will resolve with the value.
43 pub fn ready(val: T) -> Self {
44 Task(scheduler::Task::ready(val))
45 }
46
47 /// Returns true if the task has completed or was created with `Task::ready`.
48 pub fn is_ready(&self) -> bool {
49 self.0.is_ready()
50 }
51
52 /// Detaching a task runs it to completion in the background.
53 pub fn detach(self) {
54 self.0.detach()
55 }
56
57 /// Wraps a scheduler::Task.
58 pub fn from_scheduler(task: scheduler::Task<T>) -> Self {
59 Task(task)
60 }
61
62 /// Converts this task into a fallible task that returns `Option<T>`.
63 ///
64 /// Unlike the standard `Task<T>`, a [`FallibleTask`] will return `None`
65 /// if the task was cancelled.
66 ///
67 /// # Example
68 ///
69 /// ```ignore
70 /// // Background task that gracefully handles cancellation:
71 /// cx.background_spawn(async move {
72 /// let result = foreground_task.fallible().await;
73 /// if let Some(value) = result {
74 /// // Process the value
75 /// }
76 /// // If None, task was cancelled - just exit gracefully
77 /// }).detach();
78 /// ```
79 pub fn fallible(self) -> FallibleTask<T> {
80 self.0.fallible()
81 }
82}
83
84impl<T, E> Task<Result<T, E>>
85where
86 T: 'static,
87 E: 'static + Debug,
88{
89 /// Run the task to completion in the background and log any errors that occur.
90 #[track_caller]
91 pub fn detach_and_log_err(self, cx: &App) {
92 let location = core::panic::Location::caller();
93 cx.foreground_executor()
94 .spawn(self.log_tracked_err(*location))
95 .detach();
96 }
97}
98
99impl<T> std::future::Future for Task<T> {
100 type Output = T;
101
102 fn poll(
103 self: std::pin::Pin<&mut Self>,
104 cx: &mut std::task::Context<'_>,
105 ) -> std::task::Poll<Self::Output> {
106 // SAFETY: Task is a repr(transparent) wrapper around scheduler::Task,
107 // and we're just projecting the pin through to the inner task.
108 let inner = unsafe { self.map_unchecked_mut(|t| &mut t.0) };
109 inner.poll(cx)
110 }
111}
112
113impl BackgroundExecutor {
114 /// Creates a new BackgroundExecutor from the given PlatformDispatcher.
115 pub fn new(dispatcher: Arc<dyn PlatformDispatcher>) -> Self {
116 #[cfg(any(test, feature = "test-support"))]
117 let scheduler: Arc<dyn Scheduler> = if let Some(test_dispatcher) = dispatcher.as_test() {
118 test_dispatcher.scheduler().clone()
119 } else {
120 Arc::new(PlatformScheduler::new(dispatcher.clone()))
121 };
122
123 #[cfg(not(any(test, feature = "test-support")))]
124 let scheduler: Arc<dyn Scheduler> = Arc::new(PlatformScheduler::new(dispatcher.clone()));
125
126 Self {
127 inner: scheduler::BackgroundExecutor::new(scheduler),
128 dispatcher,
129 }
130 }
131
132 /// Close this executor. Tasks will not run after this is called.
133 pub fn close(&self) {
134 self.inner.close();
135 }
136
137 /// Enqueues the given future to be run to completion on a background thread.
138 #[track_caller]
139 pub fn spawn<R>(&self, future: impl Future<Output = R> + Send + 'static) -> Task<R>
140 where
141 R: Send + 'static,
142 {
143 self.spawn_with_priority(Priority::default(), future.boxed())
144 }
145
146 /// Enqueues the given future to be run to completion on a background thread with the given priority.
147 ///
148 /// When `Priority::RealtimeAudio` is used, the task runs on a dedicated thread with
149 /// realtime scheduling priority, suitable for audio processing.
150 #[track_caller]
151 pub fn spawn_with_priority<R>(
152 &self,
153 priority: Priority,
154 future: impl Future<Output = R> + Send + 'static,
155 ) -> Task<R>
156 where
157 R: Send + 'static,
158 {
159 if priority == Priority::RealtimeAudio {
160 Task::from_scheduler(self.inner.spawn_realtime(future))
161 } else {
162 Task::from_scheduler(self.inner.spawn_with_priority(priority, future))
163 }
164 }
165
166 /// Enqueues the given future to be run to completion on a background thread and blocking the current task on it.
167 ///
168 /// This allows to spawn background work that borrows from its scope. Note that the supplied future will run to
169 /// completion before the current task is resumed, even if the current task is slated for cancellation.
170 pub async fn await_on_background<R>(&self, future: impl Future<Output = R> + Send) -> R
171 where
172 R: Send,
173 {
174 use crate::RunnableMeta;
175 use parking_lot::{Condvar, Mutex};
176 use std::sync::{Arc, atomic::AtomicBool};
177
178 struct NotifyOnDrop<'a>(&'a (Condvar, Mutex<bool>));
179
180 impl Drop for NotifyOnDrop<'_> {
181 fn drop(&mut self) {
182 *self.0.1.lock() = true;
183 self.0.0.notify_all();
184 }
185 }
186
187 struct WaitOnDrop<'a>(&'a (Condvar, Mutex<bool>));
188
189 impl Drop for WaitOnDrop<'_> {
190 fn drop(&mut self) {
191 let mut done = self.0.1.lock();
192 if !*done {
193 self.0.0.wait(&mut done);
194 }
195 }
196 }
197
198 let dispatcher = self.dispatcher.clone();
199 let location = core::panic::Location::caller();
200 let closed = Arc::new(AtomicBool::new(false));
201
202 let pair = &(Condvar::new(), Mutex::new(false));
203 let _wait_guard = WaitOnDrop(pair);
204
205 let (runnable, task) = unsafe {
206 async_task::Builder::new()
207 .metadata(RunnableMeta { location, closed })
208 .spawn_unchecked(
209 move |_| async {
210 let _notify_guard = NotifyOnDrop(pair);
211 future.await
212 },
213 move |runnable| {
214 dispatcher.dispatch(runnable, Priority::default());
215 },
216 )
217 };
218 runnable.schedule();
219 task.await
220 }
221
222 /// Scoped lets you start a number of tasks and waits
223 /// for all of them to complete before returning.
224 pub async fn scoped<'scope, F>(&self, scheduler: F)
225 where
226 F: FnOnce(&mut Scope<'scope>),
227 {
228 let mut scope = Scope::new(self.clone(), Priority::default());
229 (scheduler)(&mut scope);
230 let spawned = mem::take(&mut scope.futures)
231 .into_iter()
232 .map(|f| self.spawn_with_priority(scope.priority, f))
233 .collect::<Vec<_>>();
234 for task in spawned {
235 task.await;
236 }
237 }
238
239 /// Scoped lets you start a number of tasks and waits
240 /// for all of them to complete before returning.
241 pub async fn scoped_priority<'scope, F>(&self, priority: Priority, scheduler: F)
242 where
243 F: FnOnce(&mut Scope<'scope>),
244 {
245 let mut scope = Scope::new(self.clone(), priority);
246 (scheduler)(&mut scope);
247 let spawned = mem::take(&mut scope.futures)
248 .into_iter()
249 .map(|f| self.spawn_with_priority(scope.priority, f))
250 .collect::<Vec<_>>();
251 for task in spawned {
252 task.await;
253 }
254 }
255
256 /// Get the current time.
257 ///
258 /// Calling this instead of `std::time::Instant::now` allows the use
259 /// of fake timers in tests.
260 pub fn now(&self) -> Instant {
261 self.inner.scheduler().clock().now()
262 }
263
264 /// Returns a task that will complete after the given duration.
265 /// Depending on other concurrent tasks the elapsed duration may be longer
266 /// than requested.
267 #[track_caller]
268 pub fn timer(&self, duration: Duration) -> Task<()> {
269 if duration.is_zero() {
270 return Task::ready(());
271 }
272 self.spawn(self.inner.scheduler().timer(duration))
273 }
274
275 /// In tests, run an arbitrary number of tasks (determined by the SEED environment variable)
276 #[cfg(any(test, feature = "test-support"))]
277 pub fn simulate_random_delay(&self) -> impl Future<Output = ()> + use<> {
278 self.dispatcher.as_test().unwrap().simulate_random_delay()
279 }
280
281 /// In tests, move time forward. This does not run any tasks, but does make `timer`s ready.
282 #[cfg(any(test, feature = "test-support"))]
283 pub fn advance_clock(&self, duration: Duration) {
284 self.dispatcher.as_test().unwrap().advance_clock(duration)
285 }
286
287 /// In tests, run one task.
288 #[cfg(any(test, feature = "test-support"))]
289 pub fn tick(&self) -> bool {
290 self.dispatcher.as_test().unwrap().scheduler().tick()
291 }
292
293 /// In tests, run tasks until the scheduler would park.
294 ///
295 /// Under the scheduler-backed test dispatcher, `tick()` will not advance the clock, so a pending
296 /// timer can keep `has_pending_tasks()` true even after all currently-runnable tasks have been
297 /// drained. To preserve the historical semantics that tests relied on (drain all work that can
298 /// make progress), we advance the clock to the next timer when no runnable tasks remain.
299 #[cfg(any(test, feature = "test-support"))]
300 pub fn run_until_parked(&self) {
301 let scheduler = self.dispatcher.as_test().unwrap().scheduler();
302 scheduler.run();
303 }
304
305 /// In tests, prevents `run_until_parked` from panicking if there are outstanding tasks.
306 #[cfg(any(test, feature = "test-support"))]
307 pub fn allow_parking(&self) {
308 self.dispatcher
309 .as_test()
310 .unwrap()
311 .scheduler()
312 .allow_parking();
313
314 if std::env::var("GPUI_RUN_UNTIL_PARKED_LOG").ok().as_deref() == Some("1") {
315 log::warn!("[gpui::executor] allow_parking: enabled");
316 }
317 }
318
319 /// Sets the range of ticks to run before timing out in block_on.
320 #[cfg(any(test, feature = "test-support"))]
321 pub fn set_block_on_ticks(&self, range: std::ops::RangeInclusive<usize>) {
322 self.dispatcher
323 .as_test()
324 .unwrap()
325 .scheduler()
326 .set_timeout_ticks(range);
327 }
328
329 /// Undoes the effect of [`Self::allow_parking`].
330 #[cfg(any(test, feature = "test-support"))]
331 pub fn forbid_parking(&self) {
332 self.dispatcher
333 .as_test()
334 .unwrap()
335 .scheduler()
336 .forbid_parking();
337 }
338
339 /// In tests, returns the rng used by the dispatcher.
340 #[cfg(any(test, feature = "test-support"))]
341 pub fn rng(&self) -> scheduler::SharedRng {
342 self.dispatcher.as_test().unwrap().scheduler().rng()
343 }
344
345 /// How many CPUs are available to the dispatcher.
346 pub fn num_cpus(&self) -> usize {
347 #[cfg(any(test, feature = "test-support"))]
348 if let Some(test) = self.dispatcher.as_test() {
349 return test.num_cpus_override().unwrap_or(4);
350 }
351 num_cpus::get()
352 }
353
354 /// Override the number of CPUs reported by this executor in tests.
355 /// Panics if not called on a test executor.
356 #[cfg(any(test, feature = "test-support"))]
357 pub fn set_num_cpus(&self, count: usize) {
358 self.dispatcher
359 .as_test()
360 .expect("set_num_cpus can only be called on a test executor")
361 .set_num_cpus(count);
362 }
363
364 /// Whether we're on the main thread.
365 pub fn is_main_thread(&self) -> bool {
366 self.dispatcher.is_main_thread()
367 }
368
369 #[doc(hidden)]
370 pub fn dispatcher(&self) -> &Arc<dyn PlatformDispatcher> {
371 &self.dispatcher
372 }
373}
374
375impl ForegroundExecutor {
376 /// Creates a new ForegroundExecutor from the given PlatformDispatcher.
377 pub fn new(dispatcher: Arc<dyn PlatformDispatcher>) -> Self {
378 #[cfg(any(test, feature = "test-support"))]
379 let (scheduler, session_id): (Arc<dyn Scheduler>, _) =
380 if let Some(test_dispatcher) = dispatcher.as_test() {
381 (
382 test_dispatcher.scheduler().clone(),
383 test_dispatcher.session_id(),
384 )
385 } else {
386 let platform_scheduler = Arc::new(PlatformScheduler::new(dispatcher.clone()));
387 let session_id = platform_scheduler.allocate_session_id();
388 (platform_scheduler, session_id)
389 };
390
391 #[cfg(not(any(test, feature = "test-support")))]
392 let (scheduler, session_id): (Arc<dyn Scheduler>, _) = {
393 let platform_scheduler = Arc::new(PlatformScheduler::new(dispatcher.clone()));
394 let session_id = platform_scheduler.allocate_session_id();
395 (platform_scheduler, session_id)
396 };
397
398 let inner = scheduler::ForegroundExecutor::new(session_id, scheduler);
399
400 Self {
401 inner,
402 dispatcher,
403 not_send: PhantomData,
404 }
405 }
406
407 /// Close this executor. Tasks will not run after this is called.
408 pub fn close(&self) {
409 self.inner.close();
410 }
411
412 /// Enqueues the given Task to run on the main thread.
413 #[track_caller]
414 pub fn spawn<R>(&self, future: impl Future<Output = R> + 'static) -> Task<R>
415 where
416 R: 'static,
417 {
418 Task::from_scheduler(self.inner.spawn(future.boxed_local()))
419 }
420
421 /// Enqueues the given Task to run on the main thread with the given priority.
422 #[track_caller]
423 pub fn spawn_with_priority<R>(
424 &self,
425 _priority: Priority,
426 future: impl Future<Output = R> + 'static,
427 ) -> Task<R>
428 where
429 R: 'static,
430 {
431 // Priority is ignored for foreground tasks - they run in order on the main thread
432 Task::from_scheduler(self.inner.spawn(future))
433 }
434
435 /// Used by the test harness to run an async test in a synchronous fashion.
436 #[cfg(any(test, feature = "test-support"))]
437 #[track_caller]
438 pub fn block_test<R>(&self, future: impl Future<Output = R>) -> R {
439 use std::cell::Cell;
440
441 let scheduler = self.inner.scheduler();
442
443 let output = Cell::new(None);
444 let future = async {
445 output.set(Some(future.await));
446 };
447 let mut future = std::pin::pin!(future);
448
449 // In async GPUI tests, we must allow foreground tasks scheduled by the test itself
450 // (which are associated with the test session) to make progress while we block.
451 // Otherwise, awaiting futures that depend on same-session foreground work can deadlock.
452 scheduler.block(None, future.as_mut(), None);
453
454 output.take().expect("block_test future did not complete")
455 }
456
457 /// Block the current thread until the given future resolves.
458 /// Consider using `block_with_timeout` instead.
459 pub fn block_on<R>(&self, future: impl Future<Output = R>) -> R {
460 self.inner.block_on(future)
461 }
462
463 /// Block the current thread until the given future resolves or the timeout elapses.
464 pub fn block_with_timeout<R, Fut: Future<Output = R>>(
465 &self,
466 duration: Duration,
467 future: Fut,
468 ) -> Result<R, impl Future<Output = R> + use<R, Fut>> {
469 self.inner.block_with_timeout(duration, future)
470 }
471
472 #[doc(hidden)]
473 pub fn dispatcher(&self) -> &Arc<dyn PlatformDispatcher> {
474 &self.dispatcher
475 }
476
477 #[doc(hidden)]
478 pub fn scheduler_executor(&self) -> SchedulerForegroundExecutor {
479 self.inner.clone()
480 }
481}
482
483/// Scope manages a set of tasks that are enqueued and waited on together. See [`BackgroundExecutor::scoped`].
484pub struct Scope<'a> {
485 executor: BackgroundExecutor,
486 priority: Priority,
487 futures: Vec<Pin<Box<dyn Future<Output = ()> + Send + 'static>>>,
488 tx: Option<mpsc::Sender<()>>,
489 rx: mpsc::Receiver<()>,
490 lifetime: PhantomData<&'a ()>,
491}
492
493impl<'a> Scope<'a> {
494 fn new(executor: BackgroundExecutor, priority: Priority) -> Self {
495 let (tx, rx) = mpsc::channel(1);
496 Self {
497 executor,
498 priority,
499 tx: Some(tx),
500 rx,
501 futures: Default::default(),
502 lifetime: PhantomData,
503 }
504 }
505
506 /// How many CPUs are available to the dispatcher.
507 pub fn num_cpus(&self) -> usize {
508 self.executor.num_cpus()
509 }
510
511 /// Spawn a future into this scope.
512 #[track_caller]
513 pub fn spawn<F>(&mut self, f: F)
514 where
515 F: Future<Output = ()> + Send + 'a,
516 {
517 let tx = self.tx.clone().unwrap();
518
519 // SAFETY: The 'a lifetime is guaranteed to outlive any of these futures because
520 // dropping this `Scope` blocks until all of the futures have resolved.
521 let f = unsafe {
522 mem::transmute::<
523 Pin<Box<dyn Future<Output = ()> + Send + 'a>>,
524 Pin<Box<dyn Future<Output = ()> + Send + 'static>>,
525 >(Box::pin(async move {
526 f.await;
527 drop(tx);
528 }))
529 };
530 self.futures.push(f);
531 }
532}
533
534impl Drop for Scope<'_> {
535 fn drop(&mut self) {
536 self.tx.take().unwrap();
537
538 // Wait until the channel is closed, which means that all of the spawned
539 // futures have resolved.
540 let future = async {
541 self.rx.next().await;
542 };
543 let mut future = std::pin::pin!(future);
544 self.executor
545 .inner
546 .scheduler()
547 .block(None, future.as_mut(), None);
548 }
549}
550
551#[cfg(test)]
552mod test {
553 use super::*;
554 use crate::{App, TestDispatcher, TestPlatform};
555 use std::cell::RefCell;
556
557 /// Helper to create test infrastructure.
558 /// Returns (dispatcher, background_executor, app).
559 fn create_test_app() -> (TestDispatcher, BackgroundExecutor, Rc<crate::AppCell>) {
560 let dispatcher = TestDispatcher::new(0);
561 let arc_dispatcher = Arc::new(dispatcher.clone());
562 let background_executor = BackgroundExecutor::new(arc_dispatcher.clone());
563 let foreground_executor = ForegroundExecutor::new(arc_dispatcher);
564
565 let platform = TestPlatform::new(background_executor.clone(), foreground_executor);
566 let asset_source = Arc::new(());
567 let http_client = http_client::FakeHttpClient::with_404_response();
568
569 let app = App::new_app(platform, asset_source, http_client);
570 (dispatcher, background_executor, app)
571 }
572
573 #[test]
574 fn sanity_test_tasks_run() {
575 let (dispatcher, _background_executor, app) = create_test_app();
576 let foreground_executor = app.borrow().foreground_executor.clone();
577
578 let task_ran = Rc::new(RefCell::new(false));
579
580 foreground_executor
581 .spawn({
582 let task_ran = Rc::clone(&task_ran);
583 async move {
584 *task_ran.borrow_mut() = true;
585 }
586 })
587 .detach();
588
589 // Run dispatcher while app is still alive
590 dispatcher.run_until_parked();
591
592 // Task should have run
593 assert!(
594 *task_ran.borrow(),
595 "Task should run normally when app is alive"
596 );
597 }
598
599 #[test]
600 fn test_task_cancelled_when_app_dropped() {
601 let (dispatcher, _background_executor, app) = create_test_app();
602 let foreground_executor = app.borrow().foreground_executor.clone();
603 let app_weak = Rc::downgrade(&app);
604
605 let task_ran = Rc::new(RefCell::new(false));
606 let task_ran_clone = Rc::clone(&task_ran);
607
608 foreground_executor
609 .spawn(async move {
610 *task_ran_clone.borrow_mut() = true;
611 })
612 .detach();
613
614 drop(app);
615
616 assert!(app_weak.upgrade().is_none(), "App should have been dropped");
617
618 dispatcher.run_until_parked();
619
620 // The task should have been cancelled, not run
621 assert!(
622 !*task_ran.borrow(),
623 "Task should have been cancelled when app was dropped, but it ran!"
624 );
625 }
626
627 #[test]
628 fn test_nested_tasks_both_cancel() {
629 let (dispatcher, _background_executor, app) = create_test_app();
630 let foreground_executor = app.borrow().foreground_executor.clone();
631 let app_weak = Rc::downgrade(&app);
632
633 let outer_completed = Rc::new(RefCell::new(false));
634 let inner_completed = Rc::new(RefCell::new(false));
635 let reached_await = Rc::new(RefCell::new(false));
636
637 let outer_flag = Rc::clone(&outer_completed);
638 let inner_flag = Rc::clone(&inner_completed);
639 let await_flag = Rc::clone(&reached_await);
640
641 // Channel to block the inner task until we're ready
642 let (tx, rx) = futures::channel::oneshot::channel::<()>();
643
644 let inner_executor = foreground_executor.clone();
645
646 foreground_executor
647 .spawn(async move {
648 let inner_task = inner_executor.spawn({
649 let inner_flag = Rc::clone(&inner_flag);
650 async move {
651 rx.await.ok();
652 *inner_flag.borrow_mut() = true;
653 }
654 });
655
656 *await_flag.borrow_mut() = true;
657
658 inner_task.await;
659
660 *outer_flag.borrow_mut() = true;
661 })
662 .detach();
663
664 // Run dispatcher until outer task reaches the await point
665 // The inner task will be blocked on the channel
666 dispatcher.run_until_parked();
667
668 // Verify we actually reached the await point before dropping the app
669 assert!(
670 *reached_await.borrow(),
671 "Outer task should have reached the await point"
672 );
673
674 // Neither task should have completed yet
675 assert!(
676 !*outer_completed.borrow(),
677 "Outer task should not have completed yet"
678 );
679 assert!(
680 !*inner_completed.borrow(),
681 "Inner task should not have completed yet"
682 );
683
684 // Drop the channel sender and app while outer is awaiting inner
685 drop(tx);
686 drop(app);
687 assert!(app_weak.upgrade().is_none(), "App should have been dropped");
688
689 // Run dispatcher - both tasks should be cancelled
690 dispatcher.run_until_parked();
691
692 // Neither task should have completed (both were cancelled)
693 assert!(
694 !*outer_completed.borrow(),
695 "Outer task should have been cancelled, not completed"
696 );
697 assert!(
698 !*inner_completed.borrow(),
699 "Inner task should have been cancelled, not completed"
700 );
701 }
702
703 #[test]
704 #[should_panic]
705 fn test_polling_cancelled_task_panics() {
706 let (dispatcher, _background_executor, app) = create_test_app();
707 let foreground_executor = app.borrow().foreground_executor.clone();
708 let app_weak = Rc::downgrade(&app);
709
710 let task = foreground_executor.spawn(async move { 42 });
711
712 drop(app);
713
714 assert!(app_weak.upgrade().is_none(), "App should have been dropped");
715
716 dispatcher.run_until_parked();
717
718 foreground_executor.block_on(task);
719 }
720
721 #[test]
722 fn test_polling_cancelled_task_returns_none_with_fallible() {
723 let (dispatcher, _background_executor, app) = create_test_app();
724 let foreground_executor = app.borrow().foreground_executor.clone();
725 let app_weak = Rc::downgrade(&app);
726
727 let task = foreground_executor.spawn(async move { 42 }).fallible();
728
729 drop(app);
730
731 assert!(app_weak.upgrade().is_none(), "App should have been dropped");
732
733 dispatcher.run_until_parked();
734
735 let result = foreground_executor.block_on(task);
736 assert_eq!(result, None, "Cancelled task should return None");
737 }
738}