executor.rs

  1use anyhow::{anyhow, Result};
  2use async_task::Runnable;
  3use futures::channel::mpsc;
  4use smol::{channel, prelude::*, Executor};
  5use std::{
  6    any::Any,
  7    fmt::{self, Display},
  8    marker::PhantomData,
  9    mem,
 10    pin::Pin,
 11    rc::Rc,
 12    sync::Arc,
 13    task::{Context, Poll},
 14    thread,
 15    time::Duration,
 16};
 17
 18use crate::{
 19    platform::{self, Dispatcher},
 20    util, MutableAppContext,
 21};
 22
 23pub enum Foreground {
 24    Platform {
 25        dispatcher: Arc<dyn platform::Dispatcher>,
 26        _not_send_or_sync: PhantomData<Rc<()>>,
 27    },
 28    #[cfg(any(test, feature = "test-support"))]
 29    Deterministic {
 30        cx_id: usize,
 31        executor: Arc<Deterministic>,
 32    },
 33}
 34
 35pub enum Background {
 36    #[cfg(any(test, feature = "test-support"))]
 37    Deterministic { executor: Arc<Deterministic> },
 38    Production {
 39        executor: Arc<smol::Executor<'static>>,
 40        _stop: channel::Sender<()>,
 41    },
 42}
 43
 44type AnyLocalFuture = Pin<Box<dyn 'static + Future<Output = Box<dyn Any + 'static>>>>;
 45type AnyFuture = Pin<Box<dyn 'static + Send + Future<Output = Box<dyn Any + Send + 'static>>>>;
 46type AnyTask = async_task::Task<Box<dyn Any + Send + 'static>>;
 47type AnyLocalTask = async_task::Task<Box<dyn Any + 'static>>;
 48
 49#[must_use]
 50pub enum Task<T> {
 51    Ready(Option<T>),
 52    Local {
 53        any_task: AnyLocalTask,
 54        result_type: PhantomData<T>,
 55    },
 56    Send {
 57        any_task: AnyTask,
 58        result_type: PhantomData<T>,
 59    },
 60}
 61
 62unsafe impl<T: Send> Send for Task<T> {}
 63
 64#[cfg(any(test, feature = "test-support"))]
 65struct DeterministicState {
 66    rng: rand::prelude::StdRng,
 67    seed: u64,
 68    scheduled_from_foreground: collections::HashMap<usize, Vec<ForegroundRunnable>>,
 69    scheduled_from_background: Vec<Runnable>,
 70    forbid_parking: bool,
 71    block_on_ticks: std::ops::RangeInclusive<usize>,
 72    now: std::time::Instant,
 73    next_timer_id: usize,
 74    pending_timers: Vec<(usize, std::time::Instant, postage::barrier::Sender)>,
 75    waiting_backtrace: Option<backtrace::Backtrace>,
 76}
 77
 78#[cfg(any(test, feature = "test-support"))]
 79struct ForegroundRunnable {
 80    runnable: Runnable,
 81    main: bool,
 82}
 83
 84#[cfg(any(test, feature = "test-support"))]
 85pub struct Deterministic {
 86    state: Arc<parking_lot::Mutex<DeterministicState>>,
 87    parker: parking_lot::Mutex<parking::Parker>,
 88}
 89
 90pub enum Timer {
 91    Production(smol::Timer),
 92    #[cfg(any(test, feature = "test-support"))]
 93    Deterministic(DeterministicTimer),
 94}
 95
 96#[cfg(any(test, feature = "test-support"))]
 97pub struct DeterministicTimer {
 98    rx: postage::barrier::Receiver,
 99    id: usize,
100    state: Arc<parking_lot::Mutex<DeterministicState>>,
101}
102
103#[cfg(any(test, feature = "test-support"))]
104impl Deterministic {
105    pub fn new(seed: u64) -> Arc<Self> {
106        use rand::prelude::*;
107
108        Arc::new(Self {
109            state: Arc::new(parking_lot::Mutex::new(DeterministicState {
110                rng: StdRng::seed_from_u64(seed),
111                seed,
112                scheduled_from_foreground: Default::default(),
113                scheduled_from_background: Default::default(),
114                forbid_parking: false,
115                block_on_ticks: 0..=1000,
116                now: std::time::Instant::now(),
117                next_timer_id: Default::default(),
118                pending_timers: Default::default(),
119                waiting_backtrace: None,
120            })),
121            parker: Default::default(),
122        })
123    }
124
125    pub fn build_background(self: &Arc<Self>) -> Arc<Background> {
126        Arc::new(Background::Deterministic {
127            executor: self.clone(),
128        })
129    }
130
131    pub fn build_foreground(self: &Arc<Self>, id: usize) -> Rc<Foreground> {
132        Rc::new(Foreground::Deterministic {
133            cx_id: id,
134            executor: self.clone(),
135        })
136    }
137
138    fn spawn_from_foreground(
139        &self,
140        cx_id: usize,
141        future: AnyLocalFuture,
142        main: bool,
143    ) -> AnyLocalTask {
144        let state = self.state.clone();
145        let unparker = self.parker.lock().unparker();
146        let (runnable, task) = async_task::spawn_local(future, move |runnable| {
147            let mut state = state.lock();
148            state
149                .scheduled_from_foreground
150                .entry(cx_id)
151                .or_default()
152                .push(ForegroundRunnable { runnable, main });
153            unparker.unpark();
154        });
155        runnable.schedule();
156        task
157    }
158
159    fn spawn(&self, future: AnyFuture) -> AnyTask {
160        let state = self.state.clone();
161        let unparker = self.parker.lock().unparker();
162        let (runnable, task) = async_task::spawn(future, move |runnable| {
163            let mut state = state.lock();
164            state.scheduled_from_background.push(runnable);
165            unparker.unpark();
166        });
167        runnable.schedule();
168        task
169    }
170
171    fn run<'a>(
172        &self,
173        cx_id: usize,
174        main_future: Pin<Box<dyn 'a + Future<Output = Box<dyn Any>>>>,
175    ) -> Box<dyn Any> {
176        use std::sync::atomic::{AtomicBool, Ordering::SeqCst};
177
178        let woken = Arc::new(AtomicBool::new(false));
179
180        let state = self.state.clone();
181        let unparker = self.parker.lock().unparker();
182        let (runnable, mut main_task) = unsafe {
183            async_task::spawn_unchecked(main_future, move |runnable| {
184                let mut state = state.lock();
185                state
186                    .scheduled_from_foreground
187                    .entry(cx_id)
188                    .or_default()
189                    .push(ForegroundRunnable {
190                        runnable,
191                        main: true,
192                    });
193                unparker.unpark();
194            })
195        };
196        runnable.schedule();
197
198        loop {
199            if let Some(result) = self.run_internal(woken.clone(), Some(&mut main_task)) {
200                return result;
201            }
202
203            if !woken.load(SeqCst) {
204                self.state.lock().will_park();
205            }
206
207            woken.store(false, SeqCst);
208            self.parker.lock().park();
209        }
210    }
211
212    pub fn run_until_parked(&self) {
213        use std::sync::atomic::AtomicBool;
214        let woken = Arc::new(AtomicBool::new(false));
215        self.run_internal(woken, None);
216    }
217
218    fn run_internal(
219        &self,
220        woken: Arc<std::sync::atomic::AtomicBool>,
221        mut main_task: Option<&mut AnyLocalTask>,
222    ) -> Option<Box<dyn Any>> {
223        use rand::prelude::*;
224        use std::sync::atomic::Ordering::SeqCst;
225
226        let unparker = self.parker.lock().unparker();
227        let waker = waker_fn::waker_fn(move || {
228            woken.store(true, SeqCst);
229            unparker.unpark();
230        });
231
232        let mut cx = Context::from_waker(&waker);
233        loop {
234            let mut state = self.state.lock();
235
236            if state.scheduled_from_foreground.is_empty()
237                && state.scheduled_from_background.is_empty()
238            {
239                if let Some(main_task) = main_task {
240                    if let Poll::Ready(result) = main_task.poll(&mut cx) {
241                        return Some(result);
242                    }
243                }
244
245                return None;
246            }
247
248            if !state.scheduled_from_background.is_empty() && state.rng.gen() {
249                let background_len = state.scheduled_from_background.len();
250                let ix = state.rng.gen_range(0..background_len);
251                let runnable = state.scheduled_from_background.remove(ix);
252                drop(state);
253                runnable.run();
254            } else if !state.scheduled_from_foreground.is_empty() {
255                let available_cx_ids = state
256                    .scheduled_from_foreground
257                    .keys()
258                    .copied()
259                    .collect::<Vec<_>>();
260                let cx_id_to_run = *available_cx_ids.iter().choose(&mut state.rng).unwrap();
261                let scheduled_from_cx = state
262                    .scheduled_from_foreground
263                    .get_mut(&cx_id_to_run)
264                    .unwrap();
265                let foreground_runnable = scheduled_from_cx.remove(0);
266                if scheduled_from_cx.is_empty() {
267                    state.scheduled_from_foreground.remove(&cx_id_to_run);
268                }
269
270                drop(state);
271
272                foreground_runnable.runnable.run();
273                if let Some(main_task) = main_task.as_mut() {
274                    if foreground_runnable.main {
275                        if let Poll::Ready(result) = main_task.poll(&mut cx) {
276                            return Some(result);
277                        }
278                    }
279                }
280            }
281        }
282    }
283
284    fn block<F, T>(&self, future: &mut F, max_ticks: usize) -> Option<T>
285    where
286        F: Unpin + Future<Output = T>,
287    {
288        use rand::prelude::*;
289
290        let unparker = self.parker.lock().unparker();
291        let waker = waker_fn::waker_fn(move || {
292            unparker.unpark();
293        });
294
295        let mut cx = Context::from_waker(&waker);
296        for _ in 0..max_ticks {
297            let mut state = self.state.lock();
298            let runnable_count = state.scheduled_from_background.len();
299            let ix = state.rng.gen_range(0..=runnable_count);
300            if ix < state.scheduled_from_background.len() {
301                let runnable = state.scheduled_from_background.remove(ix);
302                drop(state);
303                runnable.run();
304            } else {
305                drop(state);
306                if let Poll::Ready(result) = future.poll(&mut cx) {
307                    return Some(result);
308                }
309                let mut state = self.state.lock();
310                if state.scheduled_from_background.is_empty() {
311                    state.will_park();
312                    drop(state);
313                    self.parker.lock().park();
314                }
315
316                continue;
317            }
318        }
319
320        None
321    }
322
323    pub fn timer(&self, duration: Duration) -> Timer {
324        let (tx, rx) = postage::barrier::channel();
325        let mut state = self.state.lock();
326        let wakeup_at = state.now + duration;
327        let id = util::post_inc(&mut state.next_timer_id);
328        state.pending_timers.push((id, wakeup_at, tx));
329        let state = self.state.clone();
330        Timer::Deterministic(DeterministicTimer { rx, id, state })
331    }
332
333    pub fn advance_clock(&self, duration: Duration) {
334        let new_now = self.state.lock().now + duration;
335        loop {
336            self.run_until_parked();
337            let mut state = self.state.lock();
338
339            if let Some((_, wakeup_time, _)) = state.pending_timers.first() {
340                let wakeup_time = *wakeup_time;
341                if wakeup_time <= new_now {
342                    let timer_count = state
343                        .pending_timers
344                        .iter()
345                        .take_while(|(_, t, _)| *t == wakeup_time)
346                        .count();
347                    state.now = wakeup_time;
348                    let timers_to_wake = state
349                        .pending_timers
350                        .drain(0..timer_count)
351                        .collect::<Vec<_>>();
352                    drop(state);
353                    drop(timers_to_wake);
354                    continue;
355                }
356            }
357
358            break;
359        }
360
361        self.state.lock().now = new_now;
362    }
363}
364
365impl Drop for Timer {
366    fn drop(&mut self) {
367        #[cfg(any(test, feature = "test-support"))]
368        if let Timer::Deterministic(DeterministicTimer { state, id, .. }) = self {
369            state
370                .lock()
371                .pending_timers
372                .retain(|(timer_id, _, _)| timer_id != id)
373        }
374    }
375}
376
377impl Future for Timer {
378    type Output = ();
379
380    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
381        match &mut *self {
382            #[cfg(any(test, feature = "test-support"))]
383            Self::Deterministic(DeterministicTimer { rx, .. }) => {
384                use postage::stream::{PollRecv, Stream as _};
385                smol::pin!(rx);
386                match rx.poll_recv(&mut postage::Context::from_waker(cx.waker())) {
387                    PollRecv::Ready(()) | PollRecv::Closed => Poll::Ready(()),
388                    PollRecv::Pending => Poll::Pending,
389                }
390            }
391            Self::Production(timer) => {
392                smol::pin!(timer);
393                match timer.poll(cx) {
394                    Poll::Ready(_) => Poll::Ready(()),
395                    Poll::Pending => Poll::Pending,
396                }
397            }
398        }
399    }
400}
401
402#[cfg(any(test, feature = "test-support"))]
403impl DeterministicState {
404    fn will_park(&mut self) {
405        if self.forbid_parking {
406            let mut backtrace_message = String::new();
407            #[cfg(any(test, feature = "test-support"))]
408            if let Some(backtrace) = self.waiting_backtrace.as_mut() {
409                backtrace.resolve();
410                backtrace_message = format!(
411                    "\nbacktrace of waiting future:\n{:?}",
412                    util::CwdBacktrace(backtrace)
413                );
414            }
415
416            panic!(
417                "deterministic executor parked after a call to forbid_parking{}",
418                backtrace_message
419            );
420        }
421    }
422}
423
424impl Foreground {
425    pub fn platform(dispatcher: Arc<dyn platform::Dispatcher>) -> Result<Self> {
426        if dispatcher.is_main_thread() {
427            Ok(Self::Platform {
428                dispatcher,
429                _not_send_or_sync: PhantomData,
430            })
431        } else {
432            Err(anyhow!("must be constructed on main thread"))
433        }
434    }
435
436    pub fn spawn<T: 'static>(&self, future: impl Future<Output = T> + 'static) -> Task<T> {
437        let future = any_local_future(future);
438        let any_task = match self {
439            #[cfg(any(test, feature = "test-support"))]
440            Self::Deterministic { cx_id, executor } => {
441                executor.spawn_from_foreground(*cx_id, future, false)
442            }
443            Self::Platform { dispatcher, .. } => {
444                fn spawn_inner(
445                    future: AnyLocalFuture,
446                    dispatcher: &Arc<dyn Dispatcher>,
447                ) -> AnyLocalTask {
448                    let dispatcher = dispatcher.clone();
449                    let schedule =
450                        move |runnable: Runnable| dispatcher.run_on_main_thread(runnable);
451                    let (runnable, task) = async_task::spawn_local(future, schedule);
452                    runnable.schedule();
453                    task
454                }
455                spawn_inner(future, dispatcher)
456            }
457        };
458        Task::local(any_task)
459    }
460
461    #[cfg(any(test, feature = "test-support"))]
462    pub fn run<T: 'static>(&self, future: impl Future<Output = T>) -> T {
463        let future = async move { Box::new(future.await) as Box<dyn Any> }.boxed_local();
464        let result = match self {
465            Self::Deterministic { cx_id, executor } => executor.run(*cx_id, future),
466            Self::Platform { .. } => panic!("you can't call run on a platform foreground executor"),
467        };
468        *result.downcast().unwrap()
469    }
470
471    #[cfg(any(test, feature = "test-support"))]
472    pub fn run_until_parked(&self) {
473        match self {
474            Self::Deterministic { executor, .. } => executor.run_until_parked(),
475            _ => panic!("this method can only be called on a deterministic executor"),
476        }
477    }
478
479    #[cfg(any(test, feature = "test-support"))]
480    pub fn parking_forbidden(&self) -> bool {
481        match self {
482            Self::Deterministic { executor, .. } => executor.state.lock().forbid_parking,
483            _ => panic!("this method can only be called on a deterministic executor"),
484        }
485    }
486
487    #[cfg(any(test, feature = "test-support"))]
488    pub fn start_waiting(&self) {
489        match self {
490            Self::Deterministic { executor, .. } => {
491                executor.state.lock().waiting_backtrace =
492                    Some(backtrace::Backtrace::new_unresolved());
493            }
494            _ => panic!("this method can only be called on a deterministic executor"),
495        }
496    }
497
498    #[cfg(any(test, feature = "test-support"))]
499    pub fn finish_waiting(&self) {
500        match self {
501            Self::Deterministic { executor, .. } => {
502                executor.state.lock().waiting_backtrace.take();
503            }
504            _ => panic!("this method can only be called on a deterministic executor"),
505        }
506    }
507
508    #[cfg(any(test, feature = "test-support"))]
509    pub fn forbid_parking(&self) {
510        use rand::prelude::*;
511
512        match self {
513            Self::Deterministic { executor, .. } => {
514                let mut state = executor.state.lock();
515                state.forbid_parking = true;
516                state.rng = StdRng::seed_from_u64(state.seed);
517            }
518            _ => panic!("this method can only be called on a deterministic executor"),
519        }
520    }
521
522    #[cfg(any(test, feature = "test-support"))]
523    pub fn advance_clock(&self, duration: Duration) {
524        match self {
525            Self::Deterministic { executor, .. } => {
526                executor.run_until_parked();
527                executor.advance_clock(duration);
528            }
529            _ => panic!("this method can only be called on a deterministic executor"),
530        }
531    }
532
533    #[cfg(any(test, feature = "test-support"))]
534    pub fn set_block_on_ticks(&self, range: std::ops::RangeInclusive<usize>) {
535        match self {
536            Self::Deterministic { executor, .. } => executor.state.lock().block_on_ticks = range,
537            _ => panic!("this method can only be called on a deterministic executor"),
538        }
539    }
540}
541
542impl Background {
543    pub fn new() -> Self {
544        let executor = Arc::new(Executor::new());
545        let stop = channel::unbounded::<()>();
546
547        for i in 0..2 * num_cpus::get() {
548            let executor = executor.clone();
549            let stop = stop.1.clone();
550            thread::Builder::new()
551                .name(format!("background-executor-{}", i))
552                .spawn(move || smol::block_on(executor.run(stop.recv())))
553                .unwrap();
554        }
555
556        Self::Production {
557            executor,
558            _stop: stop.0,
559        }
560    }
561
562    pub fn num_cpus(&self) -> usize {
563        num_cpus::get()
564    }
565
566    pub fn spawn<T, F>(&self, future: F) -> Task<T>
567    where
568        T: 'static + Send,
569        F: Send + Future<Output = T> + 'static,
570    {
571        let future = any_future(future);
572        let any_task = match self {
573            Self::Production { executor, .. } => executor.spawn(future),
574            #[cfg(any(test, feature = "test-support"))]
575            Self::Deterministic { executor } => executor.spawn(future),
576        };
577        Task::send(any_task)
578    }
579
580    pub fn block<F, T>(&self, future: F) -> T
581    where
582        F: Future<Output = T>,
583    {
584        smol::pin!(future);
585        match self {
586            Self::Production { .. } => smol::block_on(&mut future),
587            #[cfg(any(test, feature = "test-support"))]
588            Self::Deterministic { executor, .. } => {
589                executor.block(&mut future, usize::MAX).unwrap()
590            }
591        }
592    }
593
594    pub fn block_with_timeout<F, T>(
595        &self,
596        timeout: Duration,
597        future: F,
598    ) -> Result<T, impl Future<Output = T>>
599    where
600        T: 'static,
601        F: 'static + Unpin + Future<Output = T>,
602    {
603        let mut future = any_local_future(future);
604        if !timeout.is_zero() {
605            let output = match self {
606                Self::Production { .. } => smol::block_on(util::timeout(timeout, &mut future)).ok(),
607                #[cfg(any(test, feature = "test-support"))]
608                Self::Deterministic { executor, .. } => {
609                    use rand::prelude::*;
610                    let max_ticks = {
611                        let mut state = executor.state.lock();
612                        let range = state.block_on_ticks.clone();
613                        state.rng.gen_range(range)
614                    };
615                    executor.block(&mut future, max_ticks)
616                }
617            };
618            if let Some(output) = output {
619                return Ok(*output.downcast().unwrap());
620            }
621        }
622        Err(async { *future.await.downcast().unwrap() })
623    }
624
625    pub async fn scoped<'scope, F>(self: &Arc<Self>, scheduler: F)
626    where
627        F: FnOnce(&mut Scope<'scope>),
628    {
629        let mut scope = Scope::new(self.clone());
630        (scheduler)(&mut scope);
631        let spawned = mem::take(&mut scope.futures)
632            .into_iter()
633            .map(|f| self.spawn(f))
634            .collect::<Vec<_>>();
635        for task in spawned {
636            task.await;
637        }
638    }
639
640    pub fn timer(&self, duration: Duration) -> Timer {
641        match self {
642            Background::Production { .. } => Timer::Production(smol::Timer::after(duration)),
643            #[cfg(any(test, feature = "test-support"))]
644            Background::Deterministic { executor } => executor.timer(duration),
645        }
646    }
647
648    #[cfg(any(test, feature = "test-support"))]
649    pub async fn simulate_random_delay(&self) {
650        use rand::prelude::*;
651        use smol::future::yield_now;
652
653        match self {
654            Self::Deterministic { executor, .. } => {
655                if executor.state.lock().rng.gen_bool(0.2) {
656                    let yields = executor.state.lock().rng.gen_range(1..=10);
657                    for _ in 0..yields {
658                        yield_now().await;
659                    }
660                }
661            }
662            _ => {
663                log::info!("!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!");
664
665                // panic!("this method can only be called on a deterministic executor")
666            }
667        }
668    }
669}
670
671pub struct Scope<'a> {
672    executor: Arc<Background>,
673    futures: Vec<Pin<Box<dyn Future<Output = ()> + Send + 'static>>>,
674    tx: Option<mpsc::Sender<()>>,
675    rx: mpsc::Receiver<()>,
676    _phantom: PhantomData<&'a ()>,
677}
678
679impl<'a> Scope<'a> {
680    fn new(executor: Arc<Background>) -> Self {
681        let (tx, rx) = mpsc::channel(1);
682        Self {
683            executor,
684            tx: Some(tx),
685            rx,
686            futures: Default::default(),
687            _phantom: PhantomData,
688        }
689    }
690
691    pub fn spawn<F>(&mut self, f: F)
692    where
693        F: Future<Output = ()> + Send + 'a,
694    {
695        let tx = self.tx.clone().unwrap();
696
697        // Safety: The 'a lifetime is guaranteed to outlive any of these futures because
698        // dropping this `Scope` blocks until all of the futures have resolved.
699        let f = unsafe {
700            mem::transmute::<
701                Pin<Box<dyn Future<Output = ()> + Send + 'a>>,
702                Pin<Box<dyn Future<Output = ()> + Send + 'static>>,
703            >(Box::pin(async move {
704                f.await;
705                drop(tx);
706            }))
707        };
708        self.futures.push(f);
709    }
710}
711
712impl<'a> Drop for Scope<'a> {
713    fn drop(&mut self) {
714        self.tx.take().unwrap();
715
716        // Wait until the channel is closed, which means that all of the spawned
717        // futures have resolved.
718        self.executor.block(self.rx.next());
719    }
720}
721
722impl<T> Task<T> {
723    pub fn ready(value: T) -> Self {
724        Self::Ready(Some(value))
725    }
726
727    fn local(any_task: AnyLocalTask) -> Self {
728        Self::Local {
729            any_task,
730            result_type: PhantomData,
731        }
732    }
733
734    pub fn detach(self) {
735        match self {
736            Task::Ready(_) => {}
737            Task::Local { any_task, .. } => any_task.detach(),
738            Task::Send { any_task, .. } => any_task.detach(),
739        }
740    }
741}
742
743impl<T: 'static, E: 'static + Display> Task<Result<T, E>> {
744    pub fn detach_and_log_err(self, cx: &mut MutableAppContext) {
745        cx.spawn(|_| async move {
746            if let Err(err) = self.await {
747                log::error!("{}", err);
748            }
749        })
750        .detach();
751    }
752}
753
754impl<T: Send> Task<T> {
755    fn send(any_task: AnyTask) -> Self {
756        Self::Send {
757            any_task,
758            result_type: PhantomData,
759        }
760    }
761}
762
763impl<T: fmt::Debug> fmt::Debug for Task<T> {
764    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
765        match self {
766            Task::Ready(value) => value.fmt(f),
767            Task::Local { any_task, .. } => any_task.fmt(f),
768            Task::Send { any_task, .. } => any_task.fmt(f),
769        }
770    }
771}
772
773impl<T: 'static> Future for Task<T> {
774    type Output = T;
775
776    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
777        match unsafe { self.get_unchecked_mut() } {
778            Task::Ready(value) => Poll::Ready(value.take().unwrap()),
779            Task::Local { any_task, .. } => {
780                any_task.poll(cx).map(|value| *value.downcast().unwrap())
781            }
782            Task::Send { any_task, .. } => {
783                any_task.poll(cx).map(|value| *value.downcast().unwrap())
784            }
785        }
786    }
787}
788
789fn any_future<T, F>(future: F) -> AnyFuture
790where
791    T: 'static + Send,
792    F: Future<Output = T> + Send + 'static,
793{
794    async { Box::new(future.await) as Box<dyn Any + Send> }.boxed()
795}
796
797fn any_local_future<T, F>(future: F) -> AnyLocalFuture
798where
799    T: 'static,
800    F: Future<Output = T> + 'static,
801{
802    async { Box::new(future.await) as Box<dyn Any> }.boxed_local()
803}