executor.rs

  1use anyhow::{anyhow, Result};
  2use async_task::Runnable;
  3use futures::channel::mpsc;
  4use smol::{channel, prelude::*, Executor};
  5use std::{
  6    any::Any,
  7    fmt::{self, Display},
  8    marker::PhantomData,
  9    mem,
 10    pin::Pin,
 11    rc::Rc,
 12    sync::Arc,
 13    task::{Context, Poll},
 14    thread,
 15    time::Duration,
 16};
 17
 18use crate::{
 19    platform::{self, Dispatcher},
 20    util, MutableAppContext,
 21};
 22
/// Executor handle for futures that are not required to be `Send`.
pub enum Foreground {
    /// Hands runnables to the platform dispatcher via `run_on_main_thread`.
    Platform {
        dispatcher: Arc<dyn platform::Dispatcher>,
        // `Rc` is neither `Send` nor `Sync`, so this marker keeps the whole
        // enum from being sent to or shared with another thread.
        _not_send_or_sync: PhantomData<Rc<()>>,
    },
    /// Test-only variant that queues runnables on a shared `Deterministic` executor.
    #[cfg(any(test, feature = "test-support"))]
    Deterministic {
        /// Key under which this handle's runnables are queued in the executor.
        cx_id: usize,
        executor: Arc<Deterministic>,
    },
}
 34
/// Executor handle for `Send` futures.
pub enum Background {
    /// Test-only variant that queues runnables on a shared `Deterministic` executor.
    #[cfg(any(test, feature = "test-support"))]
    Deterministic { executor: Arc<Deterministic> },
    /// `smol` executor driven by the worker threads started in `Background::new`.
    Production {
        executor: Arc<smol::Executor<'static>>,
        // Nothing is ever sent on this channel; the workers run the executor
        // until `recv()` on the other end resolves.
        // NOTE(review): presumably dropping this sender closes the channel and
        // lets the worker threads exit — confirm against smol's channel docs.
        _stop: channel::Sender<()>,
    },
}
 43
/// Boxed future with a type-erased output; not required to be `Send`.
type AnyLocalFuture = Pin<Box<dyn 'static + Future<Output = Box<dyn Any + 'static>>>>;
/// Boxed `Send` future with a type-erased `Send` output.
type AnyFuture = Pin<Box<dyn 'static + Send + Future<Output = Box<dyn Any + Send + 'static>>>>;
/// Task handle resolving to a type-erased `Send` output.
type AnyTask = async_task::Task<Box<dyn Any + Send + 'static>>;
/// Task handle resolving to a type-erased output that need not be `Send`.
type AnyLocalTask = async_task::Task<Box<dyn Any + 'static>>;
 48
/// A value of type `T` that is either already available or will be produced
/// by a spawned future. Awaiting the task yields the value.
#[must_use]
pub enum Task<T> {
    /// An already-computed value; it is taken out of the `Option` when polled.
    Ready(Option<T>),
    /// Wraps a task created through a `Foreground` executor.
    Local {
        any_task: AnyLocalTask,
        // Records the concrete output type the erased box is downcast to.
        result_type: PhantomData<T>,
    },
    /// Wraps a task created through a `Background` executor.
    Send {
        any_task: AnyTask,
        // Records the concrete output type the erased box is downcast to.
        result_type: PhantomData<T>,
    },
}
 61
// SAFETY: NOTE(review) — this impl also covers `Task::Local`, whose inner handle
// resolves to a `Box<dyn Any>` that is not statically `Send`. The argument is
// presumably that the boxed value is always a `T` and `T: Send` is required
// here; confirm this against async_task's guarantees for `spawn_local` handles.
unsafe impl<T: Send> Send for Task<T> {}
 63
