executor.rs

  1use anyhow::{anyhow, Result};
  2use async_task::Runnable;
  3use futures::channel::mpsc;
  4use smol::{channel, prelude::*, Executor};
  5use std::{
  6    any::Any,
  7    fmt::{self, Display},
  8    marker::PhantomData,
  9    mem,
 10    pin::Pin,
 11    rc::Rc,
 12    sync::Arc,
 13    task::{Context, Poll},
 14    thread,
 15    time::Duration,
 16};
 17
 18use crate::{
 19    platform::{self, Dispatcher},
 20    util, MutableAppContext,
 21};
 22
/// Executor for futures that are not required to be `Send` and are run on
/// behalf of the main thread.
pub enum Foreground {
    /// Production variant: runnables are handed to the platform dispatcher's
    /// `run_on_main_thread`.
    Platform {
        dispatcher: Arc<dyn platform::Dispatcher>,
        // `Rc` is neither `Send` nor `Sync`, so this marker prevents the enum
        // from being sent to or shared with other threads.
        _not_send_or_sync: PhantomData<Rc<()>>,
    },
    /// Test-only variant backed by a seeded `Deterministic` executor.
    #[cfg(any(test, feature = "test-support"))]
    Deterministic {
        // Key of this foreground's queue in
        // `DeterministicState::scheduled_from_foreground`.
        cx_id: usize,
        executor: Arc<Deterministic>,
    },
}
 34
/// Executor for `Send` futures.
pub enum Background {
    /// Test-only variant backed by a seeded `Deterministic` executor.
    #[cfg(any(test, feature = "test-support"))]
    Deterministic { executor: Arc<Deterministic> },
    /// Production variant: a `smol::Executor` driven by the worker threads
    /// started in `Background::new`.
    Production {
        executor: Arc<smol::Executor<'static>>,
        // Nothing is ever sent on this channel in this file. Each worker runs
        // the executor until `recv()` on the paired receiver completes —
        // presumably when this sender is dropped; confirm against smol's
        // channel semantics.
        _stop: channel::Sender<()>,
    },
}
 43
// Type-erased future/task aliases. Outputs are boxed as `dyn Any` so the
// executors can be non-generic; `Task<T>` downcasts them back to `T`.

/// Boxed, non-`Send` future with a type-erased output.
type AnyLocalFuture = Pin<Box<dyn 'static + Future<Output = Box<dyn Any + 'static>>>>;
/// Boxed `Send` future with a type-erased `Send` output.
type AnyFuture = Pin<Box<dyn 'static + Send + Future<Output = Box<dyn Any + Send + 'static>>>>;
/// Task handle produced by spawning an `AnyFuture`.
type AnyTask = async_task::Task<Box<dyn Any + Send + 'static>>;
/// Task handle produced by spawning an `AnyLocalFuture`.
type AnyLocalTask = async_task::Task<Box<dyn Any + 'static>>;
 48
/// A value of type `T` that is either already available or is being produced
/// by a spawned future. Awaiting the task yields the `T`.
#[must_use]
pub enum Task<T> {
    /// An already-computed value. `poll` takes it out, leaving `None`.
    Ready(Option<T>),
    /// Wraps a non-`Send` task handle whose output is type-erased.
    Local {
        any_task: AnyLocalTask,
        // Remembers the concrete output type the erased value is downcast to.
        result_type: PhantomData<T>,
    },
    /// Wraps a `Send` task handle whose output is type-erased.
    Send {
        any_task: AnyTask,
        // Remembers the concrete output type the erased value is downcast to.
        result_type: PhantomData<T>,
    },
}
 61
// SAFETY: NOTE(review): the `Local` variant wraps an `AnyLocalTask` whose
// erased output (`Box<dyn Any>`) is not `Send`, so the compiler would not
// derive this impl. It asserts that moving the handle across threads is
// nonetheless sound whenever `T: Send` — confirm against async-task's
// guarantees for tasks created with `spawn_local`.
unsafe impl<T: Send> Send for Task<T> {}
 63
