executor.rs

  1use anyhow::{anyhow, Result};
  2use async_task::Runnable;
  3use futures::channel::mpsc;
  4use smol::{channel, prelude::*, Executor};
  5use std::{
  6    any::Any,
  7    fmt::{self, Display},
  8    marker::PhantomData,
  9    mem,
 10    pin::Pin,
 11    rc::Rc,
 12    sync::Arc,
 13    task::{Context, Poll},
 14    thread,
 15    time::Duration,
 16};
 17
 18use crate::{
 19    platform::{self, Dispatcher},
 20    util, MutableAppContext,
 21};
 22
/// Executor handle for futures that run on the foreground (main) thread.
pub enum Foreground {
    /// Schedules runnables through the platform dispatcher's main-thread queue.
    Platform {
        dispatcher: Arc<dyn platform::Dispatcher>,
        // `Rc` is neither `Send` nor `Sync`, so this marker removes both auto traits
        // from the variant.
        _not_send_or_sync: PhantomData<Rc<()>>,
    },
    /// Schedules runnables on a shared deterministic executor, tagged with `cx_id`.
    #[cfg(any(test, feature = "test-support"))]
    Deterministic {
        cx_id: usize,
        executor: Arc<Deterministic>,
    },
}
 34
/// Executor handle for `Send` futures that may run off the main thread.
pub enum Background {
    /// Runs futures on the shared deterministic executor.
    #[cfg(any(test, feature = "test-support"))]
    Deterministic { executor: Arc<Deterministic> },
    /// Runs futures on a `smol` executor driven by worker threads started in `new`.
    Production {
        executor: Arc<smol::Executor<'static>>,
        // Sender half of the channel whose receiver each worker thread waits on.
        // NOTE(review): presumably dropping this is what lets the workers exit — confirm.
        _stop: channel::Sender<()>,
    },
}
 43
/// Boxed, type-erased future that is not required to be `Send`.
type AnyLocalFuture = Pin<Box<dyn 'static + Future<Output = Box<dyn Any + 'static>>>>;
/// Boxed, type-erased future that is `Send`.
type AnyFuture = Pin<Box<dyn 'static + Send + Future<Output = Box<dyn Any + Send + 'static>>>>;
/// Task handle whose output is a type-erased `Send` value.
type AnyTask = async_task::Task<Box<dyn Any + Send + 'static>>;
/// Task handle whose output is a type-erased value that need not be `Send`.
type AnyLocalTask = async_task::Task<Box<dyn Any + 'static>>;
 48
/// Typed handle to a spawned (or already finished) computation producing a `T`.
///
/// The inner tasks carry a type-erased output; `result_type` records the concrete
/// type that the `Future` impl downcasts back to.
#[must_use]
pub enum Task<T> {
    /// An already-available value; taken out of the `Option` on the first poll.
    Ready(Option<T>),
    /// A task whose future is not required to be `Send`.
    Local {
        any_task: AnyLocalTask,
        result_type: PhantomData<T>,
    },
    /// A task whose future and output are `Send`.
    Send {
        any_task: AnyTask,
        result_type: PhantomData<T>,
    },
}
 61
// NOTE(review): `Task::Local` wraps a task whose erased output is not `Send`, so this
// impl is asserted manually. It presumably relies on the real output being a `T: Send`
// and on async_task keeping the non-`Send` future on its original thread — confirm.
unsafe impl<T: Send> Send for Task<T> {}
 63
