executor.rs

  1use anyhow::{anyhow, Result};
  2use async_task::Runnable;
  3use smol::{channel, prelude::*, Executor};
  4use std::{
  5    any::Any,
  6    fmt::{self, Display},
  7    marker::PhantomData,
  8    mem,
  9    pin::Pin,
 10    rc::Rc,
 11    sync::{mpsc, Arc},
 12    task::{Context, Poll},
 13    thread,
 14    time::Duration,
 15};
 16
 17use crate::{
 18    platform::{self, Dispatcher},
 19    util, MutableAppContext,
 20};
 21
 22pub enum Foreground {
 23    Platform {
 24        dispatcher: Arc<dyn platform::Dispatcher>,
 25        _not_send_or_sync: PhantomData<Rc<()>>,
 26    },
 27    #[cfg(any(test, feature = "test-support"))]
 28    Deterministic {
 29        cx_id: usize,
 30        executor: Arc<Deterministic>,
 31    },
 32}
 33
 34pub enum Background {
 35    #[cfg(any(test, feature = "test-support"))]
 36    Deterministic { executor: Arc<Deterministic> },
 37    Production {
 38        executor: Arc<smol::Executor<'static>>,
 39        _stop: channel::Sender<()>,
 40    },
 41}
 42
 43type AnyLocalFuture = Pin<Box<dyn 'static + Future<Output = Box<dyn Any + 'static>>>>;
 44type AnyFuture = Pin<Box<dyn 'static + Send + Future<Output = Box<dyn Any + Send + 'static>>>>;
 45type AnyTask = async_task::Task<Box<dyn Any + Send + 'static>>;
 46type AnyLocalTask = async_task::Task<Box<dyn Any + 'static>>;
 47
 48#[must_use]
 49pub enum Task<T> {
 50    Ready(Option<T>),
 51    Local {
 52        any_task: AnyLocalTask,
 53        result_type: PhantomData<T>,
 54    },
 55    Send {
 56        any_task: AnyTask,
 57        result_type: PhantomData<T>,
 58    },
 59}
 60
 61unsafe impl<T: Send> Send for Task<T> {}
 62
 63#[cfg(any(test, feature = "test-support"))]
 64struct DeterministicState {
 65    rng: rand::prelude::StdRng,
 66    seed: u64,
 67    scheduled_from_foreground: collections::HashMap<usize, Vec<ForegroundRunnable>>,
 68    scheduled_from_background: Vec<Runnable>,
 69    forbid_parking: bool,
 70    block_on_ticks: std::ops::RangeInclusive<usize>,
 71    now: std::time::Instant,
 72    next_timer_id: usize,
 73    pending_timers: Vec<(usize, std::time::Instant, postage::barrier::Sender)>,
 74    waiting_backtrace: Option<backtrace::Backtrace>,
 75}
 76
 77#[cfg(any(test, feature = "test-support"))]
 78struct ForegroundRunnable {
 79    runnable: Runnable,
 80    main: bool,
 81}
 82
 83#[cfg(any(test, feature = "test-support"))]
 84pub struct Deterministic {
 85    state: Arc<parking_lot::Mutex<DeterministicState>>,
 86    parker: parking_lot::Mutex<parking::Parker>,
 87}
 88
 89pub enum Timer {
 90    Production(smol::Timer),
 91    #[cfg(any(test, feature = "test-support"))]
 92    Deterministic(DeterministicTimer),
 93}
 94
 95#[cfg(any(test, feature = "test-support"))]
 96pub struct DeterministicTimer {
 97    rx: postage::barrier::Receiver,
 98    id: usize,
 99    state: Arc<parking_lot::Mutex<DeterministicState>>,
100}
101
102#[cfg(any(test, feature = "test-support"))]
103impl Deterministic {
104    pub fn new(seed: u64) -> Arc<Self> {
105        use rand::prelude::*;
106
107        Arc::new(Self {
108            state: Arc::new(parking_lot::Mutex::new(DeterministicState {
109                rng: StdRng::seed_from_u64(seed),
110                seed,
111                scheduled_from_foreground: Default::default(),
112                scheduled_from_background: Default::default(),
113                forbid_parking: false,
114                block_on_ticks: 0..=1000,
115                now: std::time::Instant::now(),
116                next_timer_id: Default::default(),
117                pending_timers: Default::default(),
118                waiting_backtrace: None,
119            })),
120            parker: Default::default(),
121        })
122    }
123
124    pub fn build_background(self: &Arc<Self>) -> Arc<Background> {
125        Arc::new(Background::Deterministic {
126            executor: self.clone(),
127        })
128    }
129
130    pub fn build_foreground(self: &Arc<Self>, id: usize) -> Rc<Foreground> {
131        Rc::new(Foreground::Deterministic {
132            cx_id: id,
133            executor: self.clone(),
134        })
135    }
136
137    fn spawn_from_foreground(
138        &self,
139        cx_id: usize,
140        future: AnyLocalFuture,
141        main: bool,
142    ) -> AnyLocalTask {
143        let state = self.state.clone();
144        let unparker = self.parker.lock().unparker();
145        let (runnable, task) = async_task::spawn_local(future, move |runnable| {
146            let mut state = state.lock();
147            state
148                .scheduled_from_foreground
149                .entry(cx_id)
150                .or_default()
151                .push(ForegroundRunnable { runnable, main });
152            unparker.unpark();
153        });
154        runnable.schedule();
155        task
156    }
157
158    fn spawn(&self, future: AnyFuture) -> AnyTask {
159        let state = self.state.clone();
160        let unparker = self.parker.lock().unparker();
161        let (runnable, task) = async_task::spawn(future, move |runnable| {
162            let mut state = state.lock();
163            state.scheduled_from_background.push(runnable);
164            unparker.unpark();
165        });
166        runnable.schedule();
167        task
168    }
169
170    fn run<'a>(
171        &self,
172        cx_id: usize,
173        main_future: Pin<Box<dyn 'a + Future<Output = Box<dyn Any>>>>,
174    ) -> Box<dyn Any> {
175        use std::sync::atomic::{AtomicBool, Ordering::SeqCst};
176
177        let woken = Arc::new(AtomicBool::new(false));
178
179        let state = self.state.clone();
180        let unparker = self.parker.lock().unparker();
181        let (runnable, mut main_task) = unsafe {
182            async_task::spawn_unchecked(main_future, move |runnable| {
183                let mut state = state.lock();
184                state
185                    .scheduled_from_foreground
186                    .entry(cx_id)
187                    .or_default()
188                    .push(ForegroundRunnable {
189                        runnable,
190                        main: true,
191                    });
192                unparker.unpark();
193            })
194        };
195        runnable.schedule();
196
197        loop {
198            if let Some(result) = self.run_internal(woken.clone(), Some(&mut main_task)) {
199                return result;
200            }
201
202            if !woken.load(SeqCst) {
203                self.state.lock().will_park();
204            }
205
206            woken.store(false, SeqCst);
207            self.parker.lock().park();
208        }
209    }
210
211    pub fn run_until_parked(&self) {
212        use std::sync::atomic::AtomicBool;
213        let woken = Arc::new(AtomicBool::new(false));
214        self.run_internal(woken, None);
215    }
216
217    fn run_internal(
218        &self,
219        woken: Arc<std::sync::atomic::AtomicBool>,
220        mut main_task: Option<&mut AnyLocalTask>,
221    ) -> Option<Box<dyn Any>> {
222        use rand::prelude::*;
223        use std::sync::atomic::Ordering::SeqCst;
224
225        let unparker = self.parker.lock().unparker();
226        let waker = waker_fn::waker_fn(move || {
227            woken.store(true, SeqCst);
228            unparker.unpark();
229        });
230
231        let mut cx = Context::from_waker(&waker);
232        loop {
233            let mut state = self.state.lock();
234
235            if state.scheduled_from_foreground.is_empty()
236                && state.scheduled_from_background.is_empty()
237            {
238                if let Some(main_task) = main_task {
239                    if let Poll::Ready(result) = main_task.poll(&mut cx) {
240                        return Some(result);
241                    }
242                }
243
244                return None;
245            }
246
247            if !state.scheduled_from_background.is_empty() && state.rng.gen() {
248                let background_len = state.scheduled_from_background.len();
249                let ix = state.rng.gen_range(0..background_len);
250                let runnable = state.scheduled_from_background.remove(ix);
251                drop(state);
252                runnable.run();
253            } else if !state.scheduled_from_foreground.is_empty() {
254                let available_cx_ids = state
255                    .scheduled_from_foreground
256                    .keys()
257                    .copied()
258                    .collect::<Vec<_>>();
259                let cx_id_to_run = *available_cx_ids.iter().choose(&mut state.rng).unwrap();
260                let scheduled_from_cx = state
261                    .scheduled_from_foreground
262                    .get_mut(&cx_id_to_run)
263                    .unwrap();
264                let foreground_runnable = scheduled_from_cx.remove(0);
265                if scheduled_from_cx.is_empty() {
266                    state.scheduled_from_foreground.remove(&cx_id_to_run);
267                }
268
269                drop(state);
270
271                foreground_runnable.runnable.run();
272                if let Some(main_task) = main_task.as_mut() {
273                    if foreground_runnable.main {
274                        if let Poll::Ready(result) = main_task.poll(&mut cx) {
275                            return Some(result);
276                        }
277                    }
278                }
279            }
280        }
281    }
282
283    fn block<F, T>(&self, future: &mut F, max_ticks: usize) -> Option<T>
284    where
285        F: Unpin + Future<Output = T>,
286    {
287        use rand::prelude::*;
288
289        let unparker = self.parker.lock().unparker();
290        let waker = waker_fn::waker_fn(move || {
291            unparker.unpark();
292        });
293
294        let mut cx = Context::from_waker(&waker);
295        for _ in 0..max_ticks {
296            let mut state = self.state.lock();
297            let runnable_count = state.scheduled_from_background.len();
298            let ix = state.rng.gen_range(0..=runnable_count);
299            if ix < state.scheduled_from_background.len() {
300                let runnable = state.scheduled_from_background.remove(ix);
301                drop(state);
302                runnable.run();
303            } else {
304                drop(state);
305                if let Poll::Ready(result) = future.poll(&mut cx) {
306                    return Some(result);
307                }
308                let mut state = self.state.lock();
309                if state.scheduled_from_background.is_empty() {
310                    state.will_park();
311                    drop(state);
312                    self.parker.lock().park();
313                }
314
315                continue;
316            }
317        }
318
319        None
320    }
321
322    pub fn timer(&self, duration: Duration) -> Timer {
323        let (tx, rx) = postage::barrier::channel();
324        let mut state = self.state.lock();
325        let wakeup_at = state.now + duration;
326        let id = util::post_inc(&mut state.next_timer_id);
327        state.pending_timers.push((id, wakeup_at, tx));
328        let state = self.state.clone();
329        Timer::Deterministic(DeterministicTimer { rx, id, state })
330    }
331
332    pub fn advance_clock(&self, duration: Duration) {
333        let new_now = self.state.lock().now + duration;
334        loop {
335            self.run_until_parked();
336            let mut state = self.state.lock();
337
338            if let Some((_, wakeup_time, _)) = state.pending_timers.first() {
339                let wakeup_time = *wakeup_time;
340                if wakeup_time <= new_now {
341                    let timer_count = state
342                        .pending_timers
343                        .iter()
344                        .take_while(|(_, t, _)| *t == wakeup_time)
345                        .count();
346                    state.now = wakeup_time;
347                    let timers_to_wake = state
348                        .pending_timers
349                        .drain(0..timer_count)
350                        .collect::<Vec<_>>();
351                    drop(state);
352                    drop(timers_to_wake);
353                    continue;
354                }
355            }
356
357            break;
358        }
359
360        self.state.lock().now = new_now;
361    }
362}
363
364impl Drop for Timer {
365    fn drop(&mut self) {
366        #[cfg(any(test, feature = "test-support"))]
367        if let Timer::Deterministic(DeterministicTimer { state, id, .. }) = self {
368            state
369                .lock()
370                .pending_timers
371                .retain(|(timer_id, _, _)| timer_id != id)
372        }
373    }
374}
375
376impl Future for Timer {
377    type Output = ();
378
379    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
380        match &mut *self {
381            #[cfg(any(test, feature = "test-support"))]
382            Self::Deterministic(DeterministicTimer { rx, .. }) => {
383                use postage::stream::{PollRecv, Stream as _};
384                smol::pin!(rx);
385                match rx.poll_recv(&mut postage::Context::from_waker(cx.waker())) {
386                    PollRecv::Ready(()) | PollRecv::Closed => Poll::Ready(()),
387                    PollRecv::Pending => Poll::Pending,
388                }
389            }
390            Self::Production(timer) => {
391                smol::pin!(timer);
392                match timer.poll(cx) {
393                    Poll::Ready(_) => Poll::Ready(()),
394                    Poll::Pending => Poll::Pending,
395                }
396            }
397        }
398    }
399}
400
401#[cfg(any(test, feature = "test-support"))]
402impl DeterministicState {
403    fn will_park(&mut self) {
404        if self.forbid_parking {
405            let mut backtrace_message = String::new();
406            #[cfg(any(test, feature = "test-support"))]
407            if let Some(backtrace) = self.waiting_backtrace.as_mut() {
408                backtrace.resolve();
409                backtrace_message = format!(
410                    "\nbacktrace of waiting future:\n{:?}",
411                    util::CwdBacktrace(backtrace)
412                );
413            }
414
415            panic!(
416                "deterministic executor parked after a call to forbid_parking{}",
417                backtrace_message
418            );
419        }
420    }
421}
422
423impl Foreground {
424    pub fn platform(dispatcher: Arc<dyn platform::Dispatcher>) -> Result<Self> {
425        if dispatcher.is_main_thread() {
426            Ok(Self::Platform {
427                dispatcher,
428                _not_send_or_sync: PhantomData,
429            })
430        } else {
431            Err(anyhow!("must be constructed on main thread"))
432        }
433    }
434
435    pub fn spawn<T: 'static>(&self, future: impl Future<Output = T> + 'static) -> Task<T> {
436        let future = any_local_future(future);
437        let any_task = match self {
438            #[cfg(any(test, feature = "test-support"))]
439            Self::Deterministic { cx_id, executor } => {
440                executor.spawn_from_foreground(*cx_id, future, false)
441            }
442            Self::Platform { dispatcher, .. } => {
443                fn spawn_inner(
444                    future: AnyLocalFuture,
445                    dispatcher: &Arc<dyn Dispatcher>,
446                ) -> AnyLocalTask {
447                    let dispatcher = dispatcher.clone();
448                    let schedule =
449                        move |runnable: Runnable| dispatcher.run_on_main_thread(runnable);
450                    let (runnable, task) = async_task::spawn_local(future, schedule);
451                    runnable.schedule();
452                    task
453                }
454                spawn_inner(future, dispatcher)
455            }
456        };
457        Task::local(any_task)
458    }
459
460    #[cfg(any(test, feature = "test-support"))]
461    pub fn run<T: 'static>(&self, future: impl Future<Output = T>) -> T {
462        let future = async move { Box::new(future.await) as Box<dyn Any> }.boxed_local();
463        let result = match self {
464            Self::Deterministic { cx_id, executor } => executor.run(*cx_id, future),
465            Self::Platform { .. } => panic!("you can't call run on a platform foreground executor"),
466        };
467        *result.downcast().unwrap()
468    }
469
470    #[cfg(any(test, feature = "test-support"))]
471    pub fn run_until_parked(&self) {
472        match self {
473            Self::Deterministic { executor, .. } => executor.run_until_parked(),
474            _ => panic!("this method can only be called on a deterministic executor"),
475        }
476    }
477
478    #[cfg(any(test, feature = "test-support"))]
479    pub fn parking_forbidden(&self) -> bool {
480        match self {
481            Self::Deterministic { executor, .. } => executor.state.lock().forbid_parking,
482            _ => panic!("this method can only be called on a deterministic executor"),
483        }
484    }
485
486    #[cfg(any(test, feature = "test-support"))]
487    pub fn start_waiting(&self) {
488        match self {
489            Self::Deterministic { executor, .. } => {
490                executor.state.lock().waiting_backtrace =
491                    Some(backtrace::Backtrace::new_unresolved());
492            }
493            _ => panic!("this method can only be called on a deterministic executor"),
494        }
495    }
496
497    #[cfg(any(test, feature = "test-support"))]
498    pub fn finish_waiting(&self) {
499        match self {
500            Self::Deterministic { executor, .. } => {
501                executor.state.lock().waiting_backtrace.take();
502            }
503            _ => panic!("this method can only be called on a deterministic executor"),
504        }
505    }
506
507    #[cfg(any(test, feature = "test-support"))]
508    pub fn forbid_parking(&self) {
509        use rand::prelude::*;
510
511        match self {
512            Self::Deterministic { executor, .. } => {
513                let mut state = executor.state.lock();
514                state.forbid_parking = true;
515                state.rng = StdRng::seed_from_u64(state.seed);
516            }
517            _ => panic!("this method can only be called on a deterministic executor"),
518        }
519    }
520
521    #[cfg(any(test, feature = "test-support"))]
522    pub fn advance_clock(&self, duration: Duration) {
523        match self {
524            Self::Deterministic { executor, .. } => {
525                executor.run_until_parked();
526                executor.advance_clock(duration);
527            }
528            _ => panic!("this method can only be called on a deterministic executor"),
529        }
530    }
531
532    #[cfg(any(test, feature = "test-support"))]
533    pub fn set_block_on_ticks(&self, range: std::ops::RangeInclusive<usize>) {
534        match self {
535            Self::Deterministic { executor, .. } => executor.state.lock().block_on_ticks = range,
536            _ => panic!("this method can only be called on a deterministic executor"),
537        }
538    }
539}
540
541impl Background {
542    pub fn new() -> Self {
543        let executor = Arc::new(Executor::new());
544        let stop = channel::unbounded::<()>();
545
546        for i in 0..2 * num_cpus::get() {
547            let executor = executor.clone();
548            let stop = stop.1.clone();
549            thread::Builder::new()
550                .name(format!("background-executor-{}", i))
551                .spawn(move || smol::block_on(executor.run(stop.recv())))
552                .unwrap();
553        }
554
555        Self::Production {
556            executor,
557            _stop: stop.0,
558        }
559    }
560
561    pub fn num_cpus(&self) -> usize {
562        num_cpus::get()
563    }
564
565    pub fn spawn<T, F>(&self, future: F) -> Task<T>
566    where
567        T: 'static + Send,
568        F: Send + Future<Output = T> + 'static,
569    {
570        let future = any_future(future);
571        let any_task = match self {
572            Self::Production { executor, .. } => executor.spawn(future),
573            #[cfg(any(test, feature = "test-support"))]
574            Self::Deterministic { executor } => executor.spawn(future),
575        };
576        Task::send(any_task)
577    }
578
579    pub fn block<F, T>(&self, future: F) -> T
580    where
581        F: Future<Output = T>,
582    {
583        smol::pin!(future);
584        match self {
585            Self::Production { .. } => smol::block_on(&mut future),
586            #[cfg(any(test, feature = "test-support"))]
587            Self::Deterministic { executor, .. } => {
588                executor.block(&mut future, usize::MAX).unwrap()
589            }
590        }
591    }
592
593    pub fn block_with_timeout<F, T>(
594        &self,
595        timeout: Duration,
596        future: F,
597    ) -> Result<T, impl Future<Output = T>>
598    where
599        T: 'static,
600        F: 'static + Unpin + Future<Output = T>,
601    {
602        let mut future = any_local_future(future);
603        if !timeout.is_zero() {
604            let output = match self {
605                Self::Production { .. } => smol::block_on(util::timeout(timeout, &mut future)).ok(),
606                #[cfg(any(test, feature = "test-support"))]
607                Self::Deterministic { executor, .. } => {
608                    use rand::prelude::*;
609                    let max_ticks = {
610                        let mut state = executor.state.lock();
611                        let range = state.block_on_ticks.clone();
612                        state.rng.gen_range(range)
613                    };
614                    executor.block(&mut future, max_ticks)
615                }
616            };
617            if let Some(output) = output {
618                return Ok(*output.downcast().unwrap());
619            }
620        }
621        Err(async { *future.await.downcast().unwrap() })
622    }
623
624    pub async fn scoped<'scope, F>(&self, scheduler: F)
625    where
626        F: FnOnce(&mut Scope<'scope>),
627    {
628        let mut scope = Scope::new();
629        (scheduler)(&mut scope);
630        let spawned = mem::take(&mut scope.futures)
631            .into_iter()
632            .map(|f| self.spawn(f))
633            .collect::<Vec<_>>();
634        for task in spawned {
635            task.await;
636        }
637    }
638
639    pub fn timer(&self, duration: Duration) -> Timer {
640        match self {
641            Background::Production { .. } => Timer::Production(smol::Timer::after(duration)),
642            #[cfg(any(test, feature = "test-support"))]
643            Background::Deterministic { executor } => executor.timer(duration),
644        }
645    }
646
647    #[cfg(any(test, feature = "test-support"))]
648    pub async fn simulate_random_delay(&self) {
649        use rand::prelude::*;
650        use smol::future::yield_now;
651
652        match self {
653            Self::Deterministic { executor, .. } => {
654                if executor.state.lock().rng.gen_bool(0.2) {
655                    let yields = executor.state.lock().rng.gen_range(1..=10);
656                    for _ in 0..yields {
657                        yield_now().await;
658                    }
659                }
660            }
661            _ => panic!("this method can only be called on a deterministic executor"),
662        }
663    }
664}
665
666pub struct Scope<'a> {
667    futures: Vec<Pin<Box<dyn Future<Output = ()> + Send + 'static>>>,
668    tx: Option<mpsc::Sender<()>>,
669    rx: mpsc::Receiver<()>,
670    _phantom: PhantomData<&'a ()>,
671}
672
673impl<'a> Scope<'a> {
674    fn new() -> Self {
675        let (tx, rx) = mpsc::channel();
676        Self {
677            tx: Some(tx),
678            rx,
679            futures: Default::default(),
680            _phantom: PhantomData,
681        }
682    }
683
684    pub fn spawn<F>(&mut self, f: F)
685    where
686        F: Future<Output = ()> + Send + 'a,
687    {
688        let tx = self.tx.clone().unwrap();
689
690        // Safety: The 'a lifetime is guaranteed to outlive any of these futures because
691        // dropping this `Scope` blocks until all of the futures have resolved.
692        let f = unsafe {
693            mem::transmute::<
694                Pin<Box<dyn Future<Output = ()> + Send + 'a>>,
695                Pin<Box<dyn Future<Output = ()> + Send + 'static>>,
696            >(Box::pin(async move {
697                f.await;
698                drop(tx);
699            }))
700        };
701        self.futures.push(f);
702    }
703}
704
705impl<'a> Drop for Scope<'a> {
706    fn drop(&mut self) {
707        self.tx.take().unwrap();
708
709        // Wait until the channel is closed, which means that all of the spawned
710        // futures have resolved.
711        self.rx.recv().ok();
712    }
713}
714
715impl<T> Task<T> {
716    pub fn ready(value: T) -> Self {
717        Self::Ready(Some(value))
718    }
719
720    fn local(any_task: AnyLocalTask) -> Self {
721        Self::Local {
722            any_task,
723            result_type: PhantomData,
724        }
725    }
726
727    pub fn detach(self) {
728        match self {
729            Task::Ready(_) => {}
730            Task::Local { any_task, .. } => any_task.detach(),
731            Task::Send { any_task, .. } => any_task.detach(),
732        }
733    }
734}
735
736impl<T: 'static, E: 'static + Display> Task<Result<T, E>> {
737    pub fn detach_and_log_err(self, cx: &mut MutableAppContext) {
738        cx.spawn(|_| async move {
739            if let Err(err) = self.await {
740                log::error!("{}", err);
741            }
742        })
743        .detach();
744    }
745}
746
747impl<T: Send> Task<T> {
748    fn send(any_task: AnyTask) -> Self {
749        Self::Send {
750            any_task,
751            result_type: PhantomData,
752        }
753    }
754}
755
756impl<T: fmt::Debug> fmt::Debug for Task<T> {
757    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
758        match self {
759            Task::Ready(value) => value.fmt(f),
760            Task::Local { any_task, .. } => any_task.fmt(f),
761            Task::Send { any_task, .. } => any_task.fmt(f),
762        }
763    }
764}
765
766impl<T: 'static> Future for Task<T> {
767    type Output = T;
768
769    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
770        match unsafe { self.get_unchecked_mut() } {
771            Task::Ready(value) => Poll::Ready(value.take().unwrap()),
772            Task::Local { any_task, .. } => {
773                any_task.poll(cx).map(|value| *value.downcast().unwrap())
774            }
775            Task::Send { any_task, .. } => {
776                any_task.poll(cx).map(|value| *value.downcast().unwrap())
777            }
778        }
779    }
780}
781
782fn any_future<T, F>(future: F) -> AnyFuture
783where
784    T: 'static + Send,
785    F: Future<Output = T> + Send + 'static,
786{
787    async { Box::new(future.await) as Box<dyn Any + Send> }.boxed()
788}
789
790fn any_local_future<T, F>(future: F) -> AnyLocalFuture
791where
792    T: 'static,
793    F: Future<Output = T> + 'static,
794{
795    async { Box::new(future.await) as Box<dyn Any> }.boxed_local()
796}