/// Mutable state shared by a `Deterministic` executor, its schedule callbacks
/// and its timers.
#[cfg(any(test, feature = "test-support"))]
struct DeterministicState {
    /// Source of every scheduling decision (which queue, which runnable).
    rng: rand::prelude::StdRng,
    /// Seed `rng` was created from; `forbid_parking` reseeds `rng` with it.
    seed: u64,
    /// Runnables scheduled by foreground tasks, grouped by the owning `cx_id`.
    scheduled_from_foreground: collections::HashMap<usize, Vec<ForegroundRunnable>>,
    /// Runnables scheduled by background tasks.
    scheduled_from_background: Vec<Runnable>,
    /// When set, `will_park` panics instead of letting the executor park.
    forbid_parking: bool,
    /// Range from which `Background::block_with_timeout` draws its tick budget.
    block_on_ticks: std::ops::RangeInclusive<usize>,
    /// Simulated current time; only moved by `Deterministic::advance_clock`.
    now: std::time::Instant,
    /// Id handed to the next timer that is created.
    next_timer_id: usize,
    /// `(timer id, wakeup time, barrier sender)` for every live timer.
    /// `advance_clock` fires a timer by dropping its sender.
    pending_timers: Vec<(usize, std::time::Instant, postage::barrier::Sender)>,
    /// Backtrace captured by `Foreground::start_waiting`, included in the
    /// panic message produced by `will_park`.
    waiting_backtrace: Option<backtrace::Backtrace>,
}
 77
/// A queued foreground runnable together with a flag telling the run loop
/// whether it belongs to a main task.
#[cfg(any(test, feature = "test-support"))]
struct ForegroundRunnable {
    runnable: Runnable,
    /// `true` for runnables scheduled by the future passed to `Deterministic::run`;
    /// after running one of these the loop re-polls the main task.
    main: bool,
}
 83
/// Test executor that picks the next runnable using a seeded random number
/// generator instead of real threads.
#[cfg(any(test, feature = "test-support"))]
pub struct Deterministic {
    /// Queues, clock and RNG; also cloned into schedule callbacks and timers.
    state: Arc<parking_lot::Mutex<DeterministicState>>,
    /// Parks the calling thread when nothing is runnable; schedule callbacks
    /// and wakers hold unparkers obtained from it.
    parker: parking_lot::Mutex<parking::Parker>,
}
 89
/// Future that resolves to `()` once a delay has elapsed.
pub enum Timer {
    /// Backed by a real `smol::Timer`.
    Production(smol::Timer),
    /// Backed by the simulated clock of a `Deterministic` executor.
    #[cfg(any(test, feature = "test-support"))]
    Deterministic(DeterministicTimer),
}
 95