/// Mutable state shared by a `Deterministic` executor, its timers and its schedule callbacks.
#[cfg(any(test, feature = "test-support"))]
struct DeterministicState {
    // Seeded RNG that drives every scheduling choice.
    rng: rand::prelude::StdRng,
    // Original seed, kept so `forbid_parking` can reseed `rng`.
    seed: u64,
    // Runnables scheduled by foreground tasks, keyed by the foreground's `cx_id`.
    scheduled_from_foreground: collections::HashMap<usize, Vec<ForegroundRunnable>>,
    // Runnables scheduled by background tasks.
    scheduled_from_background: Vec<Runnable>,
    // When set, `will_park` panics instead of letting the executor park.
    forbid_parking: bool,
    // Range from which `Background::block_with_timeout` samples its tick budget.
    block_on_ticks: std::ops::RangeInclusive<usize>,
    // Simulated clock; only moved forward by `advance_clock`.
    now: std::time::Instant,
    // Next id handed out by `timer`.
    next_timer_id: usize,
    // `(id, wake-up instant, sender)` entries, inserted in wake-up order by `timer`.
    pending_timers: Vec<(usize, std::time::Instant, postage::barrier::Sender)>,
    // Backtrace captured by `start_waiting`, reported by `will_park` when it panics.
    waiting_backtrace: Option<backtrace::Backtrace>,
}
 77
/// A foreground runnable together with a flag saying whether it belongs to a main future.
#[cfg(any(test, feature = "test-support"))]
struct ForegroundRunnable {
    runnable: Runnable,
    // When true, `run_internal` polls the main task right after running this runnable.
    main: bool,
}
 83
/// Executor that picks which runnable to run next using a seeded RNG.
#[cfg(any(test, feature = "test-support"))]
pub struct Deterministic {
    // Shared with timers and with the schedule callbacks of spawned tasks.
    state: Arc<parking_lot::Mutex<DeterministicState>>,
    // Parks the running thread when nothing is runnable; unparkers are handed to wakers.
    parker: parking_lot::Mutex<parking::Parker>,
}
 89
/// Future that resolves to `()` once a timer fires.
pub enum Timer {
    /// Backed by a `smol::Timer`.
    Production(smol::Timer),
    /// Backed by the deterministic executor's simulated clock.
    #[cfg(any(test, feature = "test-support"))]
    Deterministic(DeterministicTimer),
}
 95
/// Timer registered in a deterministic executor's `pending_timers` list.
#[cfg(any(test, feature = "test-support"))]
pub struct DeterministicTimer {
    // Receiver paired with the sender stored in `pending_timers`.
    rx: postage::barrier::Receiver,
    // Id used to remove the pending entry when the timer is dropped.
    id: usize,
    state: Arc<parking_lot::Mutex<DeterministicState>>,
}
102
103#[cfg(any(test, feature = "test-support"))]
104impl Deterministic {
105    pub fn new(seed: u64) -> Arc<Self> {
106        use rand::prelude::*;
107
108        Arc::new(Self {
109            state: Arc::new(parking_lot::Mutex::new(DeterministicState {
110                rng: StdRng::seed_from_u64(seed),
111                seed,
112                scheduled_from_foreground: Default::default(),
113                scheduled_from_background: Default::default(),
114                forbid_parking: false,
115                block_on_ticks: 0..=1000,
116                now: std::time::Instant::now(),
117                next_timer_id: Default::default(),
118                pending_timers: Default::default(),
119                waiting_backtrace: None,
120            })),
121            parker: Default::default(),
122        })
123    }
124
125    pub fn build_background(self: &Arc<Self>) -> Arc<Background> {
126        Arc::new(Background::Deterministic {
127            executor: self.clone(),
128        })
129    }
130
131    pub fn build_foreground(self: &Arc<Self>, id: usize) -> Rc<Foreground> {
132        Rc::new(Foreground::Deterministic {
133            cx_id: id,
134            executor: self.clone(),
135        })
136    }
137
138    fn spawn_from_foreground(
139        &self,
140        cx_id: usize,
141        future: AnyLocalFuture,
142        main: bool,
143    ) -> AnyLocalTask {
144        let state = self.state.clone();
145        let unparker = self.parker.lock().unparker();
146        let (runnable, task) = async_task::spawn_local(future, move |runnable| {
147            let mut state = state.lock();
148            state
149                .scheduled_from_foreground
150                .entry(cx_id)
151                .or_default()
152                .push(ForegroundRunnable { runnable, main });
153            unparker.unpark();
154        });
155        runnable.schedule();
156        task
157    }
158
159    fn spawn(&self, future: AnyFuture) -> AnyTask {
160        let state = self.state.clone();
161        let unparker = self.parker.lock().unparker();
162        let (runnable, task) = async_task::spawn(future, move |runnable| {
163            let mut state = state.lock();
164            state.scheduled_from_background.push(runnable);
165            unparker.unpark();
166        });
167        runnable.schedule();
168        task
169    }
170
    /// Drives `main_future` to completion and returns its boxed output.
    ///
    /// The future is spawned as a foreground runnable for `cx_id` with `main: true`.
    /// `run_internal` is then called in a loop, parking the current thread between
    /// calls, until it reports the main task's result.
    fn run<'a>(
        &self,
        cx_id: usize,
        main_future: Pin<Box<dyn 'a + Future<Output = Box<dyn Any>>>>,
    ) -> Box<dyn Any> {
        use std::sync::atomic::{AtomicBool, Ordering::SeqCst};

        // Set to true by the waker that `run_internal` builds from this flag.
        let woken = Arc::new(AtomicBool::new(false));

        let state = self.state.clone();
        let unparker = self.parker.lock().unparker();
        // NOTE(review): `spawn_unchecked` is used so the future may borrow for `'a` and
        // need not be `Send`. This looks sound only because the loop below returns solely
        // when the main task has produced its result — confirm against async_task's
        // safety requirements.
        let (runnable, mut main_task) = unsafe {
            async_task::spawn_unchecked(main_future, move |runnable| {
                let mut state = state.lock();
                state
                    .scheduled_from_foreground
                    .entry(cx_id)
                    .or_default()
                    .push(ForegroundRunnable {
                        runnable,
                        main: true,
                    });
                unparker.unpark();
            })
        };
        runnable.schedule();

        loop {
            if let Some(result) = self.run_internal(woken.clone(), Some(&mut main_task)) {
                return result;
            }

            // No wakeup was recorded, so the upcoming park is a genuine one; `will_park`
            // panics here if parking has been forbidden.
            if !woken.load(SeqCst) {
                self.state.lock().will_park();
            }

            woken.store(false, SeqCst);
            self.parker.lock().park();
        }
    }
