executor.rs

use crate::{App, PlatformDispatcher, PlatformScheduler};
use futures::channel::mpsc;
use scheduler::Scheduler;
use smol::prelude::*;
use std::{
    fmt::Debug,
    future::Future,
    marker::PhantomData,
    mem,
    pin::Pin,
    rc::Rc,
    sync::Arc,
    time::{Duration, Instant},
};
use util::TryFutureExt;

pub use scheduler::{FallibleTask, Priority};

/// A pointer to the executor that is currently running,
/// for spawning background tasks.
#[derive(Clone)]
pub struct BackgroundExecutor {
    inner: scheduler::BackgroundExecutor,
    dispatcher: Arc<dyn PlatformDispatcher>,
}

/// A pointer to the executor that is currently running,
/// for spawning tasks on the main thread.
#[derive(Clone)]
pub struct ForegroundExecutor {
    inner: scheduler::ForegroundExecutor,
    dispatcher: Arc<dyn PlatformDispatcher>,
    not_send: PhantomData<Rc<()>>,
}

/// Task is a primitive that allows work to happen in the background.
///
/// It implements [`Future`] so you can `.await` on it.
///
/// If you drop a task it will be cancelled immediately. Calling [`Task::detach`] allows
/// the task to continue running, but with no way to return a value.
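///
/// # Example
///
/// A minimal sketch of the drop-vs-detach distinction (the `executor` handle and
/// `do_work` future are hypothetical):
///
/// ```ignore
/// // Awaiting keeps the task alive until it produces its value.
/// let value = executor.spawn(do_work()).await;
///
/// // Dropping the returned task cancels it immediately...
/// drop(executor.spawn(do_work()));
///
/// // ...while detaching lets it run to completion with no way to read the result.
/// executor.spawn(do_work()).detach();
/// ```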
#[must_use]
#[derive(Debug)]
pub struct Task<T>(scheduler::Task<T>);

impl<T> Task<T> {
    /// Creates a new task that will resolve with the value.
    pub fn ready(val: T) -> Self {
        Task(scheduler::Task::ready(val))
    }

    /// Returns true if the task has completed or was created with `Task::ready`.
    pub fn is_ready(&self) -> bool {
        self.0.is_ready()
    }

    /// Detaching a task runs it to completion in the background.
    pub fn detach(self) {
        self.0.detach()
    }

    /// Wraps a scheduler::Task.
    pub fn from_scheduler(task: scheduler::Task<T>) -> Self {
        Task(task)
    }

    /// Converts this task into a fallible task that returns `Option<T>`.
    ///
    /// Unlike the standard `Task<T>`, a [`FallibleTask`] will return `None`
    /// if the task was cancelled.
    ///
    /// # Example
    ///
    /// ```ignore
    /// // Background task that gracefully handles cancellation:
    /// cx.background_spawn(async move {
    ///     let result = foreground_task.fallible().await;
    ///     if let Some(value) = result {
    ///         // Process the value
    ///     }
    ///     // If None, task was cancelled - just exit gracefully
    /// }).detach();
    /// ```
    pub fn fallible(self) -> FallibleTask<T> {
        self.0.fallible()
    }
}

impl<T, E> Task<Result<T, E>>
where
    T: 'static,
    E: 'static + Debug,
{
    /// Run the task to completion in the background and log any errors that occur.
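    ///
    /// A minimal sketch (the `cx` handle and `save` future are hypothetical):
    ///
    /// ```ignore
    /// let task: Task<anyhow::Result<()>> = cx.background_spawn(save(path));
    /// // Let it finish in the background; any `Err` is logged together with this call site.
    /// task.detach_and_log_err(cx);
    /// ```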
    #[track_caller]
    pub fn detach_and_log_err(self, cx: &App) {
        let location = core::panic::Location::caller();
        cx.foreground_executor()
            .spawn(self.log_tracked_err(*location))
            .detach();
    }
}

impl<T> std::future::Future for Task<T> {
    type Output = T;

    fn poll(
        self: std::pin::Pin<&mut Self>,
        cx: &mut std::task::Context<'_>,
    ) -> std::task::Poll<Self::Output> {
        // SAFETY: This is a standard pin projection. The inner `scheduler::Task` is
        // structurally pinned: we never move it out of `self` after it has been pinned,
        // so projecting the pin through to it is sound.
        let inner = unsafe { self.map_unchecked_mut(|t| &mut t.0) };
        inner.poll(cx)
    }
}

impl BackgroundExecutor {
    /// Creates a new BackgroundExecutor from the given PlatformDispatcher.
    pub fn new(dispatcher: Arc<dyn PlatformDispatcher>) -> Self {
        #[cfg(any(test, feature = "test-support"))]
        let scheduler: Arc<dyn Scheduler> = if let Some(test_dispatcher) = dispatcher.as_test() {
            test_dispatcher.scheduler().clone()
        } else {
            Arc::new(PlatformScheduler::new(dispatcher.clone()))
        };

        #[cfg(not(any(test, feature = "test-support")))]
        let scheduler: Arc<dyn Scheduler> = Arc::new(PlatformScheduler::new(dispatcher.clone()));

        Self {
            inner: scheduler::BackgroundExecutor::new(scheduler),
            dispatcher,
        }
    }

    /// Close this executor. Tasks will not run after this is called.
    pub fn close(&self) {
        self.inner.close();
    }

    /// Enqueues the given future to be run to completion on a background thread.
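    ///
    /// A minimal sketch (the `executor` handle and `parse` function are hypothetical):
    ///
    /// ```ignore
    /// let task = executor.spawn(async move {
    ///     // Runs on a background thread; the future must be `Send + 'static`.
    ///     parse(&contents)
    /// });
    /// let parsed = task.await;
    /// ```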
    #[track_caller]
    pub fn spawn<R>(&self, future: impl Future<Output = R> + Send + 'static) -> Task<R>
    where
        R: Send + 'static,
    {
        self.spawn_with_priority(Priority::default(), future)
    }

    /// Enqueues the given future to be run to completion on a background thread with the given priority.
    ///
    /// `Priority::Realtime` is currently treated as `Priority::High`.
    ///
    /// This is intentionally *not* a "downgrade" feature: realtime execution is effectively
    /// disabled until we have an in-tree use case and are confident about the semantics and
    /// failure modes (especially around channel backpressure and the risk of blocking
    /// latency-sensitive threads). It should be straightforward to add a true realtime
    /// implementation back once those constraints are well-defined.
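    ///
    /// A minimal sketch (the `executor` handle and `rebuild_index` future are hypothetical):
    ///
    /// ```ignore
    /// // `Priority::High` is one of the variants re-exported from the `scheduler` crate above.
    /// executor
    ///     .spawn_with_priority(Priority::High, rebuild_index())
    ///     .detach();
    /// ```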
    #[track_caller]
    pub fn spawn_with_priority<R>(
        &self,
        priority: Priority,
        future: impl Future<Output = R> + Send + 'static,
    ) -> Task<R>
    where
        R: Send + 'static,
    {
        Task::from_scheduler(self.inner.spawn_with_priority(priority, future))
    }

    /// Enqueues the given future to run to completion on a background thread, blocking the current task until it finishes.
    ///
    /// This allows spawning background work that borrows from its scope. Note that the supplied future will run to
    /// completion before the current task is resumed, even if the current task is slated for cancellation.
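    ///
    /// A minimal sketch of borrowing local data (the `executor` handle and the
    /// `load_bytes`/`checksum` helpers are hypothetical):
    ///
    /// ```ignore
    /// let buffer = load_bytes();
    /// // `buffer` is only borrowed: this call does not return until the future has completed.
    /// let digest = executor.await_on_background(async { checksum(&buffer) }).await;
    /// ```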
    pub async fn await_on_background<R>(&self, future: impl Future<Output = R> + Send) -> R
    where
        R: Send,
    {
        use crate::RunnableMeta;
        use parking_lot::{Condvar, Mutex};
        use std::sync::{Arc, atomic::AtomicBool};

        struct NotifyOnDrop<'a>(&'a (Condvar, Mutex<bool>));

        impl Drop for NotifyOnDrop<'_> {
            fn drop(&mut self) {
                *self.0.1.lock() = true;
                self.0.0.notify_all();
            }
        }

        struct WaitOnDrop<'a>(&'a (Condvar, Mutex<bool>));

        impl Drop for WaitOnDrop<'_> {
            fn drop(&mut self) {
                let mut done = self.0.1.lock();
                // `parking_lot::Condvar` permits spurious wakeups, so re-check the flag in a loop.
                while !*done {
                    self.0.0.wait(&mut done);
                }
            }
        }

        let dispatcher = self.dispatcher.clone();
        let location = core::panic::Location::caller();
        let closed = Arc::new(AtomicBool::new(false));

        let pair = &(Condvar::new(), Mutex::new(false));
        let _wait_guard = WaitOnDrop(pair);

        let (runnable, task) = unsafe {
            async_task::Builder::new()
                .metadata(RunnableMeta { location, closed })
                .spawn_unchecked(
                    move |_| async {
                        let _notify_guard = NotifyOnDrop(pair);
                        future.await
                    },
                    move |runnable| {
                        dispatcher.dispatch(runnable, Priority::default());
                    },
                )
        };
        runnable.schedule();
        task.await
    }

    /// Scoped lets you start a number of tasks and waits
    /// for all of them to complete before returning.
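    ///
    /// A minimal sketch of parallel work over borrowed data (the `executor` handle and
    /// `process` function are hypothetical):
    ///
    /// ```ignore
    /// let mut data = vec![0u8; 4096];
    /// executor
    ///     .scoped(|scope| {
    ///         for chunk in data.chunks_mut(1024) {
    ///             scope.spawn(async move { process(chunk) });
    ///         }
    ///     })
    ///     .await;
    /// // All spawned futures have finished here, so the borrows of `data` have ended.
    /// ```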
    pub async fn scoped<'scope, F>(&self, scheduler: F)
    where
        F: FnOnce(&mut Scope<'scope>),
    {
        let mut scope = Scope::new(self.clone(), Priority::default());
        (scheduler)(&mut scope);
        let spawned = mem::take(&mut scope.futures)
            .into_iter()
            .map(|f| self.spawn_with_priority(scope.priority, f))
            .collect::<Vec<_>>();
        for task in spawned {
            task.await;
        }
    }

    /// Like [`Self::scoped`], but runs the spawned tasks at the given priority.
    pub async fn scoped_priority<'scope, F>(&self, priority: Priority, scheduler: F)
    where
        F: FnOnce(&mut Scope<'scope>),
    {
        let mut scope = Scope::new(self.clone(), priority);
        (scheduler)(&mut scope);
        let spawned = mem::take(&mut scope.futures)
            .into_iter()
            .map(|f| self.spawn_with_priority(scope.priority, f))
            .collect::<Vec<_>>();
        for task in spawned {
            task.await;
        }
    }

    /// Get the current time.
    ///
    /// Calling this instead of `std::time::Instant::now` allows the use
    /// of fake timers in tests.
    pub fn now(&self) -> Instant {
        self.inner.scheduler().clock().now()
    }

    /// Returns a task that will complete after the given duration.
    /// Depending on other concurrent tasks the elapsed duration may be longer
    /// than requested.
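    ///
    /// A minimal debounce sketch (the `executor` handle and `refresh` future are hypothetical):
    ///
    /// ```ignore
    /// let delay = executor.timer(Duration::from_millis(250));
    /// executor
    ///     .spawn(async move {
    ///         delay.await;
    ///         refresh().await;
    ///     })
    ///     .detach();
    /// ```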
    pub fn timer(&self, duration: Duration) -> Task<()> {
        if duration.is_zero() {
            return Task::ready(());
        }
        self.spawn(self.inner.scheduler().timer(duration))
    }

    /// In tests, run an arbitrary number of tasks (determined by the `SEED` environment variable).
    #[cfg(any(test, feature = "test-support"))]
    pub fn simulate_random_delay(&self) -> impl Future<Output = ()> + use<> {
        self.dispatcher.as_test().unwrap().simulate_random_delay()
    }

    /// In tests, move time forward. This does not run any tasks, but does make `timer`s ready.
    #[cfg(any(test, feature = "test-support"))]
    pub fn advance_clock(&self, duration: Duration) {
        self.dispatcher.as_test().unwrap().advance_clock(duration)
    }

    /// In tests, run one task.
    #[cfg(any(test, feature = "test-support"))]
    pub fn tick(&self) -> bool {
        self.dispatcher.as_test().unwrap().scheduler().tick()
    }

    /// In tests, run tasks until the scheduler would park.
    ///
    /// Under the scheduler-backed test dispatcher, `tick()` will not advance the clock, so a pending
    /// timer can keep `has_pending_tasks()` true even after all currently-runnable tasks have been
    /// drained. To preserve the historical semantics that tests relied on (drain all work that can
    /// make progress), we advance the clock to the next timer when no runnable tasks remain.
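    ///
    /// A sketch of the intended test-side behavior (the `executor` handle is hypothetical):
    ///
    /// ```ignore
    /// let timer = executor.timer(Duration::from_secs(1));
    /// assert!(!timer.is_ready());
    /// // Drains runnable work; as described above, this also advances to the pending timer.
    /// executor.run_until_parked();
    /// assert!(timer.is_ready());
    /// ```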
    #[cfg(any(test, feature = "test-support"))]
    pub fn run_until_parked(&self) {
        let scheduler = self.dispatcher.as_test().unwrap().scheduler();
        scheduler.run();
    }

    /// In tests, prevents `run_until_parked` from panicking if there are outstanding tasks.
    #[cfg(any(test, feature = "test-support"))]
    pub fn allow_parking(&self) {
        self.dispatcher
            .as_test()
            .unwrap()
            .scheduler()
            .allow_parking();

        if std::env::var("GPUI_RUN_UNTIL_PARKED_LOG").ok().as_deref() == Some("1") {
            log::warn!("[gpui::executor] allow_parking: enabled");
        }
    }

    /// Sets the range of ticks to run before timing out in block_on.
    #[cfg(any(test, feature = "test-support"))]
    pub fn set_block_on_ticks(&self, range: std::ops::RangeInclusive<usize>) {
        self.dispatcher
            .as_test()
            .unwrap()
            .scheduler()
            .set_timeout_ticks(range);
    }

    /// Undoes the effect of [`Self::allow_parking`].
    #[cfg(any(test, feature = "test-support"))]
    pub fn forbid_parking(&self) {
        self.dispatcher
            .as_test()
            .unwrap()
            .scheduler()
            .forbid_parking();
    }

    /// In tests, returns the rng used by the dispatcher.
    #[cfg(any(test, feature = "test-support"))]
    pub fn rng(&self) -> scheduler::SharedRng {
        self.dispatcher.as_test().unwrap().scheduler().rng()
    }

    /// How many CPUs are available to the dispatcher.
    pub fn num_cpus(&self) -> usize {
        #[cfg(any(test, feature = "test-support"))]
        if self.dispatcher.as_test().is_some() {
            return 4;
        }
        num_cpus::get()
    }

    /// Whether we're on the main thread.
    pub fn is_main_thread(&self) -> bool {
        self.dispatcher.is_main_thread()
    }

    #[doc(hidden)]
    pub fn dispatcher(&self) -> &Arc<dyn PlatformDispatcher> {
        &self.dispatcher
    }
}

impl ForegroundExecutor {
    /// Creates a new ForegroundExecutor from the given PlatformDispatcher.
    pub fn new(dispatcher: Arc<dyn PlatformDispatcher>) -> Self {
        #[cfg(any(test, feature = "test-support"))]
        let (scheduler, session_id): (Arc<dyn Scheduler>, _) =
            if let Some(test_dispatcher) = dispatcher.as_test() {
                (
                    test_dispatcher.scheduler().clone(),
                    test_dispatcher.session_id(),
                )
            } else {
                let platform_scheduler = Arc::new(PlatformScheduler::new(dispatcher.clone()));
                let session_id = platform_scheduler.allocate_session_id();
                (platform_scheduler, session_id)
            };

        #[cfg(not(any(test, feature = "test-support")))]
        let (scheduler, session_id): (Arc<dyn Scheduler>, _) = {
            let platform_scheduler = Arc::new(PlatformScheduler::new(dispatcher.clone()));
            let session_id = platform_scheduler.allocate_session_id();
            (platform_scheduler, session_id)
        };

        let inner = scheduler::ForegroundExecutor::new(session_id, scheduler);

        Self {
            inner,
            dispatcher,
            not_send: PhantomData,
        }
    }

    /// Close this executor. Tasks will not run after this is called.
    pub fn close(&self) {
        self.inner.close();
    }

    /// Enqueues the given future to run to completion on the main thread.
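    ///
    /// A minimal sketch (the `cx` handle and `state` value are hypothetical); the future does
    /// not need to be `Send` because it never leaves the main thread:
    ///
    /// ```ignore
    /// let state = Rc::new(RefCell::new(0));
    /// cx.foreground_executor()
    ///     .spawn({
    ///         let state = Rc::clone(&state);
    ///         async move { *state.borrow_mut() += 1 }
    ///     })
    ///     .detach();
    /// ```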
    #[track_caller]
    pub fn spawn<R>(&self, future: impl Future<Output = R> + 'static) -> Task<R>
    where
        R: 'static,
    {
        Task::from_scheduler(self.inner.spawn(future))
    }

    /// Enqueues the given future to run to completion on the main thread with the given priority.
    #[track_caller]
    pub fn spawn_with_priority<R>(
        &self,
        _priority: Priority,
        future: impl Future<Output = R> + 'static,
    ) -> Task<R>
    where
        R: 'static,
    {
        // Priority is ignored for foreground tasks - they run in order on the main thread
        Task::from_scheduler(self.inner.spawn(future))
    }

    /// Used by the test harness to run an async test in a synchronous fashion.
    #[cfg(any(test, feature = "test-support"))]
    #[track_caller]
    pub fn block_test<R>(&self, future: impl Future<Output = R>) -> R {
        use std::cell::Cell;

        let scheduler = self.inner.scheduler();

        let output = Cell::new(None);
        let future = async {
            output.set(Some(future.await));
        };
        let mut future = std::pin::pin!(future);

        // In async GPUI tests, we must allow foreground tasks scheduled by the test itself
        // (which are associated with the test session) to make progress while we block.
        // Otherwise, awaiting futures that depend on same-session foreground work can deadlock.
        scheduler.block(None, future.as_mut(), None);

        output.take().expect("block_test future did not complete")
    }

    /// Block the current thread until the given future resolves.
    /// Consider using `block_with_timeout` instead.
    pub fn block_on<R>(&self, future: impl Future<Output = R>) -> R {
        self.inner.block_on(future)
    }

    /// Block the current thread until the given future resolves or the timeout elapses.
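    ///
    /// A minimal sketch (the `cx` handle and the `fetch`/`handle` helpers are hypothetical); on
    /// timeout the original future is handed back so it can continue asynchronously:
    ///
    /// ```ignore
    /// let executor = cx.foreground_executor().clone();
    /// match executor.block_with_timeout(Duration::from_millis(100), fetch()) {
    ///     Ok(response) => handle(response),
    ///     Err(pending) => executor.spawn(pending).detach(),
    /// }
    /// ```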
    pub fn block_with_timeout<R, Fut: Future<Output = R>>(
        &self,
        duration: Duration,
        future: Fut,
    ) -> Result<R, impl Future<Output = R> + use<R, Fut>> {
        self.inner.block_with_timeout(duration, future)
    }

    #[doc(hidden)]
    pub fn dispatcher(&self) -> &Arc<dyn PlatformDispatcher> {
        &self.dispatcher
    }
}

/// Scope manages a set of tasks that are enqueued and waited on together. See [`BackgroundExecutor::scoped`].
pub struct Scope<'a> {
    executor: BackgroundExecutor,
    priority: Priority,
    futures: Vec<Pin<Box<dyn Future<Output = ()> + Send + 'static>>>,
    tx: Option<mpsc::Sender<()>>,
    rx: mpsc::Receiver<()>,
    lifetime: PhantomData<&'a ()>,
}

impl<'a> Scope<'a> {
    fn new(executor: BackgroundExecutor, priority: Priority) -> Self {
        let (tx, rx) = mpsc::channel(1);
        Self {
            executor,
            priority,
            tx: Some(tx),
            rx,
            futures: Default::default(),
            lifetime: PhantomData,
        }
    }

    /// How many CPUs are available to the dispatcher.
    pub fn num_cpus(&self) -> usize {
        self.executor.num_cpus()
    }

    /// Spawn a future into this scope.
    #[track_caller]
    pub fn spawn<F>(&mut self, f: F)
    where
        F: Future<Output = ()> + Send + 'a,
    {
        let tx = self.tx.clone().unwrap();

        // SAFETY: The 'a lifetime is guaranteed to outlive any of these futures because
        // dropping this `Scope` blocks until all of the futures have resolved.
        let f = unsafe {
            mem::transmute::<
                Pin<Box<dyn Future<Output = ()> + Send + 'a>>,
                Pin<Box<dyn Future<Output = ()> + Send + 'static>>,
            >(Box::pin(async move {
                f.await;
                drop(tx);
            }))
        };
        self.futures.push(f);
    }
}

impl Drop for Scope<'_> {
    fn drop(&mut self) {
        self.tx.take().unwrap();

        // Wait until the channel is closed, which means that all of the spawned
        // futures have resolved.
        let future = async {
            self.rx.next().await;
        };
        let mut future = std::pin::pin!(future);
        self.executor
            .inner
            .scheduler()
            .block(None, future.as_mut(), None);
    }
}

#[cfg(test)]
mod test {
    use super::*;
    use crate::{App, TestDispatcher, TestPlatform};
    use std::cell::RefCell;

    /// Helper to create test infrastructure.
    /// Returns (dispatcher, background_executor, app).
    fn create_test_app() -> (TestDispatcher, BackgroundExecutor, Rc<crate::AppCell>) {
        let dispatcher = TestDispatcher::new(0);
        let arc_dispatcher = Arc::new(dispatcher.clone());
        let background_executor = BackgroundExecutor::new(arc_dispatcher.clone());
        let foreground_executor = ForegroundExecutor::new(arc_dispatcher);

        let platform = TestPlatform::new(background_executor.clone(), foreground_executor);
        let asset_source = Arc::new(());
        let http_client = http_client::FakeHttpClient::with_404_response();

        let app = App::new_app(platform, asset_source, http_client);
        (dispatcher, background_executor, app)
    }

    #[test]
    fn sanity_test_tasks_run() {
        let (dispatcher, _background_executor, app) = create_test_app();
        let foreground_executor = app.borrow().foreground_executor.clone();

        let task_ran = Rc::new(RefCell::new(false));

        foreground_executor
            .spawn({
                let task_ran = Rc::clone(&task_ran);
                async move {
                    *task_ran.borrow_mut() = true;
                }
            })
            .detach();

        // Run dispatcher while app is still alive
        dispatcher.run_until_parked();

        // Task should have run
        assert!(
            *task_ran.borrow(),
            "Task should run normally when app is alive"
        );
    }

    #[test]
    fn test_task_cancelled_when_app_dropped() {
        let (dispatcher, _background_executor, app) = create_test_app();
        let foreground_executor = app.borrow().foreground_executor.clone();
        let app_weak = Rc::downgrade(&app);

        let task_ran = Rc::new(RefCell::new(false));
        let task_ran_clone = Rc::clone(&task_ran);

        foreground_executor
            .spawn(async move {
                *task_ran_clone.borrow_mut() = true;
            })
            .detach();

        drop(app);

        assert!(app_weak.upgrade().is_none(), "App should have been dropped");

        dispatcher.run_until_parked();

        // The task should have been cancelled, not run
        assert!(
            !*task_ran.borrow(),
            "Task should have been cancelled when app was dropped, but it ran!"
        );
    }

    #[test]
    fn test_nested_tasks_both_cancel() {
        let (dispatcher, _background_executor, app) = create_test_app();
        let foreground_executor = app.borrow().foreground_executor.clone();
        let app_weak = Rc::downgrade(&app);

        let outer_completed = Rc::new(RefCell::new(false));
        let inner_completed = Rc::new(RefCell::new(false));
        let reached_await = Rc::new(RefCell::new(false));

        let outer_flag = Rc::clone(&outer_completed);
        let inner_flag = Rc::clone(&inner_completed);
        let await_flag = Rc::clone(&reached_await);

        // Channel to block the inner task until we're ready
        let (tx, rx) = futures::channel::oneshot::channel::<()>();

        let inner_executor = foreground_executor.clone();

        foreground_executor
            .spawn(async move {
                let inner_task = inner_executor.spawn({
                    let inner_flag = Rc::clone(&inner_flag);
                    async move {
                        rx.await.ok();
                        *inner_flag.borrow_mut() = true;
                    }
                });

                *await_flag.borrow_mut() = true;

                inner_task.await;

                *outer_flag.borrow_mut() = true;
            })
            .detach();

        // Run dispatcher until outer task reaches the await point
        // The inner task will be blocked on the channel
        dispatcher.run_until_parked();

        // Verify we actually reached the await point before dropping the app
        assert!(
            *reached_await.borrow(),
            "Outer task should have reached the await point"
        );

        // Neither task should have completed yet
        assert!(
            !*outer_completed.borrow(),
            "Outer task should not have completed yet"
        );
        assert!(
            !*inner_completed.borrow(),
            "Inner task should not have completed yet"
        );

        // Drop the channel sender and app while outer is awaiting inner
        drop(tx);
        drop(app);
        assert!(app_weak.upgrade().is_none(), "App should have been dropped");

        // Run dispatcher - both tasks should be cancelled
        dispatcher.run_until_parked();

        // Neither task should have completed (both were cancelled)
        assert!(
            !*outer_completed.borrow(),
            "Outer task should have been cancelled, not completed"
        );
        assert!(
            !*inner_completed.borrow(),
            "Inner task should have been cancelled, not completed"
        );
    }

    #[test]
    #[should_panic]
    fn test_polling_cancelled_task_panics() {
        let (dispatcher, _background_executor, app) = create_test_app();
        let foreground_executor = app.borrow().foreground_executor.clone();
        let app_weak = Rc::downgrade(&app);

        let task = foreground_executor.spawn(async move { 42 });

        drop(app);

        assert!(app_weak.upgrade().is_none(), "App should have been dropped");

        dispatcher.run_until_parked();

        foreground_executor.block_on(task);
    }

    #[test]
    fn test_polling_cancelled_task_returns_none_with_fallible() {
        let (dispatcher, _background_executor, app) = create_test_app();
        let foreground_executor = app.borrow().foreground_executor.clone();
        let app_weak = Rc::downgrade(&app);

        let task = foreground_executor.spawn(async move { 42 }).fallible();

        drop(app);

        assert!(app_weak.upgrade().is_none(), "App should have been dropped");

        dispatcher.run_until_parked();

        let result = foreground_executor.block_on(task);
        assert_eq!(result, None, "Cancelled task should return None");
    }
}
728}