/// Timer registered with a `Deterministic` executor's simulated clock.
#[cfg(any(test, feature = "test-support"))]
pub struct DeterministicTimer {
    /// Receiving half of the barrier whose sender sits in `pending_timers`;
    /// the timer future completes once this yields a value or is closed.
    rx: postage::barrier::Receiver,
    /// Identifies this timer's entry in `pending_timers`.
    id: usize,
    /// Executor state, used on drop to remove this timer's pending entry.
    state: Arc<parking_lot::Mutex<DeterministicState>>,
}
102
103#[cfg(any(test, feature = "test-support"))]
104impl Deterministic {
105    pub fn new(seed: u64) -> Arc<Self> {
106        use rand::prelude::*;
107
108        Arc::new(Self {
109            state: Arc::new(parking_lot::Mutex::new(DeterministicState {
110                rng: StdRng::seed_from_u64(seed),
111                seed,
112                scheduled_from_foreground: Default::default(),
113                scheduled_from_background: Default::default(),
114                forbid_parking: false,
115                block_on_ticks: 0..=1000,
116                now: std::time::Instant::now(),
117                next_timer_id: Default::default(),
118                pending_timers: Default::default(),
119                waiting_backtrace: None,
120            })),
121            parker: Default::default(),
122        })
123    }
124
125    pub fn build_background(self: &Arc<Self>) -> Arc<Background> {
126        Arc::new(Background::Deterministic {
127            executor: self.clone(),
128        })
129    }
130
131    pub fn build_foreground(self: &Arc<Self>, id: usize) -> Rc<Foreground> {
132        Rc::new(Foreground::Deterministic {
133            cx_id: id,
134            executor: self.clone(),
135        })
136    }
137
138    fn spawn_from_foreground(
139        &self,
140        cx_id: usize,
141        future: AnyLocalFuture,
142        main: bool,
143    ) -> AnyLocalTask {
144        let state = self.state.clone();
145        let unparker = self.parker.lock().unparker();
146        let (runnable, task) = async_task::spawn_local(future, move |runnable| {
147            let mut state = state.lock();
148            state
149                .scheduled_from_foreground
150                .entry(cx_id)
151                .or_default()
152                .push(ForegroundRunnable { runnable, main });
153            unparker.unpark();
154        });
155        runnable.schedule();
156        task
157    }
158
159    fn spawn(&self, future: AnyFuture) -> AnyTask {
160        let state = self.state.clone();
161        let unparker = self.parker.lock().unparker();
162        let (runnable, task) = async_task::spawn(future, move |runnable| {
163            let mut state = state.lock();
164            state.scheduled_from_background.push(runnable);
165            unparker.unpark();
166        });
167        runnable.schedule();
168        task
169    }
170
171    fn run<'a>(
172        &self,
173        cx_id: usize,
174        main_future: Pin<Box<dyn 'a + Future<Output = Box<dyn Any>>>>,
175    ) -> Box<dyn Any> {
176        use std::sync::atomic::{AtomicBool, Ordering::SeqCst};
177
178        let woken = Arc::new(AtomicBool::new(false));
179
180        let state = self.state.clone();
181        let unparker = self.parker.lock().unparker();
182        let (runnable, mut main_task) = unsafe {
183            async_task::spawn_unchecked(main_future, move |runnable| {
184                let mut state = state.lock();
185                state
186                    .scheduled_from_foreground
187                    .entry(cx_id)
188                    .or_default()
189                    .push(ForegroundRunnable {
190                        runnable,
191                        main: true,
192                    });
193                unparker.unpark();
194            })
195        };
196        runnable.schedule();
197
198        loop {
199            if let Some(result) = self.run_internal(woken.clone(), Some(&mut main_task)) {
200                return result;
201            }
202
203            if !woken.load(SeqCst) {
204                self.state.lock().will_park();
205            }
206
207            woken.store(false, SeqCst);
208            self.parker.lock().park();
209        }
210    }
211
212    pub fn run_until_parked(&self) {
213        use std::sync::atomic::AtomicBool;
214        let woken = Arc::new(AtomicBool::new(false));
215        self.run_internal(woken, None);
216    }
217
    /// Runs queued runnables, chosen with the seeded RNG, until both queues
    /// are empty or the main task completes.
    ///
    /// `woken` is set to `true` by the waker handed to `main_task`, so the
    /// caller can tell whether a wake-up arrived. Returns `Some(output)` when
    /// `main_task` is given and finishes, and `None` once nothing is queued.
    fn run_internal(
        &self,
        woken: Arc<std::sync::atomic::AtomicBool>,
        mut main_task: Option<&mut AnyLocalTask>,
    ) -> Option<Box<dyn Any>> {
        use rand::prelude::*;
        use std::sync::atomic::Ordering::SeqCst;

        let unparker = self.parker.lock().unparker();
        // Waking records the wake-up and unparks the executor thread.
        let waker = waker_fn::waker_fn(move || {
            woken.store(true, SeqCst);
            unparker.unpark();
        });

        let mut cx = Context::from_waker(&waker);
        loop {
            let mut state = self.state.lock();

            // Nothing left to run: give the main task one last poll, then
            // report that the executor is idle.
            if state.scheduled_from_foreground.is_empty()
                && state.scheduled_from_background.is_empty()
            {
                if let Some(main_task) = main_task {
                    if let Poll::Ready(result) = main_task.poll(&mut cx) {
                        return Some(result);
                    }
                }

                return None;
            }

            // A random boolean decides whether a background runnable goes
            // first; it is only drawn when the background queue is non-empty.
            if !state.scheduled_from_background.is_empty() && state.rng.gen() {
                let background_len = state.scheduled_from_background.len();
                let ix = state.rng.gen_range(0..background_len);
                let runnable = state.scheduled_from_background.remove(ix);
                // Release the lock before running: the runnable may schedule
                // more work, which locks the state again.
                drop(state);
                runnable.run();
            } else if !state.scheduled_from_foreground.is_empty() {
                // Pick a context at random, then take its oldest runnable so
                // each context's runnables run in the order they were queued.
                let available_cx_ids = state
                    .scheduled_from_foreground
                    .keys()
                    .copied()
                    .collect::<Vec<_>>();
                let cx_id_to_run = *available_cx_ids.iter().choose(&mut state.rng).unwrap();
                let scheduled_from_cx = state
                    .scheduled_from_foreground
                    .get_mut(&cx_id_to_run)
                    .unwrap();
                let foreground_runnable = scheduled_from_cx.remove(0);
                // Drop empty per-context queues so `is_empty` on the map means
                // "no foreground work at all".
                if scheduled_from_cx.is_empty() {
                    state.scheduled_from_foreground.remove(&cx_id_to_run);
                }

                drop(state);

                foreground_runnable.runnable.run();
                // Only runnables flagged `main` can have advanced the main
                // task, so only those trigger a poll of it.
                if let Some(main_task) = main_task.as_mut() {
                    if foreground_runnable.main {
                        if let Poll::Ready(result) = main_task.poll(&mut cx) {
                            return Some(result);
                        }
                    }
                }
            }
        }
    }