211
212    pub fn run_until_parked(&self) {
213        use std::sync::atomic::AtomicBool;
214        let woken = Arc::new(AtomicBool::new(false));
215        self.run_internal(woken, None);
216    }
217
    /// Runs scheduled runnables, chosen with the seeded RNG, until both queues are empty.
    ///
    /// Returns `Some(output)` as soon as `main_task` (when provided) is observed to be
    /// ready, and `None` once nothing is left to run. `woken` is set to `true` whenever
    /// the waker created here is invoked.
    fn run_internal(
        &self,
        woken: Arc<std::sync::atomic::AtomicBool>,
        mut main_task: Option<&mut AnyLocalTask>,
    ) -> Option<Box<dyn Any>> {
        use rand::prelude::*;
        use std::sync::atomic::Ordering::SeqCst;

        let unparker = self.parker.lock().unparker();
        let waker = waker_fn::waker_fn(move || {
            woken.store(true, SeqCst);
            unparker.unpark();
        });

        let mut cx = Context::from_waker(&waker);
        loop {
            let mut state = self.state.lock();

            if state.scheduled_from_foreground.is_empty()
                && state.scheduled_from_background.is_empty()
            {
                // Nothing left to run: give the main task one last poll (the state lock
                // is still held here) and report whether it finished.
                if let Some(main_task) = main_task {
                    if let Poll::Ready(result) = main_task.poll(&mut cx) {
                        return Some(result);
                    }
                }

                return None;
            }

            // When background work exists, an RNG-drawn bool decides whether to run it;
            // otherwise fall through to foreground work. If neither branch is taken the
            // loop simply iterates again and draws a new value.
            if !state.scheduled_from_background.is_empty() && state.rng.gen() {
                let background_len = state.scheduled_from_background.len();
                let ix = state.rng.gen_range(0..background_len);
                let runnable = state.scheduled_from_background.remove(ix);
                // Release the lock before running the runnable.
                drop(state);
                runnable.run();
            } else if !state.scheduled_from_foreground.is_empty() {
                // Pick a foreground at random, then take that foreground's oldest runnable.
                let available_cx_ids = state
                    .scheduled_from_foreground
                    .keys()
                    .copied()
                    .collect::<Vec<_>>();
                let cx_id_to_run = *available_cx_ids.iter().choose(&mut state.rng).unwrap();
                let scheduled_from_cx = state
                    .scheduled_from_foreground
                    .get_mut(&cx_id_to_run)
                    .unwrap();
                let foreground_runnable = scheduled_from_cx.remove(0);
                // Remove emptied queues so the emptiness checks above stay accurate.
                if scheduled_from_cx.is_empty() {
                    state.scheduled_from_foreground.remove(&cx_id_to_run);
                }

                drop(state);

                foreground_runnable.runnable.run();
                // A runnable flagged `main` may have completed the main future.
                if let Some(main_task) = main_task.as_mut() {
                    if foreground_runnable.main {
                        if let Poll::Ready(result) = main_task.poll(&mut cx) {
                            return Some(result);
                        }
                    }
                }
            }
        }
    }
