executor.rs

  1use anyhow::{anyhow, Result};
  2use async_task::Runnable;
  3use futures::channel::mpsc;
  4use smol::{channel, prelude::*, Executor};
  5use std::{
  6    any::Any,
  7    fmt::{self, Display},
  8    marker::PhantomData,
  9    mem,
 10    pin::Pin,
 11    rc::Rc,
 12    sync::Arc,
 13    task::{Context, Poll},
 14    thread,
 15    time::Duration,
 16};
 17
 18use crate::{
 19    platform::{self, Dispatcher},
 20    util, MutableAppContext,
 21};
 22
 23pub enum Foreground {
 24    Platform {
 25        dispatcher: Arc<dyn platform::Dispatcher>,
 26        _not_send_or_sync: PhantomData<Rc<()>>,
 27    },
 28    #[cfg(any(test, feature = "test-support"))]
 29    Deterministic {
 30        cx_id: usize,
 31        executor: Arc<Deterministic>,
 32    },
 33}
 34
 35pub enum Background {
 36    #[cfg(any(test, feature = "test-support"))]
 37    Deterministic { executor: Arc<Deterministic> },
 38    Production {
 39        executor: Arc<smol::Executor<'static>>,
 40        _stop: channel::Sender<()>,
 41    },
 42}
 43
 44type AnyLocalFuture = Pin<Box<dyn 'static + Future<Output = Box<dyn Any + 'static>>>>;
 45type AnyFuture = Pin<Box<dyn 'static + Send + Future<Output = Box<dyn Any + Send + 'static>>>>;
 46type AnyTask = async_task::Task<Box<dyn Any + Send + 'static>>;
 47type AnyLocalTask = async_task::Task<Box<dyn Any + 'static>>;
 48
 49#[must_use]
 50pub enum Task<T> {
 51    Ready(Option<T>),
 52    Local {
 53        any_task: AnyLocalTask,
 54        result_type: PhantomData<T>,
 55    },
 56    Send {
 57        any_task: AnyTask,
 58        result_type: PhantomData<T>,
 59    },
 60}
 61
 62unsafe impl<T: Send> Send for Task<T> {}
 63
 64#[cfg(any(test, feature = "test-support"))]
 65struct DeterministicState {
 66    rng: rand::prelude::StdRng,
 67    seed: u64,
 68    scheduled_from_foreground: collections::HashMap<usize, Vec<ForegroundRunnable>>,
 69    scheduled_from_background: Vec<Runnable>,
 70    forbid_parking: bool,
 71    block_on_ticks: std::ops::RangeInclusive<usize>,
 72    now: std::time::Instant,
 73    next_timer_id: usize,
 74    pending_timers: Vec<(usize, std::time::Instant, postage::barrier::Sender)>,
 75    waiting_backtrace: Option<backtrace::Backtrace>,
 76}
 77
 78#[cfg(any(test, feature = "test-support"))]
 79struct ForegroundRunnable {
 80    runnable: Runnable,
 81    main: bool,
 82}
 83
 84#[cfg(any(test, feature = "test-support"))]
 85pub struct Deterministic {
 86    state: Arc<parking_lot::Mutex<DeterministicState>>,
 87    parker: parking_lot::Mutex<parking::Parker>,
 88}
 89
 90pub enum Timer {
 91    Production(smol::Timer),
 92    #[cfg(any(test, feature = "test-support"))]
 93    Deterministic(DeterministicTimer),
 94}
 95
 96#[cfg(any(test, feature = "test-support"))]
 97pub struct DeterministicTimer {
 98    rx: postage::barrier::Receiver,
 99    id: usize,
100    state: Arc<parking_lot::Mutex<DeterministicState>>,
101}
102
103#[cfg(any(test, feature = "test-support"))]
104impl Deterministic {
105    pub fn new(seed: u64) -> Arc<Self> {
106        use rand::prelude::*;
107
108        Arc::new(Self {
109            state: Arc::new(parking_lot::Mutex::new(DeterministicState {
110                rng: StdRng::seed_from_u64(seed),
111                seed,
112                scheduled_from_foreground: Default::default(),
113                scheduled_from_background: Default::default(),
114                forbid_parking: false,
115                block_on_ticks: 0..=1000,
116                now: std::time::Instant::now(),
117                next_timer_id: Default::default(),
118                pending_timers: Default::default(),
119                waiting_backtrace: None,
120            })),
121            parker: Default::default(),
122        })
123    }
124
125    pub fn build_background(self: &Arc<Self>) -> Arc<Background> {
126        Arc::new(Background::Deterministic {
127            executor: self.clone(),
128        })
129    }
130
131    pub fn build_foreground(self: &Arc<Self>, id: usize) -> Rc<Foreground> {
132        Rc::new(Foreground::Deterministic {
133            cx_id: id,
134            executor: self.clone(),
135        })
136    }
137
138    fn spawn_from_foreground(
139        &self,
140        cx_id: usize,
141        future: AnyLocalFuture,
142        main: bool,
143    ) -> AnyLocalTask {
144        let state = self.state.clone();
145        let unparker = self.parker.lock().unparker();
146        let (runnable, task) = async_task::spawn_local(future, move |runnable| {
147            let mut state = state.lock();
148            state
149                .scheduled_from_foreground
150                .entry(cx_id)
151                .or_default()
152                .push(ForegroundRunnable { runnable, main });
153            unparker.unpark();
154        });
155        runnable.schedule();
156        task
157    }
158
159    fn spawn(&self, future: AnyFuture) -> AnyTask {
160        let state = self.state.clone();
161        let unparker = self.parker.lock().unparker();
162        let (runnable, task) = async_task::spawn(future, move |runnable| {
163            let mut state = state.lock();
164            state.scheduled_from_background.push(runnable);
165            unparker.unpark();
166        });
167        runnable.schedule();
168        task
169    }
170
171    fn run<'a>(
172        &self,
173        cx_id: usize,
174        main_future: Pin<Box<dyn 'a + Future<Output = Box<dyn Any>>>>,
175    ) -> Box<dyn Any> {
176        use std::sync::atomic::{AtomicBool, Ordering::SeqCst};
177
178        let woken = Arc::new(AtomicBool::new(false));
179
180        let state = self.state.clone();
181        let unparker = self.parker.lock().unparker();
182        let (runnable, mut main_task) = unsafe {
183            async_task::spawn_unchecked(main_future, move |runnable| {
184                let mut state = state.lock();
185                state
186                    .scheduled_from_foreground
187                    .entry(cx_id)
188                    .or_default()
189                    .push(ForegroundRunnable {
190                        runnable,
191                        main: true,
192                    });
193                unparker.unpark();
194            })
195        };
196        runnable.schedule();
197
198        loop {
199            if let Some(result) = self.run_internal(woken.clone(), Some(&mut main_task)) {
200                return result;
201            }
202
203            if !woken.load(SeqCst) {
204                self.state.lock().will_park();
205            }
206
207            woken.store(false, SeqCst);
208            self.parker.lock().park();
209        }
210    }
211
212    pub fn run_until_parked(&self) {
213        use std::sync::atomic::AtomicBool;
214        let woken = Arc::new(AtomicBool::new(false));
215        self.run_internal(woken, None);
216    }
217
218    fn run_internal(
219        &self,
220        woken: Arc<std::sync::atomic::AtomicBool>,
221        mut main_task: Option<&mut AnyLocalTask>,
222    ) -> Option<Box<dyn Any>> {
223        use rand::prelude::*;
224        use std::sync::atomic::Ordering::SeqCst;
225
226        let unparker = self.parker.lock().unparker();
227        let waker = waker_fn::waker_fn(move || {
228            woken.store(true, SeqCst);
229            unparker.unpark();
230        });
231
232        let mut cx = Context::from_waker(&waker);
233        loop {
234            let mut state = self.state.lock();
235
236            if state.scheduled_from_foreground.is_empty()
237                && state.scheduled_from_background.is_empty()
238            {
239                if let Some(main_task) = main_task {
240                    if let Poll::Ready(result) = main_task.poll(&mut cx) {
241                        return Some(result);
242                    }
243                }
244
245                return None;
246            }
247
248            if !state.scheduled_from_background.is_empty() && state.rng.gen() {
249                let background_len = state.scheduled_from_background.len();
250                let ix = state.rng.gen_range(0..background_len);
251                let runnable = state.scheduled_from_background.remove(ix);
252                drop(state);
253                runnable.run();
254            } else if !state.scheduled_from_foreground.is_empty() {
255                let available_cx_ids = state
256                    .scheduled_from_foreground
257                    .keys()
258                    .copied()
259                    .collect::<Vec<_>>();
260                let cx_id_to_run = *available_cx_ids.iter().choose(&mut state.rng).unwrap();
261                let scheduled_from_cx = state
262                    .scheduled_from_foreground
263                    .get_mut(&cx_id_to_run)
264                    .unwrap();
265                let foreground_runnable = scheduled_from_cx.remove(0);
266                if scheduled_from_cx.is_empty() {
267                    state.scheduled_from_foreground.remove(&cx_id_to_run);
268                }
269
270                drop(state);
271
272                foreground_runnable.runnable.run();
273                if let Some(main_task) = main_task.as_mut() {
274                    if foreground_runnable.main {
275                        if let Poll::Ready(result) = main_task.poll(&mut cx) {
276                            return Some(result);
277                        }
278                    }
279                }
280            }
281        }
282    }
283
284    fn block<F, T>(&self, future: &mut F, max_ticks: usize) -> Option<T>
285    where
286        F: Unpin + Future<Output = T>,
287    {
288        use rand::prelude::*;
289
290        let unparker = self.parker.lock().unparker();
291        let waker = waker_fn::waker_fn(move || {
292            unparker.unpark();
293        });
294
295        let mut cx = Context::from_waker(&waker);
296        for _ in 0..max_ticks {
297            let mut state = self.state.lock();
298            let runnable_count = state.scheduled_from_background.len();
299            let ix = state.rng.gen_range(0..=runnable_count);
300            if ix < state.scheduled_from_background.len() {
301                let runnable = state.scheduled_from_background.remove(ix);
302                drop(state);
303                runnable.run();
304            } else {
305                drop(state);
306                if let Poll::Ready(result) = future.poll(&mut cx) {
307                    return Some(result);
308                }
309                let mut state = self.state.lock();
310                if state.scheduled_from_background.is_empty() {
311                    state.will_park();
312                    drop(state);
313                    self.parker.lock().park();
314                }
315
316                continue;
317            }
318        }
319
320        None
321    }
322
323    pub fn timer(&self, duration: Duration) -> Timer {
324        let (tx, rx) = postage::barrier::channel();
325        let mut state = self.state.lock();
326        let wakeup_at = state.now + duration;
327        let id = util::post_inc(&mut state.next_timer_id);
328        state.pending_timers.push((id, wakeup_at, tx));
329        let state = self.state.clone();
330        Timer::Deterministic(DeterministicTimer { rx, id, state })
331    }
332
333    pub fn now(&self) -> std::time::Instant {
334        let state = self.state.lock();
335        state.now.clone()
336    }
337
338    pub fn advance_clock(&self, duration: Duration) {
339        let new_now = self.state.lock().now + duration;
340        loop {
341            self.run_until_parked();
342            let mut state = self.state.lock();
343
344            if let Some((_, wakeup_time, _)) = state.pending_timers.first() {
345                let wakeup_time = *wakeup_time;
346                if wakeup_time <= new_now {
347                    let timer_count = state
348                        .pending_timers
349                        .iter()
350                        .take_while(|(_, t, _)| *t == wakeup_time)
351                        .count();
352                    state.now = wakeup_time;
353                    let timers_to_wake = state
354                        .pending_timers
355                        .drain(0..timer_count)
356                        .collect::<Vec<_>>();
357                    drop(state);
358                    drop(timers_to_wake);
359                    continue;
360                }
361            }
362
363            break;
364        }
365
366        self.state.lock().now = new_now;
367    }
368
369    pub fn start_waiting(&self) {
370        self.state.lock().waiting_backtrace = Some(backtrace::Backtrace::new_unresolved());
371    }
372
373    pub fn finish_waiting(&self) {
374        self.state.lock().waiting_backtrace.take();
375    }
376
377    pub fn forbid_parking(&self) {
378        use rand::prelude::*;
379
380        let mut state = self.state.lock();
381        state.forbid_parking = true;
382        state.rng = StdRng::seed_from_u64(state.seed);
383    }
384}
385
386impl Drop for Timer {
387    fn drop(&mut self) {
388        #[cfg(any(test, feature = "test-support"))]
389        if let Timer::Deterministic(DeterministicTimer { state, id, .. }) = self {
390            state
391                .lock()
392                .pending_timers
393                .retain(|(timer_id, _, _)| timer_id != id)
394        }
395    }
396}
397
398impl Future for Timer {
399    type Output = ();
400
401    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
402        match &mut *self {
403            #[cfg(any(test, feature = "test-support"))]
404            Self::Deterministic(DeterministicTimer { rx, .. }) => {
405                use postage::stream::{PollRecv, Stream as _};
406                smol::pin!(rx);
407                match rx.poll_recv(&mut postage::Context::from_waker(cx.waker())) {
408                    PollRecv::Ready(()) | PollRecv::Closed => Poll::Ready(()),
409                    PollRecv::Pending => Poll::Pending,
410                }
411            }
412            Self::Production(timer) => {
413                smol::pin!(timer);
414                match timer.poll(cx) {
415                    Poll::Ready(_) => Poll::Ready(()),
416                    Poll::Pending => Poll::Pending,
417                }
418            }
419        }
420    }
421}
422
423#[cfg(any(test, feature = "test-support"))]
424impl DeterministicState {
425    fn will_park(&mut self) {
426        if self.forbid_parking {
427            let mut backtrace_message = String::new();
428            #[cfg(any(test, feature = "test-support"))]
429            if let Some(backtrace) = self.waiting_backtrace.as_mut() {
430                backtrace.resolve();
431                backtrace_message = format!(
432                    "\nbacktrace of waiting future:\n{:?}",
433                    util::CwdBacktrace(backtrace)
434                );
435            }
436
437            panic!(
438                "deterministic executor parked after a call to forbid_parking{}",
439                backtrace_message
440            );
441        }
442    }
443}
444
445impl Foreground {
446    pub fn platform(dispatcher: Arc<dyn platform::Dispatcher>) -> Result<Self> {
447        if dispatcher.is_main_thread() {
448            Ok(Self::Platform {
449                dispatcher,
450                _not_send_or_sync: PhantomData,
451            })
452        } else {
453            Err(anyhow!("must be constructed on main thread"))
454        }
455    }
456
457    pub fn spawn<T: 'static>(&self, future: impl Future<Output = T> + 'static) -> Task<T> {
458        let future = any_local_future(future);
459        let any_task = match self {
460            #[cfg(any(test, feature = "test-support"))]
461            Self::Deterministic { cx_id, executor } => {
462                executor.spawn_from_foreground(*cx_id, future, false)
463            }
464            Self::Platform { dispatcher, .. } => {
465                fn spawn_inner(
466                    future: AnyLocalFuture,
467                    dispatcher: &Arc<dyn Dispatcher>,
468                ) -> AnyLocalTask {
469                    let dispatcher = dispatcher.clone();
470                    let schedule =
471                        move |runnable: Runnable| dispatcher.run_on_main_thread(runnable);
472                    let (runnable, task) = async_task::spawn_local(future, schedule);
473                    runnable.schedule();
474                    task
475                }
476                spawn_inner(future, dispatcher)
477            }
478        };
479        Task::local(any_task)
480    }
481
482    #[cfg(any(test, feature = "test-support"))]
483    pub fn run<T: 'static>(&self, future: impl Future<Output = T>) -> T {
484        let future = async move { Box::new(future.await) as Box<dyn Any> }.boxed_local();
485        let result = match self {
486            Self::Deterministic { cx_id, executor } => executor.run(*cx_id, future),
487            Self::Platform { .. } => panic!("you can't call run on a platform foreground executor"),
488        };
489        *result.downcast().unwrap()
490    }
491
492    #[cfg(any(test, feature = "test-support"))]
493    pub fn run_until_parked(&self) {
494        match self {
495            Self::Deterministic { executor, .. } => executor.run_until_parked(),
496            _ => panic!("this method can only be called on a deterministic executor"),
497        }
498    }
499
500    #[cfg(any(test, feature = "test-support"))]
501    pub fn parking_forbidden(&self) -> bool {
502        match self {
503            Self::Deterministic { executor, .. } => executor.state.lock().forbid_parking,
504            _ => panic!("this method can only be called on a deterministic executor"),
505        }
506    }
507
508    #[cfg(any(test, feature = "test-support"))]
509    pub fn start_waiting(&self) {
510        match self {
511            Self::Deterministic { executor, .. } => executor.start_waiting(),
512            _ => panic!("this method can only be called on a deterministic executor"),
513        }
514    }
515
516    #[cfg(any(test, feature = "test-support"))]
517    pub fn finish_waiting(&self) {
518        match self {
519            Self::Deterministic { executor, .. } => executor.finish_waiting(),
520            _ => panic!("this method can only be called on a deterministic executor"),
521        }
522    }
523
524    #[cfg(any(test, feature = "test-support"))]
525    pub fn forbid_parking(&self) {
526        match self {
527            Self::Deterministic { executor, .. } => executor.forbid_parking(),
528            _ => panic!("this method can only be called on a deterministic executor"),
529        }
530    }
531
532    #[cfg(any(test, feature = "test-support"))]
533    pub fn advance_clock(&self, duration: Duration) {
534        match self {
535            Self::Deterministic { executor, .. } => executor.advance_clock(duration),
536            _ => panic!("this method can only be called on a deterministic executor"),
537        }
538    }
539
540    #[cfg(any(test, feature = "test-support"))]
541    pub fn set_block_on_ticks(&self, range: std::ops::RangeInclusive<usize>) {
542        match self {
543            Self::Deterministic { executor, .. } => executor.state.lock().block_on_ticks = range,
544            _ => panic!("this method can only be called on a deterministic executor"),
545        }
546    }
547}
548
549impl Background {
550    pub fn new() -> Self {
551        let executor = Arc::new(Executor::new());
552        let stop = channel::unbounded::<()>();
553
554        for i in 0..2 * num_cpus::get() {
555            let executor = executor.clone();
556            let stop = stop.1.clone();
557            thread::Builder::new()
558                .name(format!("background-executor-{}", i))
559                .spawn(move || smol::block_on(executor.run(stop.recv())))
560                .unwrap();
561        }
562
563        Self::Production {
564            executor,
565            _stop: stop.0,
566        }
567    }
568
569    pub fn num_cpus(&self) -> usize {
570        num_cpus::get()
571    }
572
573    pub fn spawn<T, F>(&self, future: F) -> Task<T>
574    where
575        T: 'static + Send,
576        F: Send + Future<Output = T> + 'static,
577    {
578        let future = any_future(future);
579        let any_task = match self {
580            Self::Production { executor, .. } => executor.spawn(future),
581            #[cfg(any(test, feature = "test-support"))]
582            Self::Deterministic { executor } => executor.spawn(future),
583        };
584        Task::send(any_task)
585    }
586
587    pub fn block<F, T>(&self, future: F) -> T
588    where
589        F: Future<Output = T>,
590    {
591        smol::pin!(future);
592        match self {
593            Self::Production { .. } => smol::block_on(&mut future),
594            #[cfg(any(test, feature = "test-support"))]
595            Self::Deterministic { executor, .. } => {
596                executor.block(&mut future, usize::MAX).unwrap()
597            }
598        }
599    }
600
601    pub fn block_with_timeout<F, T>(
602        &self,
603        timeout: Duration,
604        future: F,
605    ) -> Result<T, impl Future<Output = T>>
606    where
607        T: 'static,
608        F: 'static + Unpin + Future<Output = T>,
609    {
610        let mut future = any_local_future(future);
611        if !timeout.is_zero() {
612            let output = match self {
613                Self::Production { .. } => smol::block_on(util::timeout(timeout, &mut future)).ok(),
614                #[cfg(any(test, feature = "test-support"))]
615                Self::Deterministic { executor, .. } => {
616                    use rand::prelude::*;
617                    let max_ticks = {
618                        let mut state = executor.state.lock();
619                        let range = state.block_on_ticks.clone();
620                        state.rng.gen_range(range)
621                    };
622                    executor.block(&mut future, max_ticks)
623                }
624            };
625            if let Some(output) = output {
626                return Ok(*output.downcast().unwrap());
627            }
628        }
629        Err(async { *future.await.downcast().unwrap() })
630    }
631
632    pub async fn scoped<'scope, F>(self: &Arc<Self>, scheduler: F)
633    where
634        F: FnOnce(&mut Scope<'scope>),
635    {
636        let mut scope = Scope::new(self.clone());
637        (scheduler)(&mut scope);
638        let spawned = mem::take(&mut scope.futures)
639            .into_iter()
640            .map(|f| self.spawn(f))
641            .collect::<Vec<_>>();
642        for task in spawned {
643            task.await;
644        }
645    }
646
647    pub fn timer(&self, duration: Duration) -> Timer {
648        match self {
649            Background::Production { .. } => Timer::Production(smol::Timer::after(duration)),
650            #[cfg(any(test, feature = "test-support"))]
651            Background::Deterministic { executor } => executor.timer(duration),
652        }
653    }
654
655    pub fn now(&self) -> std::time::Instant {
656        match self {
657            Background::Production { .. } => std::time::Instant::now(),
658            #[cfg(any(test, feature = "test-support"))]
659            Background::Deterministic { executor } => executor.now(),
660        }
661    }
662
663    #[cfg(any(test, feature = "test-support"))]
664    pub async fn simulate_random_delay(&self) {
665        use rand::prelude::*;
666        use smol::future::yield_now;
667
668        match self {
669            Self::Deterministic { executor, .. } => {
670                if executor.state.lock().rng.gen_bool(0.2) {
671                    let yields = executor.state.lock().rng.gen_range(1..=10);
672                    for _ in 0..yields {
673                        yield_now().await;
674                    }
675                }
676            }
677            _ => {
678                panic!("this method can only be called on a deterministic executor")
679            }
680        }
681    }
682}
683
684pub struct Scope<'a> {
685    executor: Arc<Background>,
686    futures: Vec<Pin<Box<dyn Future<Output = ()> + Send + 'static>>>,
687    tx: Option<mpsc::Sender<()>>,
688    rx: mpsc::Receiver<()>,
689    _phantom: PhantomData<&'a ()>,
690}
691
692impl<'a> Scope<'a> {
693    fn new(executor: Arc<Background>) -> Self {
694        let (tx, rx) = mpsc::channel(1);
695        Self {
696            executor,
697            tx: Some(tx),
698            rx,
699            futures: Default::default(),
700            _phantom: PhantomData,
701        }
702    }
703
704    pub fn spawn<F>(&mut self, f: F)
705    where
706        F: Future<Output = ()> + Send + 'a,
707    {
708        let tx = self.tx.clone().unwrap();
709
710        // Safety: The 'a lifetime is guaranteed to outlive any of these futures because
711        // dropping this `Scope` blocks until all of the futures have resolved.
712        let f = unsafe {
713            mem::transmute::<
714                Pin<Box<dyn Future<Output = ()> + Send + 'a>>,
715                Pin<Box<dyn Future<Output = ()> + Send + 'static>>,
716            >(Box::pin(async move {
717                f.await;
718                drop(tx);
719            }))
720        };
721        self.futures.push(f);
722    }
723}
724
725impl<'a> Drop for Scope<'a> {
726    fn drop(&mut self) {
727        self.tx.take().unwrap();
728
729        // Wait until the channel is closed, which means that all of the spawned
730        // futures have resolved.
731        self.executor.block(self.rx.next());
732    }
733}
734
735impl<T> Task<T> {
736    pub fn ready(value: T) -> Self {
737        Self::Ready(Some(value))
738    }
739
740    fn local(any_task: AnyLocalTask) -> Self {
741        Self::Local {
742            any_task,
743            result_type: PhantomData,
744        }
745    }
746
747    pub fn detach(self) {
748        match self {
749            Task::Ready(_) => {}
750            Task::Local { any_task, .. } => any_task.detach(),
751            Task::Send { any_task, .. } => any_task.detach(),
752        }
753    }
754}
755
756impl<T: 'static, E: 'static + Display> Task<Result<T, E>> {
757    pub fn detach_and_log_err(self, cx: &mut MutableAppContext) {
758        cx.spawn(|_| async move {
759            if let Err(err) = self.await {
760                log::error!("{}", err);
761            }
762        })
763        .detach();
764    }
765}
766
767impl<T: Send> Task<T> {
768    fn send(any_task: AnyTask) -> Self {
769        Self::Send {
770            any_task,
771            result_type: PhantomData,
772        }
773    }
774}
775
776impl<T: fmt::Debug> fmt::Debug for Task<T> {
777    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
778        match self {
779            Task::Ready(value) => value.fmt(f),
780            Task::Local { any_task, .. } => any_task.fmt(f),
781            Task::Send { any_task, .. } => any_task.fmt(f),
782        }
783    }
784}
785
786impl<T: 'static> Future for Task<T> {
787    type Output = T;
788
789    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
790        match unsafe { self.get_unchecked_mut() } {
791            Task::Ready(value) => Poll::Ready(value.take().unwrap()),
792            Task::Local { any_task, .. } => {
793                any_task.poll(cx).map(|value| *value.downcast().unwrap())
794            }
795            Task::Send { any_task, .. } => {
796                any_task.poll(cx).map(|value| *value.downcast().unwrap())
797            }
798        }
799    }
800}
801
802fn any_future<T, F>(future: F) -> AnyFuture
803where
804    T: 'static + Send,
805    F: Future<Output = T> + Send + 'static,
806{
807    async { Box::new(future.await) as Box<dyn Any + Send> }.boxed()
808}
809
810fn any_local_future<T, F>(future: F) -> AnyLocalFuture
811where
812    T: 'static,
813    F: Future<Output = T> + 'static,
814{
815    async { Box::new(future.await) as Box<dyn Any> }.boxed_local()
816}