/// Mutable state of a `Deterministic` executor, shared behind a mutex with the
/// schedule callbacks of its tasks and with the timers it hands out.
#[cfg(any(test, feature = "test-support"))]
struct DeterministicState {
    // Random source for scheduling decisions; `forbid_parking` re-seeds it
    // from `seed`.
    rng: rand::prelude::StdRng,
    seed: u64,
    // Runnables scheduled by foreground tasks, keyed by `cx_id`. Each
    // per-context queue is appended to at the back and consumed from the front.
    scheduled_from_foreground: collections::HashMap<usize, Vec<ForegroundRunnable>>,
    // Runnables scheduled by background tasks; consumers remove a random index.
    scheduled_from_background: Vec<BackgroundRunnable>,
    // When true, `will_park` panics instead of allowing the executor to park.
    forbid_parking: bool,
    // Range from which `Background::block_with_timeout` draws its tick budget.
    block_on_ticks: std::ops::RangeInclusive<usize>,
    // The executor's current time: initialised from the real clock at
    // construction and afterwards only changed by `advance_clock`.
    now: std::time::Instant,
    next_timer_id: usize,
    // `(timer id, wakeup instant, barrier sender)` entries, inserted at the
    // position found by binary search on the wakeup instant. `advance_clock`
    // fires a timer by dropping its sender.
    pending_timers: Vec<(usize, std::time::Instant, postage::barrier::Sender)>,
    // Set by `start_waiting`, cleared by `finish_waiting`, and included in the
    // panic message produced by `will_park`.
    waiting_backtrace: Option<backtrace::Backtrace>,
    next_runnable_id: usize,
    // Ids of the runnables that were run, in execution order.
    poll_history: Vec<usize>,
    // Backtrace captured at the point each runnable id was allocated.
    runnable_backtraces: collections::HashMap<usize, backtrace::Backtrace>,
}
 80
/// A runnable queued in `DeterministicState::scheduled_from_foreground`.
#[cfg(any(test, feature = "test-support"))]
struct ForegroundRunnable {
    // Id recorded in `poll_history` when this runnable is run.
    id: usize,
    runnable: Runnable,
    // When true, `run_internal` polls the main task right after running this
    // runnable.
    main: bool,
}
 87
/// A runnable queued in `DeterministicState::scheduled_from_background`.
#[cfg(any(test, feature = "test-support"))]
struct BackgroundRunnable {
    // Id recorded in `poll_history` when this runnable is run.
    id: usize,
    runnable: Runnable,
}
 93
/// Test executor whose scheduling choices are drawn from a seeded random
/// number generator.
#[cfg(any(test, feature = "test-support"))]
pub struct Deterministic {
    state: Arc<parking_lot::Mutex<DeterministicState>>,
    // Parks the driving thread when there is nothing to run. Schedule
    // callbacks and wakers hold an `Unparker` obtained from it.
    parker: parking_lot::Mutex<parking::Parker>,
}
 99