283
284    fn block<F, T>(&self, future: &mut F, max_ticks: usize) -> Option<T>
285    where
286        F: Unpin + Future<Output = T>,
287    {
288        use rand::prelude::*;
289
290        let unparker = self.parker.lock().unparker();
291        let waker = waker_fn::waker_fn(move || {
292            unparker.unpark();
293        });
294
295        let mut cx = Context::from_waker(&waker);
296        for _ in 0..max_ticks {
297            let mut state = self.state.lock();
298            let runnable_count = state.scheduled_from_background.len();
299            let ix = state.rng.gen_range(0..=runnable_count);
300            if ix < state.scheduled_from_background.len() {
301                let runnable = state.scheduled_from_background.remove(ix);
302                drop(state);
303                runnable.run();
304            } else {
305                drop(state);
306                if let Poll::Ready(result) = future.poll(&mut cx) {
307                    return Some(result);
308                }
309                let mut state = self.state.lock();
310                if state.scheduled_from_background.is_empty() {
311                    state.will_park();
312                    drop(state);
313                    self.parker.lock().park();
314                }
315
316                continue;
317            }
318        }
319
320        None
321    }
322
323    pub fn timer(&self, duration: Duration) -> Timer {
324        let (tx, rx) = postage::barrier::channel();
325        let mut state = self.state.lock();
326        let wakeup_at = state.now + duration;
327        let id = util::post_inc(&mut state.next_timer_id);
328        match state
329            .pending_timers
330            .binary_search_by_key(&wakeup_at, |e| e.1)
331        {
332            Ok(ix) | Err(ix) => state.pending_timers.insert(ix, (id, wakeup_at, tx)),
333        }
334        let state = self.state.clone();
335        Timer::Deterministic(DeterministicTimer { rx, id, state })
336    }
337
338    pub fn now(&self) -> std::time::Instant {
339        let state = self.state.lock();
340        state.now
341    }
342
343    pub fn advance_clock(&self, duration: Duration) {
344        let new_now = self.state.lock().now + duration;
345        loop {
346            self.run_until_parked();
347            let mut state = self.state.lock();
348
349            if let Some((_, wakeup_time, _)) = state.pending_timers.first() {
350                let wakeup_time = *wakeup_time;
351                if wakeup_time <= new_now {
352                    let timer_count = state
353                        .pending_timers
354                        .iter()
355                        .take_while(|(_, t, _)| *t == wakeup_time)
356                        .count();
357                    state.now = wakeup_time;
358                    let timers_to_wake = state
359                        .pending_timers
360                        .drain(0..timer_count)
361                        .collect::<Vec<_>>();
362                    drop(state);
363                    drop(timers_to_wake);
364                    continue;
365                }
366            }
367
368            break;
369        }
370
371        self.state.lock().now = new_now;
372    }
373
374    pub fn start_waiting(&self) {
375        self.state.lock().waiting_backtrace = Some(backtrace::Backtrace::new_unresolved());
376    }
377
378    pub fn finish_waiting(&self) {
379        self.state.lock().waiting_backtrace.take();
380    }
381
382    pub fn forbid_parking(&self) {
383        use rand::prelude::*;
384
385        let mut state = self.state.lock();
386        state.forbid_parking = true;
387        state.rng = StdRng::seed_from_u64(state.seed);
388    }
389
390    pub async fn simulate_random_delay(&self) {
391        use rand::prelude::*;
392        use smol::future::yield_now;
393        if self.state.lock().rng.gen_bool(0.2) {
394            let yields = self.state.lock().rng.gen_range(1..=10);
395            for _ in 0..yields {
396                yield_now().await;
397            }
398        }
399    }
400}
401
402impl Drop for Timer {
403    fn drop(&mut self) {
404        #[cfg(any(test, feature = "test-support"))]
405        if let Timer::Deterministic(DeterministicTimer { state, id, .. }) = self {
406            state
407                .lock()
408                .pending_timers
409                .retain(|(timer_id, _, _)| timer_id != id)
410        }
411    }
412}
413
414impl Future for Timer {
415    type Output = ();
416
417    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
418        match &mut *self {
419            #[cfg(any(test, feature = "test-support"))]
420            Self::Deterministic(DeterministicTimer { rx, .. }) => {
421                use postage::stream::{PollRecv, Stream as _};
422                smol::pin!(rx);
423                match rx.poll_recv(&mut postage::Context::from_waker(cx.waker())) {
424                    PollRecv::Ready(()) | PollRecv::Closed => Poll::Ready(()),
425                    PollRecv::Pending => Poll::Pending,
426                }
427            }
428            Self::Production(timer) => {
429                smol::pin!(timer);
430                match timer.poll(cx) {
431                    Poll::Ready(_) => Poll::Ready(()),
432                    Poll::Pending => Poll::Pending,
433                }
434            }
435        }
436    }
437}
438
#[cfg(any(test, feature = "test-support"))]
impl DeterministicState {
    /// Called just before the executor parks.
    ///
    /// # Panics
    ///
    /// Panics when parking has been forbidden. If a waiting backtrace was recorded,
    /// it is resolved and appended to the panic message.
    fn will_park(&mut self) {
        if !self.forbid_parking {
            return;
        }

        let backtrace_message = match self.waiting_backtrace.as_mut() {
            Some(backtrace) => {
                backtrace.resolve();
                format!(
                    "\nbacktrace of waiting future:\n{:?}",
                    util::CwdBacktrace(backtrace)
                )
            }
            None => String::new(),
        };

        panic!(
            "deterministic executor parked after a call to forbid_parking{}",
            backtrace_message
        );
    }
}
460
461impl Foreground {
462    pub fn platform(dispatcher: Arc<dyn platform::Dispatcher>) -> Result<Self> {
463        if dispatcher.is_main_thread() {
464            Ok(Self::Platform {
465                dispatcher,
466                _not_send_or_sync: PhantomData,
467            })
468        } else {
469            Err(anyhow!("must be constructed on main thread"))
470        }
471    }
472
473    pub fn spawn<T: 'static>(&self, future: impl Future<Output = T> + 'static) -> Task<T> {
474        let future = any_local_future(future);
475        let any_task = match self {
476            #[cfg(any(test, feature = "test-support"))]
477            Self::Deterministic { cx_id, executor } => {
478                executor.spawn_from_foreground(*cx_id, future, false)
479            }
480            Self::Platform { dispatcher, .. } => {
481                fn spawn_inner(
482                    future: AnyLocalFuture,
483                    dispatcher: &Arc<dyn Dispatcher>,
484                ) -> AnyLocalTask {
485                    let dispatcher = dispatcher.clone();
486                    let schedule =
487                        move |runnable: Runnable| dispatcher.run_on_main_thread(runnable);
488                    let (runnable, task) = async_task::spawn_local(future, schedule);
489                    runnable.schedule();
490                    task
491                }
492                spawn_inner(future, dispatcher)
493            }
494        };
495        Task::local(any_task)
496    }
497
498    #[cfg(any(test, feature = "test-support"))]
499    pub fn run<T: 'static>(&self, future: impl Future<Output = T>) -> T {
500        let future = async move { Box::new(future.await) as Box<dyn Any> }.boxed_local();
501        let result = match self {
502            Self::Deterministic { cx_id, executor } => executor.run(*cx_id, future),
503            Self::Platform { .. } => panic!("you can't call run on a platform foreground executor"),
504        };
505        *result.downcast().unwrap()
506    }
507
508    #[cfg(any(test, feature = "test-support"))]
509    pub fn run_until_parked(&self) {
510        match self {
511            Self::Deterministic { executor, .. } => executor.run_until_parked(),
512            _ => panic!("this method can only be called on a deterministic executor"),
513        }
514    }
515
516    #[cfg(any(test, feature = "test-support"))]
517    pub fn parking_forbidden(&self) -> bool {
518        match self {
519            Self::Deterministic { executor, .. } => executor.state.lock().forbid_parking,
520            _ => panic!("this method can only be called on a deterministic executor"),
521        }
522    }
523
524    #[cfg(any(test, feature = "test-support"))]
525    pub fn start_waiting(&self) {
526        match self {
527            Self::Deterministic { executor, .. } => executor.start_waiting(),
528            _ => panic!("this method can only be called on a deterministic executor"),
529        }
530    }
531
532    #[cfg(any(test, feature = "test-support"))]
533    pub fn finish_waiting(&self) {
534        match self {
535            Self::Deterministic { executor, .. } => executor.finish_waiting(),
536            _ => panic!("this method can only be called on a deterministic executor"),
537        }
538    }
539
540    #[cfg(any(test, feature = "test-support"))]
541    pub fn forbid_parking(&self) {
542        match self {
543            Self::Deterministic { executor, .. } => executor.forbid_parking(),
544            _ => panic!("this method can only be called on a deterministic executor"),
545        }
546    }
547
548    #[cfg(any(test, feature = "test-support"))]
549    pub fn advance_clock(&self, duration: Duration) {
550        match self {
551            Self::Deterministic { executor, .. } => executor.advance_clock(duration),
552            _ => panic!("this method can only be called on a deterministic executor"),
553        }
554    }
555
556    #[cfg(any(test, feature = "test-support"))]
557    pub fn set_block_on_ticks(&self, range: std::ops::RangeInclusive<usize>) {
558        match self {
559            Self::Deterministic { executor, .. } => executor.state.lock().block_on_ticks = range,
560            _ => panic!("this method can only be called on a deterministic executor"),
561        }
562    }
563}
564
565impl Background {
566    pub fn new() -> Self {
567        let executor = Arc::new(Executor::new());
568        let stop = channel::unbounded::<()>();
569
570        for i in 0..2 * num_cpus::get() {
571            let executor = executor.clone();
572            let stop = stop.1.clone();
573            thread::Builder::new()
574                .name(format!("background-executor-{}", i))
575                .spawn(move || smol::block_on(executor.run(stop.recv())))
576                .unwrap();
577        }
578
579        Self::Production {
580            executor,
581            _stop: stop.0,
582        }
583    }
584
    /// Returns the CPU count reported by the `num_cpus` crate.
    pub fn num_cpus(&self) -> usize {
        num_cpus::get()
    }
588
589    pub fn spawn<T, F>(&self, future: F) -> Task<T>
590    where
591        T: 'static + Send,
592        F: Send + Future<Output = T> + 'static,
593    {
594        let future = any_future(future);
595        let any_task = match self {
596            Self::Production { executor, .. } => executor.spawn(future),
597            #[cfg(any(test, feature = "test-support"))]
598            Self::Deterministic { executor } => executor.spawn(future),
599        };
600        Task::send(any_task)
601    }
602
603    pub fn block<F, T>(&self, future: F) -> T
604    where
605        F: Future<Output = T>,
606    {
607        smol::pin!(future);
608        match self {
609            Self::Production { .. } => smol::block_on(&mut future),
610            #[cfg(any(test, feature = "test-support"))]
611            Self::Deterministic { executor, .. } => {
612                executor.block(&mut future, usize::MAX).unwrap()
613            }
614        }
615    }
616
617    pub fn block_with_timeout<F, T>(
618        &self,
619        timeout: Duration,
620        future: F,
621    ) -> Result<T, impl Future<Output = T>>
622    where
623        T: 'static,
624        F: 'static + Unpin + Future<Output = T>,
625    {
626        let mut future = any_local_future(future);
627        if !timeout.is_zero() {
628            let output = match self {
629                Self::Production { .. } => smol::block_on(util::timeout(timeout, &mut future)).ok(),
630                #[cfg(any(test, feature = "test-support"))]
631                Self::Deterministic { executor, .. } => {
632                    use rand::prelude::*;
633                    let max_ticks = {
634                        let mut state = executor.state.lock();
635                        let range = state.block_on_ticks.clone();
636                        state.rng.gen_range(range)
637                    };
638                    executor.block(&mut future, max_ticks)
639                }
640            };
641            if let Some(output) = output {
642                return Ok(*output.downcast().unwrap());
643            }
644        }
645        Err(async { *future.await.downcast().unwrap() })
646    }
647
648    pub async fn scoped<'scope, F>(self: &Arc<Self>, scheduler: F)
649    where
650        F: FnOnce(&mut Scope<'scope>),
651    {
652        let mut scope = Scope::new(self.clone());
653        (scheduler)(&mut scope);
654        let spawned = mem::take(&mut scope.futures)
655            .into_iter()
656            .map(|f| self.spawn(f))
657            .collect::<Vec<_>>();
658        for task in spawned {
659            task.await;
660        }
661    }
662
663    pub fn timer(&self, duration: Duration) -> Timer {
664        match self {
665            Background::Production { .. } => Timer::Production(smol::Timer::after(duration)),
666            #[cfg(any(test, feature = "test-support"))]
667            Background::Deterministic { executor } => executor.timer(duration),
668        }
669    }
670
671    pub fn now(&self) -> std::time::Instant {
672        match self {
673            Background::Production { .. } => std::time::Instant::now(),
674            #[cfg(any(test, feature = "test-support"))]
675            Background::Deterministic { executor } => executor.now(),
676        }
677    }
678
679    #[cfg(any(test, feature = "test-support"))]
680    pub async fn simulate_random_delay(&self) {
681        match self {
682            Self::Deterministic { executor, .. } => {
683                executor.simulate_random_delay().await;
684            }
685            _ => {
686                panic!("this method can only be called on a deterministic executor")
687            }
688        }
689    }
690}
691
/// `Default` delegates to `Background::new`.
impl Default for Background {
    fn default() -> Self {
        Self::new()
    }
}
697
/// Collects futures that may borrow data for `'a` so they can be spawned together.
pub struct Scope<'a> {
    // Used in `Drop` to block until every spawned future has finished.
    executor: Arc<Background>,
    // Futures registered via `spawn`, with their lifetime erased to `'static`.
    futures: Vec<Pin<Box<dyn Future<Output = ()> + Send + 'static>>>,
    // Original sender: a clone is moved into each registered future, and `Drop`
    // takes this one out.
    tx: Option<mpsc::Sender<()>>,
    // `Drop` blocks on this receiver's `next()`.
    rx: mpsc::Receiver<()>,
    _phantom: PhantomData<&'a ()>,
}
705
706impl<'a> Scope<'a> {
707    fn new(executor: Arc<Background>) -> Self {
708        let (tx, rx) = mpsc::channel(1);
709        Self {
710            executor,
711            tx: Some(tx),
712            rx,
713            futures: Default::default(),
714            _phantom: PhantomData,
715        }
716    }
717
718    pub fn spawn<F>(&mut self, f: F)
719    where
720        F: Future<Output = ()> + Send + 'a,
721    {
722        let tx = self.tx.clone().unwrap();
723
724        // Safety: The 'a lifetime is guaranteed to outlive any of these futures because
725        // dropping this `Scope` blocks until all of the futures have resolved.
726        let f = unsafe {
727            mem::transmute::<
728                Pin<Box<dyn Future<Output = ()> + Send + 'a>>,
729                Pin<Box<dyn Future<Output = ()> + Send + 'static>>,
730            >(Box::pin(async move {
731                f.await;
732                drop(tx);
733            }))
734        };
735        self.futures.push(f);
736    }
737}
738
/// Dropping a `Scope` blocks the current thread until the scope's channel reports closure.
impl<'a> Drop for Scope<'a> {
    fn drop(&mut self) {
        // Drop the scope's own sender first so that only the clones held by spawned
        // futures keep the channel open.
        self.tx.take().unwrap();

        // Wait until the channel is closed, which means that all of the spawned
        // futures have resolved.
        self.executor.block(self.rx.next());
    }
}
748
749impl<T> Task<T> {
750    pub fn ready(value: T) -> Self {
751        Self::Ready(Some(value))
752    }
753
754    fn local(any_task: AnyLocalTask) -> Self {
755        Self::Local {
756            any_task,
757            result_type: PhantomData,
758        }
759    }
760
761    pub fn detach(self) {
762        match self {
763            Task::Ready(_) => {}
764            Task::Local { any_task, .. } => any_task.detach(),
765            Task::Send { any_task, .. } => any_task.detach(),
766        }
767    }
768}
769
770impl<T: 'static, E: 'static + Display> Task<Result<T, E>> {
771    pub fn detach_and_log_err(self, cx: &mut MutableAppContext) {
772        cx.spawn(|_| async move {
773            if let Err(err) = self.await {
774                log::error!("{}", err);
775            }
776        })
777        .detach();
778    }
779}
780
impl<T: Send> Task<T> {
    /// Wraps a type-erased `Send` task handle.
    fn send(any_task: AnyTask) -> Self {
        Self::Send {
            any_task,
            result_type: PhantomData,
        }
    }
}
789
/// Debug output is delegated to the ready value's `Option` or to the inner task handle.
impl<T: fmt::Debug> fmt::Debug for Task<T> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            Task::Ready(value) => value.fmt(f),
            Task::Local { any_task, .. } => any_task.fmt(f),
            Task::Send { any_task, .. } => any_task.fmt(f),
        }
    }
}
799
/// Polling a `Task` yields the stored value, or the inner task's output downcast to `T`.
///
/// # Panics
///
/// Panics if a `Ready` task is polled again after it has yielded its value, or if the
/// inner task's output is not a `T`.
impl<T: 'static> Future for Task<T> {
    type Output = T;

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        // NOTE(review): `get_unchecked_mut` is needed because `Task<T>` is not `Unpin`
        // for every `T`. The arms below only take the `Option` payload or poll the inner
        // handle through `&mut`; this presumably relies on `T` never being treated as
        // structurally pinned — confirm.
        match unsafe { self.get_unchecked_mut() } {
            Task::Ready(value) => Poll::Ready(value.take().unwrap()),
            Task::Local { any_task, .. } => {
                any_task.poll(cx).map(|value| *value.downcast().unwrap())
            }
            Task::Send { any_task, .. } => {
                any_task.poll(cx).map(|value| *value.downcast().unwrap())
            }
        }
    }
}
815
816fn any_future<T, F>(future: F) -> AnyFuture
817where
818    T: 'static + Send,
819    F: Future<Output = T> + Send + 'static,
820{
821    async { Box::new(future.await) as Box<dyn Any + Send> }.boxed()
822}
823
824fn any_local_future<T, F>(future: F) -> AnyLocalFuture
825where
826    T: 'static,
827    F: Future<Output = T> + 'static,
828{
829    async { Box::new(future.await) as Box<dyn Any> }.boxed_local()
830}