283
284    fn block<F, T>(&self, future: &mut F, max_ticks: usize) -> Option<T>
285    where
286        F: Unpin + Future<Output = T>,
287    {
288        use rand::prelude::*;
289
290        let unparker = self.parker.lock().unparker();
291        let waker = waker_fn::waker_fn(move || {
292            unparker.unpark();
293        });
294
295        let mut cx = Context::from_waker(&waker);
296        for _ in 0..max_ticks {
297            let mut state = self.state.lock();
298            let runnable_count = state.scheduled_from_background.len();
299            let ix = state.rng.gen_range(0..=runnable_count);
300            if ix < state.scheduled_from_background.len() {
301                let runnable = state.scheduled_from_background.remove(ix);
302                drop(state);
303                runnable.run();
304            } else {
305                drop(state);
306                if let Poll::Ready(result) = future.poll(&mut cx) {
307                    return Some(result);
308                }
309                let mut state = self.state.lock();
310                if state.scheduled_from_background.is_empty() {
311                    state.will_park();
312                    drop(state);
313                    self.parker.lock().park();
314                }
315
316                continue;
317            }
318        }
319
320        None
321    }
322
323    pub fn timer(&self, duration: Duration) -> Timer {
324        let (tx, rx) = postage::barrier::channel();
325        let mut state = self.state.lock();
326        let wakeup_at = state.now + duration;
327        let id = util::post_inc(&mut state.next_timer_id);
328        state.pending_timers.push((id, wakeup_at, tx));
329        let state = self.state.clone();
330        Timer::Deterministic(DeterministicTimer { rx, id, state })
331    }
332
333    pub fn advance_clock(&self, duration: Duration) {
334        let new_now = self.state.lock().now + duration;
335        loop {
336            self.run_until_parked();
337            let mut state = self.state.lock();
338
339            if let Some((_, wakeup_time, _)) = state.pending_timers.first() {
340                let wakeup_time = *wakeup_time;
341                if wakeup_time <= new_now {
342                    let timer_count = state
343                        .pending_timers
344                        .iter()
345                        .take_while(|(_, t, _)| *t == wakeup_time)
346                        .count();
347                    state.now = wakeup_time;
348                    let timers_to_wake = state
349                        .pending_timers
350                        .drain(0..timer_count)
351                        .collect::<Vec<_>>();
352                    drop(state);
353                    drop(timers_to_wake);
354                    continue;
355                }
356            }
357
358            break;
359        }
360
361        self.state.lock().now = new_now;
362    }
363
364    pub fn forbid_parking(&self) {
365        use rand::prelude::*;
366
367        let mut state = self.state.lock();
368        state.forbid_parking = true;
369        state.rng = StdRng::seed_from_u64(state.seed);
370    }
371}
372
373impl Drop for Timer {
374    fn drop(&mut self) {
375        #[cfg(any(test, feature = "test-support"))]
376        if let Timer::Deterministic(DeterministicTimer { state, id, .. }) = self {
377            state
378                .lock()
379                .pending_timers
380                .retain(|(timer_id, _, _)| timer_id != id)
381        }
382    }
383}
384
385impl Future for Timer {
386    type Output = ();
387
388    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
389        match &mut *self {
390            #[cfg(any(test, feature = "test-support"))]
391            Self::Deterministic(DeterministicTimer { rx, .. }) => {
392                use postage::stream::{PollRecv, Stream as _};
393                smol::pin!(rx);
394                match rx.poll_recv(&mut postage::Context::from_waker(cx.waker())) {
395                    PollRecv::Ready(()) | PollRecv::Closed => Poll::Ready(()),
396                    PollRecv::Pending => Poll::Pending,
397                }
398            }
399            Self::Production(timer) => {
400                smol::pin!(timer);
401                match timer.poll(cx) {
402                    Poll::Ready(_) => Poll::Ready(()),
403                    Poll::Pending => Poll::Pending,
404                }
405            }
406        }
407    }
408}
409
#[cfg(any(test, feature = "test-support"))]
impl DeterministicState {
    /// Called right before the executor parks.
    ///
    /// Does nothing while parking is allowed. Once parking is forbidden it
    /// panics, appending the backtrace recorded by `start_waiting` if there
    /// is one.
    fn will_park(&mut self) {
        if !self.forbid_parking {
            return;
        }

        let backtrace_message = match self.waiting_backtrace.as_mut() {
            Some(backtrace) => {
                backtrace.resolve();
                format!(
                    "\nbacktrace of waiting future:\n{:?}",
                    util::CwdBacktrace(backtrace)
                )
            }
            None => String::new(),
        };

        panic!(
            "deterministic executor parked after a call to forbid_parking{}",
            backtrace_message
        );
    }
}
431
432impl Foreground {
433    pub fn platform(dispatcher: Arc<dyn platform::Dispatcher>) -> Result<Self> {
434        if dispatcher.is_main_thread() {
435            Ok(Self::Platform {
436                dispatcher,
437                _not_send_or_sync: PhantomData,
438            })
439        } else {
440            Err(anyhow!("must be constructed on main thread"))
441        }
442    }
443
444    pub fn spawn<T: 'static>(&self, future: impl Future<Output = T> + 'static) -> Task<T> {
445        let future = any_local_future(future);
446        let any_task = match self {
447            #[cfg(any(test, feature = "test-support"))]
448            Self::Deterministic { cx_id, executor } => {
449                executor.spawn_from_foreground(*cx_id, future, false)
450            }
451            Self::Platform { dispatcher, .. } => {
452                fn spawn_inner(
453                    future: AnyLocalFuture,
454                    dispatcher: &Arc<dyn Dispatcher>,
455                ) -> AnyLocalTask {
456                    let dispatcher = dispatcher.clone();
457                    let schedule =
458                        move |runnable: Runnable| dispatcher.run_on_main_thread(runnable);
459                    let (runnable, task) = async_task::spawn_local(future, schedule);
460                    runnable.schedule();
461                    task
462                }
463                spawn_inner(future, dispatcher)
464            }
465        };
466        Task::local(any_task)
467    }
468
469    #[cfg(any(test, feature = "test-support"))]
470    pub fn run<T: 'static>(&self, future: impl Future<Output = T>) -> T {
471        let future = async move { Box::new(future.await) as Box<dyn Any> }.boxed_local();
472        let result = match self {
473            Self::Deterministic { cx_id, executor } => executor.run(*cx_id, future),
474            Self::Platform { .. } => panic!("you can't call run on a platform foreground executor"),
475        };
476        *result.downcast().unwrap()
477    }
478
479    #[cfg(any(test, feature = "test-support"))]
480    pub fn run_until_parked(&self) {
481        match self {
482            Self::Deterministic { executor, .. } => executor.run_until_parked(),
483            _ => panic!("this method can only be called on a deterministic executor"),
484        }
485    }
486
487    #[cfg(any(test, feature = "test-support"))]
488    pub fn parking_forbidden(&self) -> bool {
489        match self {
490            Self::Deterministic { executor, .. } => executor.state.lock().forbid_parking,
491            _ => panic!("this method can only be called on a deterministic executor"),
492        }
493    }
494
495    #[cfg(any(test, feature = "test-support"))]
496    pub fn start_waiting(&self) {
497        match self {
498            Self::Deterministic { executor, .. } => {
499                executor.state.lock().waiting_backtrace =
500                    Some(backtrace::Backtrace::new_unresolved());
501            }
502            _ => panic!("this method can only be called on a deterministic executor"),
503        }
504    }
505
506    #[cfg(any(test, feature = "test-support"))]
507    pub fn finish_waiting(&self) {
508        match self {
509            Self::Deterministic { executor, .. } => {
510                executor.state.lock().waiting_backtrace.take();
511            }
512            _ => panic!("this method can only be called on a deterministic executor"),
513        }
514    }
515
516    #[cfg(any(test, feature = "test-support"))]
517    pub fn forbid_parking(&self) {
518        match self {
519            Self::Deterministic { executor, .. } => executor.forbid_parking(),
520            _ => panic!("this method can only be called on a deterministic executor"),
521        }
522    }
523
524    #[cfg(any(test, feature = "test-support"))]
525    pub fn advance_clock(&self, duration: Duration) {
526        match self {
527            Self::Deterministic { executor, .. } => {
528                executor.run_until_parked();
529                executor.advance_clock(duration);
530            }
531            _ => panic!("this method can only be called on a deterministic executor"),
532        }
533    }
534
535    #[cfg(any(test, feature = "test-support"))]
536    pub fn set_block_on_ticks(&self, range: std::ops::RangeInclusive<usize>) {
537        match self {
538            Self::Deterministic { executor, .. } => executor.state.lock().block_on_ticks = range,
539            _ => panic!("this method can only be called on a deterministic executor"),
540        }
541    }
542}
543
544impl Background {
545    pub fn new() -> Self {
546        let executor = Arc::new(Executor::new());
547        let stop = channel::unbounded::<()>();
548
549        for i in 0..2 * num_cpus::get() {
550            let executor = executor.clone();
551            let stop = stop.1.clone();
552            thread::Builder::new()
553                .name(format!("background-executor-{}", i))
554                .spawn(move || smol::block_on(executor.run(stop.recv())))
555                .unwrap();
556        }
557
558        Self::Production {
559            executor,
560            _stop: stop.0,
561        }
562    }
563
    /// Returns the value reported by `num_cpus::get()`. This is independent of
    /// which executor variant `self` is.
    pub fn num_cpus(&self) -> usize {
        num_cpus::get()
    }
567
568    pub fn spawn<T, F>(&self, future: F) -> Task<T>
569    where
570        T: 'static + Send,
571        F: Send + Future<Output = T> + 'static,
572    {
573        let future = any_future(future);
574        let any_task = match self {
575            Self::Production { executor, .. } => executor.spawn(future),
576            #[cfg(any(test, feature = "test-support"))]
577            Self::Deterministic { executor } => executor.spawn(future),
578        };
579        Task::send(any_task)
580    }
581
582    pub fn block<F, T>(&self, future: F) -> T
583    where
584        F: Future<Output = T>,
585    {
586        smol::pin!(future);
587        match self {
588            Self::Production { .. } => smol::block_on(&mut future),
589            #[cfg(any(test, feature = "test-support"))]
590            Self::Deterministic { executor, .. } => {
591                executor.block(&mut future, usize::MAX).unwrap()
592            }
593        }
594    }
595
596    pub fn block_with_timeout<F, T>(
597        &self,
598        timeout: Duration,
599        future: F,
600    ) -> Result<T, impl Future<Output = T>>
601    where
602        T: 'static,
603        F: 'static + Unpin + Future<Output = T>,
604    {
605        let mut future = any_local_future(future);
606        if !timeout.is_zero() {
607            let output = match self {
608                Self::Production { .. } => smol::block_on(util::timeout(timeout, &mut future)).ok(),
609                #[cfg(any(test, feature = "test-support"))]
610                Self::Deterministic { executor, .. } => {
611                    use rand::prelude::*;
612                    let max_ticks = {
613                        let mut state = executor.state.lock();
614                        let range = state.block_on_ticks.clone();
615                        state.rng.gen_range(range)
616                    };
617                    executor.block(&mut future, max_ticks)
618                }
619            };
620            if let Some(output) = output {
621                return Ok(*output.downcast().unwrap());
622            }
623        }
624        Err(async { *future.await.downcast().unwrap() })
625    }
626
627    pub async fn scoped<'scope, F>(self: &Arc<Self>, scheduler: F)
628    where
629        F: FnOnce(&mut Scope<'scope>),
630    {
631        let mut scope = Scope::new(self.clone());
632        (scheduler)(&mut scope);
633        let spawned = mem::take(&mut scope.futures)
634            .into_iter()
635            .map(|f| self.spawn(f))
636            .collect::<Vec<_>>();
637        for task in spawned {
638            task.await;
639        }
640    }
641
642    pub fn timer(&self, duration: Duration) -> Timer {
643        match self {
644            Background::Production { .. } => Timer::Production(smol::Timer::after(duration)),
645            #[cfg(any(test, feature = "test-support"))]
646            Background::Deterministic { executor } => executor.timer(duration),
647        }
648    }
649
650    #[cfg(any(test, feature = "test-support"))]
651    pub async fn simulate_random_delay(&self) {
652        use rand::prelude::*;
653        use smol::future::yield_now;
654
655        match self {
656            Self::Deterministic { executor, .. } => {
657                if executor.state.lock().rng.gen_bool(0.2) {
658                    let yields = executor.state.lock().rng.gen_range(1..=10);
659                    for _ in 0..yields {
660                        yield_now().await;
661                    }
662                }
663            }
664            _ => {
665                log::info!("!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!");
666
667                // panic!("this method can only be called on a deterministic executor")
668            }
669        }
670    }
671}
672
/// Collects futures that may borrow data living for `'a`, for use with
/// `Background::scoped`.
pub struct Scope<'a> {
    /// Executor used on drop to block until the spawned futures are done.
    executor: Arc<Background>,
    /// Futures registered through `spawn`, with their lifetime erased to `'static`.
    futures: Vec<Pin<Box<dyn Future<Output = ()> + Send + 'static>>>,
    /// The scope's own sender; every spawned future holds a clone. Wrapped in
    /// an `Option` so that `drop` can take it and release it.
    tx: Option<mpsc::Sender<()>>,
    /// `drop` blocks on this receiver's next item.
    rx: mpsc::Receiver<()>,
    // Ties the scope to the `'a` lifetime of the data the futures borrow.
    _phantom: PhantomData<&'a ()>,
}
680
681impl<'a> Scope<'a> {
682    fn new(executor: Arc<Background>) -> Self {
683        let (tx, rx) = mpsc::channel(1);
684        Self {
685            executor,
686            tx: Some(tx),
687            rx,
688            futures: Default::default(),
689            _phantom: PhantomData,
690        }
691    }
692
693    pub fn spawn<F>(&mut self, f: F)
694    where
695        F: Future<Output = ()> + Send + 'a,
696    {
697        let tx = self.tx.clone().unwrap();
698
699        // Safety: The 'a lifetime is guaranteed to outlive any of these futures because
700        // dropping this `Scope` blocks until all of the futures have resolved.
701        let f = unsafe {
702            mem::transmute::<
703                Pin<Box<dyn Future<Output = ()> + Send + 'a>>,
704                Pin<Box<dyn Future<Output = ()> + Send + 'static>>,
705            >(Box::pin(async move {
706                f.await;
707                drop(tx);
708            }))
709        };
710        self.futures.push(f);
711    }
712}
713
714impl<'a> Drop for Scope<'a> {
715    fn drop(&mut self) {
716        self.tx.take().unwrap();
717
718        // Wait until the channel is closed, which means that all of the spawned
719        // futures have resolved.
720        self.executor.block(self.rx.next());
721    }
722}
723
724impl<T> Task<T> {
725    pub fn ready(value: T) -> Self {
726        Self::Ready(Some(value))
727    }
728
729    fn local(any_task: AnyLocalTask) -> Self {
730        Self::Local {
731            any_task,
732            result_type: PhantomData,
733        }
734    }
735
736    pub fn detach(self) {
737        match self {
738            Task::Ready(_) => {}
739            Task::Local { any_task, .. } => any_task.detach(),
740            Task::Send { any_task, .. } => any_task.detach(),
741        }
742    }
743}
744
745impl<T: 'static, E: 'static + Display> Task<Result<T, E>> {
746    pub fn detach_and_log_err(self, cx: &mut MutableAppContext) {
747        cx.spawn(|_| async move {
748            if let Err(err) = self.await {
749                log::error!("{}", err);
750            }
751        })
752        .detach();
753    }
754}
755
756impl<T: Send> Task<T> {
757    fn send(any_task: AnyTask) -> Self {
758        Self::Send {
759            any_task,
760            result_type: PhantomData,
761        }
762    }
763}
764
765impl<T: fmt::Debug> fmt::Debug for Task<T> {
766    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
767        match self {
768            Task::Ready(value) => value.fmt(f),
769            Task::Local { any_task, .. } => any_task.fmt(f),
770            Task::Send { any_task, .. } => any_task.fmt(f),
771        }
772    }
773}
774
775impl<T: 'static> Future for Task<T> {
776    type Output = T;
777
778    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
779        match unsafe { self.get_unchecked_mut() } {
780            Task::Ready(value) => Poll::Ready(value.take().unwrap()),
781            Task::Local { any_task, .. } => {
782                any_task.poll(cx).map(|value| *value.downcast().unwrap())
783            }
784            Task::Send { any_task, .. } => {
785                any_task.poll(cx).map(|value| *value.downcast().unwrap())
786            }
787        }
788    }
789}
790
791fn any_future<T, F>(future: F) -> AnyFuture
792where
793    T: 'static + Send,
794    F: Future<Output = T> + Send + 'static,
795{
796    async { Box::new(future.await) as Box<dyn Any + Send> }.boxed()
797}
798
799fn any_local_future<T, F>(future: F) -> AnyLocalFuture
800where
801    T: 'static,
802    F: Future<Output = T> + 'static,
803{
804    async { Box::new(future.await) as Box<dyn Any> }.boxed_local()
805}