executor.rs

  1use anyhow::{anyhow, Result};
  2use async_task::Runnable;
  3use futures::channel::mpsc;
  4use smol::{channel, prelude::*, Executor};
  5use std::{
  6    any::Any,
  7    fmt::{self, Display},
  8    marker::PhantomData,
  9    mem,
 10    pin::Pin,
 11    rc::Rc,
 12    sync::Arc,
 13    task::{Context, Poll},
 14    thread,
 15    time::Duration,
 16};
 17
 18use crate::{
 19    platform::{self, Dispatcher},
 20    util, MutableAppContext,
 21};
 22
 23pub enum Foreground {
 24    Platform {
 25        dispatcher: Arc<dyn platform::Dispatcher>,
 26        _not_send_or_sync: PhantomData<Rc<()>>,
 27    },
 28    #[cfg(any(test, feature = "test-support"))]
 29    Deterministic {
 30        cx_id: usize,
 31        executor: Arc<Deterministic>,
 32    },
 33}
 34
 35pub enum Background {
 36    #[cfg(any(test, feature = "test-support"))]
 37    Deterministic { executor: Arc<Deterministic> },
 38    Production {
 39        executor: Arc<smol::Executor<'static>>,
 40        _stop: channel::Sender<()>,
 41    },
 42}
 43
 44type AnyLocalFuture = Pin<Box<dyn 'static + Future<Output = Box<dyn Any + 'static>>>>;
 45type AnyFuture = Pin<Box<dyn 'static + Send + Future<Output = Box<dyn Any + Send + 'static>>>>;
 46type AnyTask = async_task::Task<Box<dyn Any + Send + 'static>>;
 47type AnyLocalTask = async_task::Task<Box<dyn Any + 'static>>;
 48
 49#[must_use]
 50pub enum Task<T> {
 51    Ready(Option<T>),
 52    Local {
 53        any_task: AnyLocalTask,
 54        result_type: PhantomData<T>,
 55    },
 56    Send {
 57        any_task: AnyTask,
 58        result_type: PhantomData<T>,
 59    },
 60}
 61
 62unsafe impl<T: Send> Send for Task<T> {}
 63
 64#[cfg(any(test, feature = "test-support"))]
 65struct DeterministicState {
 66    rng: rand::prelude::StdRng,
 67    seed: u64,
 68    scheduled_from_foreground: collections::HashMap<usize, Vec<ForegroundRunnable>>,
 69    scheduled_from_background: Vec<Runnable>,
 70    forbid_parking: bool,
 71    block_on_ticks: std::ops::RangeInclusive<usize>,
 72    now: std::time::Instant,
 73    next_timer_id: usize,
 74    pending_timers: Vec<(usize, std::time::Instant, postage::barrier::Sender)>,
 75    waiting_backtrace: Option<backtrace::Backtrace>,
 76}
 77
 78#[cfg(any(test, feature = "test-support"))]
 79struct ForegroundRunnable {
 80    runnable: Runnable,
 81    main: bool,
 82}
 83
 84#[cfg(any(test, feature = "test-support"))]
 85pub struct Deterministic {
 86    state: Arc<parking_lot::Mutex<DeterministicState>>,
 87    parker: parking_lot::Mutex<parking::Parker>,
 88}
 89
 90pub enum Timer {
 91    Production(smol::Timer),
 92    #[cfg(any(test, feature = "test-support"))]
 93    Deterministic(DeterministicTimer),
 94}
 95
 96#[cfg(any(test, feature = "test-support"))]
 97pub struct DeterministicTimer {
 98    rx: postage::barrier::Receiver,
 99    id: usize,
100    state: Arc<parking_lot::Mutex<DeterministicState>>,
101}
102
103#[cfg(any(test, feature = "test-support"))]
104impl Deterministic {
105    pub fn new(seed: u64) -> Arc<Self> {
106        use rand::prelude::*;
107
108        Arc::new(Self {
109            state: Arc::new(parking_lot::Mutex::new(DeterministicState {
110                rng: StdRng::seed_from_u64(seed),
111                seed,
112                scheduled_from_foreground: Default::default(),
113                scheduled_from_background: Default::default(),
114                forbid_parking: false,
115                block_on_ticks: 0..=1000,
116                now: std::time::Instant::now(),
117                next_timer_id: Default::default(),
118                pending_timers: Default::default(),
119                waiting_backtrace: None,
120            })),
121            parker: Default::default(),
122        })
123    }
124
125    pub fn build_background(self: &Arc<Self>) -> Arc<Background> {
126        Arc::new(Background::Deterministic {
127            executor: self.clone(),
128        })
129    }
130
131    pub fn build_foreground(self: &Arc<Self>, id: usize) -> Rc<Foreground> {
132        Rc::new(Foreground::Deterministic {
133            cx_id: id,
134            executor: self.clone(),
135        })
136    }
137
138    fn spawn_from_foreground(
139        &self,
140        cx_id: usize,
141        future: AnyLocalFuture,
142        main: bool,
143    ) -> AnyLocalTask {
144        let state = self.state.clone();
145        let unparker = self.parker.lock().unparker();
146        let (runnable, task) = async_task::spawn_local(future, move |runnable| {
147            let mut state = state.lock();
148            state
149                .scheduled_from_foreground
150                .entry(cx_id)
151                .or_default()
152                .push(ForegroundRunnable { runnable, main });
153            unparker.unpark();
154        });
155        runnable.schedule();
156        task
157    }
158
159    fn spawn(&self, future: AnyFuture) -> AnyTask {
160        let state = self.state.clone();
161        let unparker = self.parker.lock().unparker();
162        let (runnable, task) = async_task::spawn(future, move |runnable| {
163            let mut state = state.lock();
164            state.scheduled_from_background.push(runnable);
165            unparker.unpark();
166        });
167        runnable.schedule();
168        task
169    }
170
171    fn run<'a>(
172        &self,
173        cx_id: usize,
174        main_future: Pin<Box<dyn 'a + Future<Output = Box<dyn Any>>>>,
175    ) -> Box<dyn Any> {
176        use std::sync::atomic::{AtomicBool, Ordering::SeqCst};
177
178        let woken = Arc::new(AtomicBool::new(false));
179
180        let state = self.state.clone();
181        let unparker = self.parker.lock().unparker();
182        let (runnable, mut main_task) = unsafe {
183            async_task::spawn_unchecked(main_future, move |runnable| {
184                let mut state = state.lock();
185                state
186                    .scheduled_from_foreground
187                    .entry(cx_id)
188                    .or_default()
189                    .push(ForegroundRunnable {
190                        runnable,
191                        main: true,
192                    });
193                unparker.unpark();
194            })
195        };
196        runnable.schedule();
197
198        loop {
199            if let Some(result) = self.run_internal(woken.clone(), Some(&mut main_task)) {
200                return result;
201            }
202
203            if !woken.load(SeqCst) {
204                self.state.lock().will_park();
205            }
206
207            woken.store(false, SeqCst);
208            self.parker.lock().park();
209        }
210    }
211
212    pub fn run_until_parked(&self) {
213        use std::sync::atomic::AtomicBool;
214        let woken = Arc::new(AtomicBool::new(false));
215        self.run_internal(woken, None);
216    }
217
218    fn run_internal(
219        &self,
220        woken: Arc<std::sync::atomic::AtomicBool>,
221        mut main_task: Option<&mut AnyLocalTask>,
222    ) -> Option<Box<dyn Any>> {
223        use rand::prelude::*;
224        use std::sync::atomic::Ordering::SeqCst;
225
226        let unparker = self.parker.lock().unparker();
227        let waker = waker_fn::waker_fn(move || {
228            woken.store(true, SeqCst);
229            unparker.unpark();
230        });
231
232        let mut cx = Context::from_waker(&waker);
233        loop {
234            let mut state = self.state.lock();
235
236            if state.scheduled_from_foreground.is_empty()
237                && state.scheduled_from_background.is_empty()
238            {
239                if let Some(main_task) = main_task {
240                    if let Poll::Ready(result) = main_task.poll(&mut cx) {
241                        return Some(result);
242                    }
243                }
244
245                return None;
246            }
247
248            if !state.scheduled_from_background.is_empty() && state.rng.gen() {
249                let background_len = state.scheduled_from_background.len();
250                let ix = state.rng.gen_range(0..background_len);
251                let runnable = state.scheduled_from_background.remove(ix);
252                drop(state);
253                runnable.run();
254            } else if !state.scheduled_from_foreground.is_empty() {
255                let available_cx_ids = state
256                    .scheduled_from_foreground
257                    .keys()
258                    .copied()
259                    .collect::<Vec<_>>();
260                let cx_id_to_run = *available_cx_ids.iter().choose(&mut state.rng).unwrap();
261                let scheduled_from_cx = state
262                    .scheduled_from_foreground
263                    .get_mut(&cx_id_to_run)
264                    .unwrap();
265                let foreground_runnable = scheduled_from_cx.remove(0);
266                if scheduled_from_cx.is_empty() {
267                    state.scheduled_from_foreground.remove(&cx_id_to_run);
268                }
269
270                drop(state);
271
272                foreground_runnable.runnable.run();
273                if let Some(main_task) = main_task.as_mut() {
274                    if foreground_runnable.main {
275                        if let Poll::Ready(result) = main_task.poll(&mut cx) {
276                            return Some(result);
277                        }
278                    }
279                }
280            }
281        }
282    }
283
284    fn block<F, T>(&self, future: &mut F, max_ticks: usize) -> Option<T>
285    where
286        F: Unpin + Future<Output = T>,
287    {
288        use rand::prelude::*;
289
290        let unparker = self.parker.lock().unparker();
291        let waker = waker_fn::waker_fn(move || {
292            unparker.unpark();
293        });
294
295        let mut cx = Context::from_waker(&waker);
296        for _ in 0..max_ticks {
297            let mut state = self.state.lock();
298            let runnable_count = state.scheduled_from_background.len();
299            let ix = state.rng.gen_range(0..=runnable_count);
300            if ix < state.scheduled_from_background.len() {
301                let runnable = state.scheduled_from_background.remove(ix);
302                drop(state);
303                runnable.run();
304            } else {
305                drop(state);
306                if let Poll::Ready(result) = future.poll(&mut cx) {
307                    return Some(result);
308                }
309                let mut state = self.state.lock();
310                if state.scheduled_from_background.is_empty() {
311                    state.will_park();
312                    drop(state);
313                    self.parker.lock().park();
314                }
315
316                continue;
317            }
318        }
319
320        None
321    }
322
323    pub fn timer(&self, duration: Duration) -> Timer {
324        let (tx, rx) = postage::barrier::channel();
325        let mut state = self.state.lock();
326        let wakeup_at = state.now + duration;
327        let id = util::post_inc(&mut state.next_timer_id);
328        state.pending_timers.push((id, wakeup_at, tx));
329        let state = self.state.clone();
330        Timer::Deterministic(DeterministicTimer { rx, id, state })
331    }
332
333    pub fn now(&self) -> std::time::Instant {
334        let state = self.state.lock();
335        state.now
336    }
337
338    pub fn advance_clock(&self, duration: Duration) {
339        let new_now = self.state.lock().now + duration;
340        loop {
341            self.run_until_parked();
342            let mut state = self.state.lock();
343
344            if let Some((_, wakeup_time, _)) = state.pending_timers.first() {
345                let wakeup_time = *wakeup_time;
346                if wakeup_time <= new_now {
347                    let timer_count = state
348                        .pending_timers
349                        .iter()
350                        .take_while(|(_, t, _)| *t == wakeup_time)
351                        .count();
352                    state.now = wakeup_time;
353                    let timers_to_wake = state
354                        .pending_timers
355                        .drain(0..timer_count)
356                        .collect::<Vec<_>>();
357                    drop(state);
358                    drop(timers_to_wake);
359                    continue;
360                }
361            }
362
363            break;
364        }
365
366        self.state.lock().now = new_now;
367    }
368
369    pub fn start_waiting(&self) {
370        self.state.lock().waiting_backtrace = Some(backtrace::Backtrace::new_unresolved());
371    }
372
373    pub fn finish_waiting(&self) {
374        self.state.lock().waiting_backtrace.take();
375    }
376
377    pub fn forbid_parking(&self) {
378        use rand::prelude::*;
379
380        let mut state = self.state.lock();
381        state.forbid_parking = true;
382        state.rng = StdRng::seed_from_u64(state.seed);
383    }
384
385    pub async fn simulate_random_delay(&self) {
386        use rand::prelude::*;
387        use smol::future::yield_now;
388        if self.state.lock().rng.gen_bool(0.2) {
389            let yields = self.state.lock().rng.gen_range(1..=10);
390            for _ in 0..yields {
391                yield_now().await;
392            }
393        }
394    }
395}
396
397impl Drop for Timer {
398    fn drop(&mut self) {
399        #[cfg(any(test, feature = "test-support"))]
400        if let Timer::Deterministic(DeterministicTimer { state, id, .. }) = self {
401            state
402                .lock()
403                .pending_timers
404                .retain(|(timer_id, _, _)| timer_id != id)
405        }
406    }
407}
408
409impl Future for Timer {
410    type Output = ();
411
412    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
413        match &mut *self {
414            #[cfg(any(test, feature = "test-support"))]
415            Self::Deterministic(DeterministicTimer { rx, .. }) => {
416                use postage::stream::{PollRecv, Stream as _};
417                smol::pin!(rx);
418                match rx.poll_recv(&mut postage::Context::from_waker(cx.waker())) {
419                    PollRecv::Ready(()) | PollRecv::Closed => Poll::Ready(()),
420                    PollRecv::Pending => Poll::Pending,
421                }
422            }
423            Self::Production(timer) => {
424                smol::pin!(timer);
425                match timer.poll(cx) {
426                    Poll::Ready(_) => Poll::Ready(()),
427                    Poll::Pending => Poll::Pending,
428                }
429            }
430        }
431    }
432}
433
#[cfg(any(test, feature = "test-support"))]
impl DeterministicState {
    /// Called just before the executor parks.
    ///
    /// # Panics
    ///
    /// Panics if parking has been forbidden. When a waiting backtrace was
    /// recorded, it is resolved and appended to the panic message.
    fn will_park(&mut self) {
        if !self.forbid_parking {
            return;
        }

        let backtrace_message = match self.waiting_backtrace.as_mut() {
            Some(backtrace) => {
                backtrace.resolve();
                format!(
                    "\nbacktrace of waiting future:\n{:?}",
                    util::CwdBacktrace(backtrace)
                )
            }
            None => String::new(),
        };

        panic!(
            "deterministic executor parked after a call to forbid_parking{}",
            backtrace_message
        );
    }
}
455
456impl Foreground {
457    pub fn platform(dispatcher: Arc<dyn platform::Dispatcher>) -> Result<Self> {
458        if dispatcher.is_main_thread() {
459            Ok(Self::Platform {
460                dispatcher,
461                _not_send_or_sync: PhantomData,
462            })
463        } else {
464            Err(anyhow!("must be constructed on main thread"))
465        }
466    }
467
468    pub fn spawn<T: 'static>(&self, future: impl Future<Output = T> + 'static) -> Task<T> {
469        let future = any_local_future(future);
470        let any_task = match self {
471            #[cfg(any(test, feature = "test-support"))]
472            Self::Deterministic { cx_id, executor } => {
473                executor.spawn_from_foreground(*cx_id, future, false)
474            }
475            Self::Platform { dispatcher, .. } => {
476                fn spawn_inner(
477                    future: AnyLocalFuture,
478                    dispatcher: &Arc<dyn Dispatcher>,
479                ) -> AnyLocalTask {
480                    let dispatcher = dispatcher.clone();
481                    let schedule =
482                        move |runnable: Runnable| dispatcher.run_on_main_thread(runnable);
483                    let (runnable, task) = async_task::spawn_local(future, schedule);
484                    runnable.schedule();
485                    task
486                }
487                spawn_inner(future, dispatcher)
488            }
489        };
490        Task::local(any_task)
491    }
492
493    #[cfg(any(test, feature = "test-support"))]
494    pub fn run<T: 'static>(&self, future: impl Future<Output = T>) -> T {
495        let future = async move { Box::new(future.await) as Box<dyn Any> }.boxed_local();
496        let result = match self {
497            Self::Deterministic { cx_id, executor } => executor.run(*cx_id, future),
498            Self::Platform { .. } => panic!("you can't call run on a platform foreground executor"),
499        };
500        *result.downcast().unwrap()
501    }
502
503    #[cfg(any(test, feature = "test-support"))]
504    pub fn run_until_parked(&self) {
505        match self {
506            Self::Deterministic { executor, .. } => executor.run_until_parked(),
507            _ => panic!("this method can only be called on a deterministic executor"),
508        }
509    }
510
511    #[cfg(any(test, feature = "test-support"))]
512    pub fn parking_forbidden(&self) -> bool {
513        match self {
514            Self::Deterministic { executor, .. } => executor.state.lock().forbid_parking,
515            _ => panic!("this method can only be called on a deterministic executor"),
516        }
517    }
518
519    #[cfg(any(test, feature = "test-support"))]
520    pub fn start_waiting(&self) {
521        match self {
522            Self::Deterministic { executor, .. } => executor.start_waiting(),
523            _ => panic!("this method can only be called on a deterministic executor"),
524        }
525    }
526
527    #[cfg(any(test, feature = "test-support"))]
528    pub fn finish_waiting(&self) {
529        match self {
530            Self::Deterministic { executor, .. } => executor.finish_waiting(),
531            _ => panic!("this method can only be called on a deterministic executor"),
532        }
533    }
534
535    #[cfg(any(test, feature = "test-support"))]
536    pub fn forbid_parking(&self) {
537        match self {
538            Self::Deterministic { executor, .. } => executor.forbid_parking(),
539            _ => panic!("this method can only be called on a deterministic executor"),
540        }
541    }
542
543    #[cfg(any(test, feature = "test-support"))]
544    pub fn advance_clock(&self, duration: Duration) {
545        match self {
546            Self::Deterministic { executor, .. } => executor.advance_clock(duration),
547            _ => panic!("this method can only be called on a deterministic executor"),
548        }
549    }
550
551    #[cfg(any(test, feature = "test-support"))]
552    pub fn set_block_on_ticks(&self, range: std::ops::RangeInclusive<usize>) {
553        match self {
554            Self::Deterministic { executor, .. } => executor.state.lock().block_on_ticks = range,
555            _ => panic!("this method can only be called on a deterministic executor"),
556        }
557    }
558}
559
560impl Background {
561    pub fn new() -> Self {
562        let executor = Arc::new(Executor::new());
563        let stop = channel::unbounded::<()>();
564
565        for i in 0..2 * num_cpus::get() {
566            let executor = executor.clone();
567            let stop = stop.1.clone();
568            thread::Builder::new()
569                .name(format!("background-executor-{}", i))
570                .spawn(move || smol::block_on(executor.run(stop.recv())))
571                .unwrap();
572        }
573
574        Self::Production {
575            executor,
576            _stop: stop.0,
577        }
578    }
579
580    pub fn num_cpus(&self) -> usize {
581        num_cpus::get()
582    }
583
584    pub fn spawn<T, F>(&self, future: F) -> Task<T>
585    where
586        T: 'static + Send,
587        F: Send + Future<Output = T> + 'static,
588    {
589        let future = any_future(future);
590        let any_task = match self {
591            Self::Production { executor, .. } => executor.spawn(future),
592            #[cfg(any(test, feature = "test-support"))]
593            Self::Deterministic { executor } => executor.spawn(future),
594        };
595        Task::send(any_task)
596    }
597
598    pub fn block<F, T>(&self, future: F) -> T
599    where
600        F: Future<Output = T>,
601    {
602        smol::pin!(future);
603        match self {
604            Self::Production { .. } => smol::block_on(&mut future),
605            #[cfg(any(test, feature = "test-support"))]
606            Self::Deterministic { executor, .. } => {
607                executor.block(&mut future, usize::MAX).unwrap()
608            }
609        }
610    }
611
612    pub fn block_with_timeout<F, T>(
613        &self,
614        timeout: Duration,
615        future: F,
616    ) -> Result<T, impl Future<Output = T>>
617    where
618        T: 'static,
619        F: 'static + Unpin + Future<Output = T>,
620    {
621        let mut future = any_local_future(future);
622        if !timeout.is_zero() {
623            let output = match self {
624                Self::Production { .. } => smol::block_on(util::timeout(timeout, &mut future)).ok(),
625                #[cfg(any(test, feature = "test-support"))]
626                Self::Deterministic { executor, .. } => {
627                    use rand::prelude::*;
628                    let max_ticks = {
629                        let mut state = executor.state.lock();
630                        let range = state.block_on_ticks.clone();
631                        state.rng.gen_range(range)
632                    };
633                    executor.block(&mut future, max_ticks)
634                }
635            };
636            if let Some(output) = output {
637                return Ok(*output.downcast().unwrap());
638            }
639        }
640        Err(async { *future.await.downcast().unwrap() })
641    }
642
643    pub async fn scoped<'scope, F>(self: &Arc<Self>, scheduler: F)
644    where
645        F: FnOnce(&mut Scope<'scope>),
646    {
647        let mut scope = Scope::new(self.clone());
648        (scheduler)(&mut scope);
649        let spawned = mem::take(&mut scope.futures)
650            .into_iter()
651            .map(|f| self.spawn(f))
652            .collect::<Vec<_>>();
653        for task in spawned {
654            task.await;
655        }
656    }
657
658    pub fn timer(&self, duration: Duration) -> Timer {
659        match self {
660            Background::Production { .. } => Timer::Production(smol::Timer::after(duration)),
661            #[cfg(any(test, feature = "test-support"))]
662            Background::Deterministic { executor } => executor.timer(duration),
663        }
664    }
665
666    pub fn now(&self) -> std::time::Instant {
667        match self {
668            Background::Production { .. } => std::time::Instant::now(),
669            #[cfg(any(test, feature = "test-support"))]
670            Background::Deterministic { executor } => executor.now(),
671        }
672    }
673
674    #[cfg(any(test, feature = "test-support"))]
675    pub async fn simulate_random_delay(&self) {
676        match self {
677            Self::Deterministic { executor, .. } => {
678                executor.simulate_random_delay().await;
679            }
680            _ => {
681                panic!("this method can only be called on a deterministic executor")
682            }
683        }
684    }
685}
686
687impl Default for Background {
688    fn default() -> Self {
689        Self::new()
690    }
691}
692
693pub struct Scope<'a> {
694    executor: Arc<Background>,
695    futures: Vec<Pin<Box<dyn Future<Output = ()> + Send + 'static>>>,
696    tx: Option<mpsc::Sender<()>>,
697    rx: mpsc::Receiver<()>,
698    _phantom: PhantomData<&'a ()>,
699}
700
701impl<'a> Scope<'a> {
702    fn new(executor: Arc<Background>) -> Self {
703        let (tx, rx) = mpsc::channel(1);
704        Self {
705            executor,
706            tx: Some(tx),
707            rx,
708            futures: Default::default(),
709            _phantom: PhantomData,
710        }
711    }
712
713    pub fn spawn<F>(&mut self, f: F)
714    where
715        F: Future<Output = ()> + Send + 'a,
716    {
717        let tx = self.tx.clone().unwrap();
718
719        // Safety: The 'a lifetime is guaranteed to outlive any of these futures because
720        // dropping this `Scope` blocks until all of the futures have resolved.
721        let f = unsafe {
722            mem::transmute::<
723                Pin<Box<dyn Future<Output = ()> + Send + 'a>>,
724                Pin<Box<dyn Future<Output = ()> + Send + 'static>>,
725            >(Box::pin(async move {
726                f.await;
727                drop(tx);
728            }))
729        };
730        self.futures.push(f);
731    }
732}
733
734impl<'a> Drop for Scope<'a> {
735    fn drop(&mut self) {
736        self.tx.take().unwrap();
737
738        // Wait until the channel is closed, which means that all of the spawned
739        // futures have resolved.
740        self.executor.block(self.rx.next());
741    }
742}
743
744impl<T> Task<T> {
745    pub fn ready(value: T) -> Self {
746        Self::Ready(Some(value))
747    }
748
749    fn local(any_task: AnyLocalTask) -> Self {
750        Self::Local {
751            any_task,
752            result_type: PhantomData,
753        }
754    }
755
756    pub fn detach(self) {
757        match self {
758            Task::Ready(_) => {}
759            Task::Local { any_task, .. } => any_task.detach(),
760            Task::Send { any_task, .. } => any_task.detach(),
761        }
762    }
763}
764
765impl<T: 'static, E: 'static + Display> Task<Result<T, E>> {
766    pub fn detach_and_log_err(self, cx: &mut MutableAppContext) {
767        cx.spawn(|_| async move {
768            if let Err(err) = self.await {
769                log::error!("{}", err);
770            }
771        })
772        .detach();
773    }
774}
775
776impl<T: Send> Task<T> {
777    fn send(any_task: AnyTask) -> Self {
778        Self::Send {
779            any_task,
780            result_type: PhantomData,
781        }
782    }
783}
784
785impl<T: fmt::Debug> fmt::Debug for Task<T> {
786    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
787        match self {
788            Task::Ready(value) => value.fmt(f),
789            Task::Local { any_task, .. } => any_task.fmt(f),
790            Task::Send { any_task, .. } => any_task.fmt(f),
791        }
792    }
793}
794
795impl<T: 'static> Future for Task<T> {
796    type Output = T;
797
798    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
799        match unsafe { self.get_unchecked_mut() } {
800            Task::Ready(value) => Poll::Ready(value.take().unwrap()),
801            Task::Local { any_task, .. } => {
802                any_task.poll(cx).map(|value| *value.downcast().unwrap())
803            }
804            Task::Send { any_task, .. } => {
805                any_task.poll(cx).map(|value| *value.downcast().unwrap())
806            }
807        }
808    }
809}
810
811fn any_future<T, F>(future: F) -> AnyFuture
812where
813    T: 'static + Send,
814    F: Future<Output = T> + Send + 'static,
815{
816    async { Box::new(future.await) as Box<dyn Any + Send> }.boxed()
817}
818
819fn any_local_future<T, F>(future: F) -> AnyLocalFuture
820where
821    T: 'static,
822    F: Future<Output = T> + 'static,
823{
824    async { Box::new(future.await) as Box<dyn Any> }.boxed_local()
825}