/// Future that completes after a duration, measured either on the real clock
/// or on a `Deterministic` executor's clock.
pub enum Timer {
    /// Wraps a `smol::Timer`.
    Production(smol::Timer),
    /// Completes when the deterministic executor fires or discards the timer.
    #[cfg(any(test, feature = "test-support"))]
    Deterministic(DeterministicTimer),
}
105
/// Timer registered in a `Deterministic` executor's `pending_timers`.
#[cfg(any(test, feature = "test-support"))]
pub struct DeterministicTimer {
    // Receiving half of the barrier whose sender is stored in
    // `pending_timers`; `Timer::poll` treats both a received value and a
    // closed channel as completion.
    rx: postage::barrier::Receiver,
    // Identifies this timer's entry so `Drop for Timer` can remove it.
    id: usize,
    state: Arc<parking_lot::Mutex<DeterministicState>>,
}
112
113#[cfg(any(test, feature = "test-support"))]
114impl Deterministic {
115    pub fn new(seed: u64) -> Arc<Self> {
116        use rand::prelude::*;
117
118        Arc::new(Self {
119            state: Arc::new(parking_lot::Mutex::new(DeterministicState {
120                rng: StdRng::seed_from_u64(seed),
121                seed,
122                scheduled_from_foreground: Default::default(),
123                scheduled_from_background: Default::default(),
124                forbid_parking: false,
125                block_on_ticks: 0..=1000,
126                now: std::time::Instant::now(),
127                next_timer_id: Default::default(),
128                pending_timers: Default::default(),
129                waiting_backtrace: None,
130                next_runnable_id: 0,
131                poll_history: Default::default(),
132                runnable_backtraces: Default::default(),
133            })),
134            parker: Default::default(),
135        })
136    }
137
138    pub fn runnable_history(&self) -> Vec<usize> {
139        self.state.lock().poll_history.clone()
140    }
141
142    pub fn runnable_backtrace(&self, runnable_id: usize) -> backtrace::Backtrace {
143        let mut backtrace = self.state.lock().runnable_backtraces[&runnable_id].clone();
144        backtrace.resolve();
145        backtrace
146    }
147
148    pub fn build_background(self: &Arc<Self>) -> Arc<Background> {
149        Arc::new(Background::Deterministic {
150            executor: self.clone(),
151        })
152    }
153
154    pub fn build_foreground(self: &Arc<Self>, id: usize) -> Rc<Foreground> {
155        Rc::new(Foreground::Deterministic {
156            cx_id: id,
157            executor: self.clone(),
158        })
159    }
160
161    fn spawn_from_foreground(
162        &self,
163        cx_id: usize,
164        future: AnyLocalFuture,
165        main: bool,
166    ) -> AnyLocalTask {
167        let state = self.state.clone();
168        let id;
169        {
170            let mut state = state.lock();
171            id = util::post_inc(&mut state.next_runnable_id);
172            state
173                .runnable_backtraces
174                .insert(id, backtrace::Backtrace::new_unresolved());
175        }
176
177        let unparker = self.parker.lock().unparker();
178        let (runnable, task) = async_task::spawn_local(future, move |runnable| {
179            let mut state = state.lock();
180            state
181                .scheduled_from_foreground
182                .entry(cx_id)
183                .or_default()
184                .push(ForegroundRunnable { id, runnable, main });
185            unparker.unpark();
186        });
187        runnable.schedule();
188        task
189    }
190
191    fn spawn(&self, future: AnyFuture) -> AnyTask {
192        let state = self.state.clone();
193        let id;
194        {
195            let mut state = state.lock();
196            id = util::post_inc(&mut state.next_runnable_id);
197            state
198                .runnable_backtraces
199                .insert(id, backtrace::Backtrace::new_unresolved());
200        }
201
202        let unparker = self.parker.lock().unparker();
203        let (runnable, task) = async_task::spawn(future, move |runnable| {
204            let mut state = state.lock();
205            state
206                .scheduled_from_background
207                .push(BackgroundRunnable { id, runnable });
208            unparker.unpark();
209        });
210        runnable.schedule();
211        task
212    }
213
214    fn run<'a>(
215        &self,
216        cx_id: usize,
217        main_future: Pin<Box<dyn 'a + Future<Output = Box<dyn Any>>>>,
218    ) -> Box<dyn Any> {
219        use std::sync::atomic::{AtomicBool, Ordering::SeqCst};
220
221        let woken = Arc::new(AtomicBool::new(false));
222
223        let state = self.state.clone();
224        let id;
225        {
226            let mut state = state.lock();
227            id = util::post_inc(&mut state.next_runnable_id);
228            state
229                .runnable_backtraces
230                .insert(id, backtrace::Backtrace::new());
231        }
232
233        let unparker = self.parker.lock().unparker();
234        let (runnable, mut main_task) = unsafe {
235            async_task::spawn_unchecked(main_future, move |runnable| {
236                let state = &mut *state.lock();
237                state
238                    .scheduled_from_foreground
239                    .entry(cx_id)
240                    .or_default()
241                    .push(ForegroundRunnable {
242                        id: util::post_inc(&mut state.next_runnable_id),
243                        runnable,
244                        main: true,
245                    });
246                unparker.unpark();
247            })
248        };
249        runnable.schedule();
250
251        loop {
252            if let Some(result) = self.run_internal(woken.clone(), Some(&mut main_task)) {
253                return result;
254            }
255
256            if !woken.load(SeqCst) {
257                self.state.lock().will_park();
258            }
259
260            woken.store(false, SeqCst);
261            self.parker.lock().park();
262        }
263    }
264
265    pub fn run_until_parked(&self) {
266        use std::sync::atomic::AtomicBool;
267        let woken = Arc::new(AtomicBool::new(false));
268        self.run_internal(woken, None);
269    }
270
271    fn run_internal(
272        &self,
273        woken: Arc<std::sync::atomic::AtomicBool>,
274        mut main_task: Option<&mut AnyLocalTask>,
275    ) -> Option<Box<dyn Any>> {
276        use rand::prelude::*;
277        use std::sync::atomic::Ordering::SeqCst;
278
279        let unparker = self.parker.lock().unparker();
280        let waker = waker_fn::waker_fn(move || {
281            woken.store(true, SeqCst);
282            unparker.unpark();
283        });
284
285        let mut cx = Context::from_waker(&waker);
286        loop {
287            let mut state = self.state.lock();
288
289            if state.scheduled_from_foreground.is_empty()
290                && state.scheduled_from_background.is_empty()
291            {
292                if let Some(main_task) = main_task {
293                    if let Poll::Ready(result) = main_task.poll(&mut cx) {
294                        return Some(result);
295                    }
296                }
297
298                return None;
299            }
300
301            if !state.scheduled_from_background.is_empty() && state.rng.gen() {
302                let background_len = state.scheduled_from_background.len();
303                let ix = state.rng.gen_range(0..background_len);
304                let background_runnable = state.scheduled_from_background.remove(ix);
305                state.poll_history.push(background_runnable.id);
306                drop(state);
307                background_runnable.runnable.run();
308            } else if !state.scheduled_from_foreground.is_empty() {
309                let available_cx_ids = state
310                    .scheduled_from_foreground
311                    .keys()
312                    .copied()
313                    .collect::<Vec<_>>();
314                let cx_id_to_run = *available_cx_ids.iter().choose(&mut state.rng).unwrap();
315                let scheduled_from_cx = state
316                    .scheduled_from_foreground
317                    .get_mut(&cx_id_to_run)
318                    .unwrap();
319                let foreground_runnable = scheduled_from_cx.remove(0);
320                if scheduled_from_cx.is_empty() {
321                    state.scheduled_from_foreground.remove(&cx_id_to_run);
322                }
323                state.poll_history.push(foreground_runnable.id);
324
325                drop(state);
326
327                foreground_runnable.runnable.run();
328                if let Some(main_task) = main_task.as_mut() {
329                    if foreground_runnable.main {
330                        if let Poll::Ready(result) = main_task.poll(&mut cx) {
331                            return Some(result);
332                        }
333                    }
334                }
335            }
336        }
337    }
338
339    fn block<F, T>(&self, future: &mut F, max_ticks: usize) -> Option<T>
340    where
341        F: Unpin + Future<Output = T>,
342    {
343        use rand::prelude::*;
344
345        let unparker = self.parker.lock().unparker();
346        let waker = waker_fn::waker_fn(move || {
347            unparker.unpark();
348        });
349
350        let mut cx = Context::from_waker(&waker);
351        for _ in 0..max_ticks {
352            let mut state = self.state.lock();
353            let runnable_count = state.scheduled_from_background.len();
354            let ix = state.rng.gen_range(0..=runnable_count);
355            if ix < state.scheduled_from_background.len() {
356                let background_runnable = state.scheduled_from_background.remove(ix);
357                state.poll_history.push(background_runnable.id);
358                drop(state);
359                background_runnable.runnable.run();
360            } else {
361                drop(state);
362                if let Poll::Ready(result) = future.poll(&mut cx) {
363                    return Some(result);
364                }
365                let mut state = self.state.lock();
366                if state.scheduled_from_background.is_empty() {
367                    state.will_park();
368                    drop(state);
369                    self.parker.lock().park();
370                }
371
372                continue;
373            }
374        }
375
376        None
377    }
378
379    pub fn timer(&self, duration: Duration) -> Timer {
380        let (tx, rx) = postage::barrier::channel();
381        let mut state = self.state.lock();
382        let wakeup_at = state.now + duration;
383        let id = util::post_inc(&mut state.next_timer_id);
384        match state
385            .pending_timers
386            .binary_search_by_key(&wakeup_at, |e| e.1)
387        {
388            Ok(ix) | Err(ix) => state.pending_timers.insert(ix, (id, wakeup_at, tx)),
389        }
390        let state = self.state.clone();
391        Timer::Deterministic(DeterministicTimer { rx, id, state })
392    }
393
394    pub fn now(&self) -> std::time::Instant {
395        let state = self.state.lock();
396        state.now
397    }
398
399    pub fn advance_clock(&self, duration: Duration) {
400        let new_now = self.state.lock().now + duration;
401        loop {
402            self.run_until_parked();
403            let mut state = self.state.lock();
404
405            if let Some((_, wakeup_time, _)) = state.pending_timers.first() {
406                let wakeup_time = *wakeup_time;
407                if wakeup_time <= new_now {
408                    let timer_count = state
409                        .pending_timers
410                        .iter()
411                        .take_while(|(_, t, _)| *t == wakeup_time)
412                        .count();
413                    state.now = wakeup_time;
414                    let timers_to_wake = state
415                        .pending_timers
416                        .drain(0..timer_count)
417                        .collect::<Vec<_>>();
418                    drop(state);
419                    drop(timers_to_wake);
420                    continue;
421                }
422            }
423
424            break;
425        }
426
427        self.state.lock().now = new_now;
428    }
429
430    pub fn start_waiting(&self) {
431        self.state.lock().waiting_backtrace = Some(backtrace::Backtrace::new_unresolved());
432    }
433
434    pub fn finish_waiting(&self) {
435        self.state.lock().waiting_backtrace.take();
436    }
437
438    pub fn forbid_parking(&self) {
439        use rand::prelude::*;
440
441        let mut state = self.state.lock();
442        state.forbid_parking = true;
443        state.rng = StdRng::seed_from_u64(state.seed);
444    }
445
446    pub async fn simulate_random_delay(&self) {
447        use rand::prelude::*;
448        use smol::future::yield_now;
449        if self.state.lock().rng.gen_bool(0.2) {
450            let yields = self.state.lock().rng.gen_range(1..=10);
451            for _ in 0..yields {
452                yield_now().await;
453            }
454        }
455    }
456}
457
458impl Drop for Timer {
459    fn drop(&mut self) {
460        #[cfg(any(test, feature = "test-support"))]
461        if let Timer::Deterministic(DeterministicTimer { state, id, .. }) = self {
462            state
463                .lock()
464                .pending_timers
465                .retain(|(timer_id, _, _)| timer_id != id)
466        }
467    }
468}
469
470impl Future for Timer {
471    type Output = ();
472
473    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
474        match &mut *self {
475            #[cfg(any(test, feature = "test-support"))]
476            Self::Deterministic(DeterministicTimer { rx, .. }) => {
477                use postage::stream::{PollRecv, Stream as _};
478                smol::pin!(rx);
479                match rx.poll_recv(&mut postage::Context::from_waker(cx.waker())) {
480                    PollRecv::Ready(()) | PollRecv::Closed => Poll::Ready(()),
481                    PollRecv::Pending => Poll::Pending,
482                }
483            }
484            Self::Production(timer) => {
485                smol::pin!(timer);
486                match timer.poll(cx) {
487                    Poll::Ready(_) => Poll::Ready(()),
488                    Poll::Pending => Poll::Pending,
489                }
490            }
491        }
492    }
493}
494
#[cfg(any(test, feature = "test-support"))]
impl DeterministicState {
    /// Called right before the executor thread parks.
    ///
    /// # Panics
    /// Panics when parking has been forbidden. If a waiting backtrace was
    /// recorded, it is resolved and appended to the panic message.
    fn will_park(&mut self) {
        if !self.forbid_parking {
            return;
        }

        let backtrace_message = match self.waiting_backtrace.as_mut() {
            Some(backtrace) => {
                backtrace.resolve();
                format!(
                    "\nbacktrace of waiting future:\n{:?}",
                    util::CwdBacktrace(backtrace)
                )
            }
            None => String::new(),
        };

        panic!(
            "deterministic executor parked after a call to forbid_parking{}",
            backtrace_message
        );
    }
}
516
517impl Foreground {
518    pub fn platform(dispatcher: Arc<dyn platform::Dispatcher>) -> Result<Self> {
519        if dispatcher.is_main_thread() {
520            Ok(Self::Platform {
521                dispatcher,
522                _not_send_or_sync: PhantomData,
523            })
524        } else {
525            Err(anyhow!("must be constructed on main thread"))
526        }
527    }
528
529    pub fn spawn<T: 'static>(&self, future: impl Future<Output = T> + 'static) -> Task<T> {
530        let future = any_local_future(future);
531        let any_task = match self {
532            #[cfg(any(test, feature = "test-support"))]
533            Self::Deterministic { cx_id, executor } => {
534                executor.spawn_from_foreground(*cx_id, future, false)
535            }
536            Self::Platform { dispatcher, .. } => {
537                fn spawn_inner(
538                    future: AnyLocalFuture,
539                    dispatcher: &Arc<dyn Dispatcher>,
540                ) -> AnyLocalTask {
541                    let dispatcher = dispatcher.clone();
542                    let schedule =
543                        move |runnable: Runnable| dispatcher.run_on_main_thread(runnable);
544                    let (runnable, task) = async_task::spawn_local(future, schedule);
545                    runnable.schedule();
546                    task
547                }
548                spawn_inner(future, dispatcher)
549            }
550        };
551        Task::local(any_task)
552    }
553
554    #[cfg(any(test, feature = "test-support"))]
555    pub fn run<T: 'static>(&self, future: impl Future<Output = T>) -> T {
556        let future = async move { Box::new(future.await) as Box<dyn Any> }.boxed_local();
557        let result = match self {
558            Self::Deterministic { cx_id, executor } => executor.run(*cx_id, future),
559            Self::Platform { .. } => panic!("you can't call run on a platform foreground executor"),
560        };
561        *result.downcast().unwrap()
562    }
563
564    #[cfg(any(test, feature = "test-support"))]
565    pub fn run_until_parked(&self) {
566        match self {
567            Self::Deterministic { executor, .. } => executor.run_until_parked(),
568            _ => panic!("this method can only be called on a deterministic executor"),
569        }
570    }
571
572    #[cfg(any(test, feature = "test-support"))]
573    pub fn parking_forbidden(&self) -> bool {
574        match self {
575            Self::Deterministic { executor, .. } => executor.state.lock().forbid_parking,
576            _ => panic!("this method can only be called on a deterministic executor"),
577        }
578    }
579
580    #[cfg(any(test, feature = "test-support"))]
581    pub fn start_waiting(&self) {
582        match self {
583            Self::Deterministic { executor, .. } => executor.start_waiting(),
584            _ => panic!("this method can only be called on a deterministic executor"),
585        }
586    }
587
588    #[cfg(any(test, feature = "test-support"))]
589    pub fn finish_waiting(&self) {
590        match self {
591            Self::Deterministic { executor, .. } => executor.finish_waiting(),
592            _ => panic!("this method can only be called on a deterministic executor"),
593        }
594    }
595
596    #[cfg(any(test, feature = "test-support"))]
597    pub fn forbid_parking(&self) {
598        match self {
599            Self::Deterministic { executor, .. } => executor.forbid_parking(),
600            _ => panic!("this method can only be called on a deterministic executor"),
601        }
602    }
603
604    #[cfg(any(test, feature = "test-support"))]
605    pub fn advance_clock(&self, duration: Duration) {
606        match self {
607            Self::Deterministic { executor, .. } => executor.advance_clock(duration),
608            _ => panic!("this method can only be called on a deterministic executor"),
609        }
610    }
611
612    #[cfg(any(test, feature = "test-support"))]
613    pub fn set_block_on_ticks(&self, range: std::ops::RangeInclusive<usize>) {
614        match self {
615            Self::Deterministic { executor, .. } => executor.state.lock().block_on_ticks = range,
616            _ => panic!("this method can only be called on a deterministic executor"),
617        }
618    }
619}
620
621impl Background {
622    pub fn new() -> Self {
623        let executor = Arc::new(Executor::new());
624        let stop = channel::unbounded::<()>();
625
626        for i in 0..2 * num_cpus::get() {
627            let executor = executor.clone();
628            let stop = stop.1.clone();
629            thread::Builder::new()
630                .name(format!("background-executor-{}", i))
631                .spawn(move || smol::block_on(executor.run(stop.recv())))
632                .unwrap();
633        }
634
635        Self::Production {
636            executor,
637            _stop: stop.0,
638        }
639    }
640
641    pub fn num_cpus(&self) -> usize {
642        num_cpus::get()
643    }
644
645    pub fn spawn<T, F>(&self, future: F) -> Task<T>
646    where
647        T: 'static + Send,
648        F: Send + Future<Output = T> + 'static,
649    {
650        let future = any_future(future);
651        let any_task = match self {
652            Self::Production { executor, .. } => executor.spawn(future),
653            #[cfg(any(test, feature = "test-support"))]
654            Self::Deterministic { executor } => executor.spawn(future),
655        };
656        Task::send(any_task)
657    }
658
659    pub fn block<F, T>(&self, future: F) -> T
660    where
661        F: Future<Output = T>,
662    {
663        smol::pin!(future);
664        match self {
665            Self::Production { .. } => smol::block_on(&mut future),
666            #[cfg(any(test, feature = "test-support"))]
667            Self::Deterministic { executor, .. } => {
668                executor.block(&mut future, usize::MAX).unwrap()
669            }
670        }
671    }
672
673    pub fn block_with_timeout<F, T>(
674        &self,
675        timeout: Duration,
676        future: F,
677    ) -> Result<T, impl Future<Output = T>>
678    where
679        T: 'static,
680        F: 'static + Unpin + Future<Output = T>,
681    {
682        let mut future = any_local_future(future);
683        if !timeout.is_zero() {
684            let output = match self {
685                Self::Production { .. } => smol::block_on(util::timeout(timeout, &mut future)).ok(),
686                #[cfg(any(test, feature = "test-support"))]
687                Self::Deterministic { executor, .. } => {
688                    use rand::prelude::*;
689                    let max_ticks = {
690                        let mut state = executor.state.lock();
691                        let range = state.block_on_ticks.clone();
692                        state.rng.gen_range(range)
693                    };
694                    executor.block(&mut future, max_ticks)
695                }
696            };
697            if let Some(output) = output {
698                return Ok(*output.downcast().unwrap());
699            }
700        }
701        Err(async { *future.await.downcast().unwrap() })
702    }
703
704    pub async fn scoped<'scope, F>(self: &Arc<Self>, scheduler: F)
705    where
706        F: FnOnce(&mut Scope<'scope>),
707    {
708        let mut scope = Scope::new(self.clone());
709        (scheduler)(&mut scope);
710        let spawned = mem::take(&mut scope.futures)
711            .into_iter()
712            .map(|f| self.spawn(f))
713            .collect::<Vec<_>>();
714        for task in spawned {
715            task.await;
716        }
717    }
718
719    pub fn timer(&self, duration: Duration) -> Timer {
720        match self {
721            Background::Production { .. } => Timer::Production(smol::Timer::after(duration)),
722            #[cfg(any(test, feature = "test-support"))]
723            Background::Deterministic { executor } => executor.timer(duration),
724        }
725    }
726
727    pub fn now(&self) -> std::time::Instant {
728        match self {
729            Background::Production { .. } => std::time::Instant::now(),
730            #[cfg(any(test, feature = "test-support"))]
731            Background::Deterministic { executor } => executor.now(),
732        }
733    }
734
735    #[cfg(any(test, feature = "test-support"))]
736    pub async fn simulate_random_delay(&self) {
737        match self {
738            Self::Deterministic { executor, .. } => {
739                executor.simulate_random_delay().await;
740            }
741            _ => {
742                panic!("this method can only be called on a deterministic executor")
743            }
744        }
745    }
746}
747
748impl Default for Background {
749    fn default() -> Self {
750        Self::new()
751    }
752}
753
/// Collects futures that may borrow data for `'a` so that
/// `Background::scoped` can spawn them on the background executor.
pub struct Scope<'a> {
    // Executor that `Drop` blocks on while waiting for the futures to finish.
    executor: Arc<Background>,
    // Futures registered through `spawn`, with their lifetime erased to
    // `'static`; `Background::scoped` takes them out and spawns them.
    futures: Vec<Pin<Box<dyn Future<Output = ()> + Send + 'static>>>,
    // Original sender. Every registered future holds a clone and drops it on
    // completion; `Drop` takes this one.
    tx: Option<mpsc::Sender<()>>,
    // Awaited in `Drop`. Nothing is ever sent, so it resolves only once every
    // sender is gone.
    rx: mpsc::Receiver<()>,
    _phantom: PhantomData<&'a ()>,
}
761
762impl<'a> Scope<'a> {
763    fn new(executor: Arc<Background>) -> Self {
764        let (tx, rx) = mpsc::channel(1);
765        Self {
766            executor,
767            tx: Some(tx),
768            rx,
769            futures: Default::default(),
770            _phantom: PhantomData,
771        }
772    }
773
774    pub fn spawn<F>(&mut self, f: F)
775    where
776        F: Future<Output = ()> + Send + 'a,
777    {
778        let tx = self.tx.clone().unwrap();
779
780        // Safety: The 'a lifetime is guaranteed to outlive any of these futures because
781        // dropping this `Scope` blocks until all of the futures have resolved.
782        let f = unsafe {
783            mem::transmute::<
784                Pin<Box<dyn Future<Output = ()> + Send + 'a>>,
785                Pin<Box<dyn Future<Output = ()> + Send + 'static>>,
786            >(Box::pin(async move {
787                f.await;
788                drop(tx);
789            }))
790        };
791        self.futures.push(f);
792    }
793}
794
795impl<'a> Drop for Scope<'a> {
796    fn drop(&mut self) {
797        self.tx.take().unwrap();
798
799        // Wait until the channel is closed, which means that all of the spawned
800        // futures have resolved.
801        self.executor.block(self.rx.next());
802    }
803}
804
805impl<T> Task<T> {
806    pub fn ready(value: T) -> Self {
807        Self::Ready(Some(value))
808    }
809
810    fn local(any_task: AnyLocalTask) -> Self {
811        Self::Local {
812            any_task,
813            result_type: PhantomData,
814        }
815    }
816
817    pub fn detach(self) {
818        match self {
819            Task::Ready(_) => {}
820            Task::Local { any_task, .. } => any_task.detach(),
821            Task::Send { any_task, .. } => any_task.detach(),
822        }
823    }
824}
825
826impl<T: 'static, E: 'static + Display> Task<Result<T, E>> {
827    pub fn detach_and_log_err(self, cx: &mut MutableAppContext) {
828        cx.spawn(|_| async move {
829            if let Err(err) = self.await {
830                log::error!("{}", err);
831            }
832        })
833        .detach();
834    }
835}
836
837impl<T: Send> Task<T> {
838    fn send(any_task: AnyTask) -> Self {
839        Self::Send {
840            any_task,
841            result_type: PhantomData,
842        }
843    }
844}
845
846impl<T: fmt::Debug> fmt::Debug for Task<T> {
847    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
848        match self {
849            Task::Ready(value) => value.fmt(f),
850            Task::Local { any_task, .. } => any_task.fmt(f),
851            Task::Send { any_task, .. } => any_task.fmt(f),
852        }
853    }
854}
855
856impl<T: 'static> Future for Task<T> {
857    type Output = T;
858
859    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
860        match unsafe { self.get_unchecked_mut() } {
861            Task::Ready(value) => Poll::Ready(value.take().unwrap()),
862            Task::Local { any_task, .. } => {
863                any_task.poll(cx).map(|value| *value.downcast().unwrap())
864            }
865            Task::Send { any_task, .. } => {
866                any_task.poll(cx).map(|value| *value.downcast().unwrap())
867            }
868        }
869    }
870}
871
872fn any_future<T, F>(future: F) -> AnyFuture
873where
874    T: 'static + Send,
875    F: Future<Output = T> + Send + 'static,
876{
877    async { Box::new(future.await) as Box<dyn Any + Send> }.boxed()
878}
879
880fn any_local_future<T, F>(future: F) -> AnyLocalFuture
881where
882    T: 'static,
883    F: Future<Output = T> + 'static,
884{
885    async { Box::new(future.await) as Box<dyn Any> }.boxed_local()
886}