executor.rs

  1use anyhow::{anyhow, Result};
  2use async_task::Runnable;
  3use futures::channel::mpsc;
  4use smol::{channel, prelude::*, Executor};
  5use std::{
  6    any::Any,
  7    fmt::{self, Display},
  8    marker::PhantomData,
  9    mem,
 10    pin::Pin,
 11    rc::Rc,
 12    sync::Arc,
 13    task::{Context, Poll},
 14    thread,
 15    time::Duration,
 16};
 17
 18use crate::{
 19    platform::{self, Dispatcher},
 20    util, MutableAppContext,
 21};
 22
 23pub enum Foreground {
 24    Platform {
 25        dispatcher: Arc<dyn platform::Dispatcher>,
 26        _not_send_or_sync: PhantomData<Rc<()>>,
 27    },
 28    #[cfg(any(test, feature = "test-support"))]
 29    Deterministic {
 30        cx_id: usize,
 31        executor: Arc<Deterministic>,
 32    },
 33}
 34
 35pub enum Background {
 36    #[cfg(any(test, feature = "test-support"))]
 37    Deterministic { executor: Arc<Deterministic> },
 38    Production {
 39        executor: Arc<smol::Executor<'static>>,
 40        _stop: channel::Sender<()>,
 41    },
 42}
 43
 44type AnyLocalFuture = Pin<Box<dyn 'static + Future<Output = Box<dyn Any + 'static>>>>;
 45type AnyFuture = Pin<Box<dyn 'static + Send + Future<Output = Box<dyn Any + Send + 'static>>>>;
 46type AnyTask = async_task::Task<Box<dyn Any + Send + 'static>>;
 47type AnyLocalTask = async_task::Task<Box<dyn Any + 'static>>;
 48
 49#[must_use]
 50pub enum Task<T> {
 51    Ready(Option<T>),
 52    Local {
 53        any_task: AnyLocalTask,
 54        result_type: PhantomData<T>,
 55    },
 56    Send {
 57        any_task: AnyTask,
 58        result_type: PhantomData<T>,
 59    },
 60}
 61
 62unsafe impl<T: Send> Send for Task<T> {}
 63
 64#[cfg(any(test, feature = "test-support"))]
 65struct DeterministicState {
 66    rng: rand::prelude::StdRng,
 67    seed: u64,
 68    scheduled_from_foreground: collections::HashMap<usize, Vec<ForegroundRunnable>>,
 69    scheduled_from_background: Vec<BackgroundRunnable>,
 70    forbid_parking: bool,
 71    block_on_ticks: std::ops::RangeInclusive<usize>,
 72    now: std::time::Instant,
 73    next_timer_id: usize,
 74    pending_timers: Vec<(usize, std::time::Instant, postage::barrier::Sender)>,
 75    waiting_backtrace: Option<backtrace::Backtrace>,
 76    next_runnable_id: usize,
 77    poll_history: Vec<usize>,
 78    enable_runnable_backtraces: bool,
 79    runnable_backtraces: collections::HashMap<usize, backtrace::Backtrace>,
 80}
 81
 82#[cfg(any(test, feature = "test-support"))]
 83struct ForegroundRunnable {
 84    id: usize,
 85    runnable: Runnable,
 86    main: bool,
 87}
 88
 89#[cfg(any(test, feature = "test-support"))]
 90struct BackgroundRunnable {
 91    id: usize,
 92    runnable: Runnable,
 93}
 94
 95#[cfg(any(test, feature = "test-support"))]
 96pub struct Deterministic {
 97    state: Arc<parking_lot::Mutex<DeterministicState>>,
 98    parker: parking_lot::Mutex<parking::Parker>,
 99}
100
101pub enum Timer {
102    Production(smol::Timer),
103    #[cfg(any(test, feature = "test-support"))]
104    Deterministic(DeterministicTimer),
105}
106
107#[cfg(any(test, feature = "test-support"))]
108pub struct DeterministicTimer {
109    rx: postage::barrier::Receiver,
110    id: usize,
111    state: Arc<parking_lot::Mutex<DeterministicState>>,
112}
113
114#[cfg(any(test, feature = "test-support"))]
115impl Deterministic {
116    pub fn new(seed: u64) -> Arc<Self> {
117        use rand::prelude::*;
118
119        Arc::new(Self {
120            state: Arc::new(parking_lot::Mutex::new(DeterministicState {
121                rng: StdRng::seed_from_u64(seed),
122                seed,
123                scheduled_from_foreground: Default::default(),
124                scheduled_from_background: Default::default(),
125                forbid_parking: false,
126                block_on_ticks: 0..=1000,
127                now: std::time::Instant::now(),
128                next_timer_id: Default::default(),
129                pending_timers: Default::default(),
130                waiting_backtrace: None,
131                next_runnable_id: 0,
132                poll_history: Default::default(),
133                enable_runnable_backtraces: false,
134                runnable_backtraces: Default::default(),
135            })),
136            parker: Default::default(),
137        })
138    }
139
140    pub fn runnable_history(&self) -> Vec<usize> {
141        self.state.lock().poll_history.clone()
142    }
143
144    pub fn enable_runnable_backtrace(&self) {
145        self.state.lock().enable_runnable_backtraces = true;
146    }
147
148    pub fn runnable_backtrace(&self, runnable_id: usize) -> backtrace::Backtrace {
149        let mut backtrace = self.state.lock().runnable_backtraces[&runnable_id].clone();
150        backtrace.resolve();
151        backtrace
152    }
153
154    pub fn build_background(self: &Arc<Self>) -> Arc<Background> {
155        Arc::new(Background::Deterministic {
156            executor: self.clone(),
157        })
158    }
159
160    pub fn build_foreground(self: &Arc<Self>, id: usize) -> Rc<Foreground> {
161        Rc::new(Foreground::Deterministic {
162            cx_id: id,
163            executor: self.clone(),
164        })
165    }
166
167    fn spawn_from_foreground(
168        &self,
169        cx_id: usize,
170        future: AnyLocalFuture,
171        main: bool,
172    ) -> AnyLocalTask {
173        let state = self.state.clone();
174        let id;
175        {
176            let mut state = state.lock();
177            id = util::post_inc(&mut state.next_runnable_id);
178            if state.enable_runnable_backtraces {
179                state
180                    .runnable_backtraces
181                    .insert(id, backtrace::Backtrace::new_unresolved());
182            }
183        }
184
185        let unparker = self.parker.lock().unparker();
186        let (runnable, task) = async_task::spawn_local(future, move |runnable| {
187            let mut state = state.lock();
188            state
189                .scheduled_from_foreground
190                .entry(cx_id)
191                .or_default()
192                .push(ForegroundRunnable { id, runnable, main });
193            unparker.unpark();
194        });
195        runnable.schedule();
196        task
197    }
198
199    fn spawn(&self, future: AnyFuture) -> AnyTask {
200        let state = self.state.clone();
201        let id;
202        {
203            let mut state = state.lock();
204            id = util::post_inc(&mut state.next_runnable_id);
205            if state.enable_runnable_backtraces {
206                state
207                    .runnable_backtraces
208                    .insert(id, backtrace::Backtrace::new_unresolved());
209            }
210        }
211
212        let unparker = self.parker.lock().unparker();
213        let (runnable, task) = async_task::spawn(future, move |runnable| {
214            let mut state = state.lock();
215            state
216                .scheduled_from_background
217                .push(BackgroundRunnable { id, runnable });
218            unparker.unpark();
219        });
220        runnable.schedule();
221        task
222    }
223
224    fn run<'a>(
225        &self,
226        cx_id: usize,
227        main_future: Pin<Box<dyn 'a + Future<Output = Box<dyn Any>>>>,
228    ) -> Box<dyn Any> {
229        use std::sync::atomic::{AtomicBool, Ordering::SeqCst};
230
231        let woken = Arc::new(AtomicBool::new(false));
232
233        let state = self.state.clone();
234        let id;
235        {
236            let mut state = state.lock();
237            id = util::post_inc(&mut state.next_runnable_id);
238            if state.enable_runnable_backtraces {
239                state
240                    .runnable_backtraces
241                    .insert(id, backtrace::Backtrace::new_unresolved());
242            }
243        }
244
245        let unparker = self.parker.lock().unparker();
246        let (runnable, mut main_task) = unsafe {
247            async_task::spawn_unchecked(main_future, move |runnable| {
248                let state = &mut *state.lock();
249                state
250                    .scheduled_from_foreground
251                    .entry(cx_id)
252                    .or_default()
253                    .push(ForegroundRunnable {
254                        id: util::post_inc(&mut state.next_runnable_id),
255                        runnable,
256                        main: true,
257                    });
258                unparker.unpark();
259            })
260        };
261        runnable.schedule();
262
263        loop {
264            if let Some(result) = self.run_internal(woken.clone(), Some(&mut main_task)) {
265                return result;
266            }
267
268            if !woken.load(SeqCst) {
269                self.state.lock().will_park();
270            }
271
272            woken.store(false, SeqCst);
273            self.parker.lock().park();
274        }
275    }
276
277    pub fn run_until_parked(&self) {
278        use std::sync::atomic::AtomicBool;
279        let woken = Arc::new(AtomicBool::new(false));
280        self.run_internal(woken, None);
281    }
282
283    fn run_internal(
284        &self,
285        woken: Arc<std::sync::atomic::AtomicBool>,
286        mut main_task: Option<&mut AnyLocalTask>,
287    ) -> Option<Box<dyn Any>> {
288        use rand::prelude::*;
289        use std::sync::atomic::Ordering::SeqCst;
290
291        let unparker = self.parker.lock().unparker();
292        let waker = waker_fn::waker_fn(move || {
293            woken.store(true, SeqCst);
294            unparker.unpark();
295        });
296
297        let mut cx = Context::from_waker(&waker);
298        loop {
299            let mut state = self.state.lock();
300
301            if state.scheduled_from_foreground.is_empty()
302                && state.scheduled_from_background.is_empty()
303            {
304                if let Some(main_task) = main_task {
305                    if let Poll::Ready(result) = main_task.poll(&mut cx) {
306                        return Some(result);
307                    }
308                }
309
310                return None;
311            }
312
313            if !state.scheduled_from_background.is_empty() && state.rng.gen() {
314                let background_len = state.scheduled_from_background.len();
315                let ix = state.rng.gen_range(0..background_len);
316                let background_runnable = state.scheduled_from_background.remove(ix);
317                state.poll_history.push(background_runnable.id);
318                drop(state);
319                background_runnable.runnable.run();
320            } else if !state.scheduled_from_foreground.is_empty() {
321                let available_cx_ids = state
322                    .scheduled_from_foreground
323                    .keys()
324                    .copied()
325                    .collect::<Vec<_>>();
326                let cx_id_to_run = *available_cx_ids.iter().choose(&mut state.rng).unwrap();
327                let scheduled_from_cx = state
328                    .scheduled_from_foreground
329                    .get_mut(&cx_id_to_run)
330                    .unwrap();
331                let foreground_runnable = scheduled_from_cx.remove(0);
332                if scheduled_from_cx.is_empty() {
333                    state.scheduled_from_foreground.remove(&cx_id_to_run);
334                }
335                state.poll_history.push(foreground_runnable.id);
336
337                drop(state);
338
339                foreground_runnable.runnable.run();
340                if let Some(main_task) = main_task.as_mut() {
341                    if foreground_runnable.main {
342                        if let Poll::Ready(result) = main_task.poll(&mut cx) {
343                            return Some(result);
344                        }
345                    }
346                }
347            }
348        }
349    }
350
351    fn block<F, T>(&self, future: &mut F, max_ticks: usize) -> Option<T>
352    where
353        F: Unpin + Future<Output = T>,
354    {
355        use rand::prelude::*;
356
357        let unparker = self.parker.lock().unparker();
358        let waker = waker_fn::waker_fn(move || {
359            unparker.unpark();
360        });
361
362        let mut cx = Context::from_waker(&waker);
363        for _ in 0..max_ticks {
364            let mut state = self.state.lock();
365            let runnable_count = state.scheduled_from_background.len();
366            let ix = state.rng.gen_range(0..=runnable_count);
367            if ix < state.scheduled_from_background.len() {
368                let background_runnable = state.scheduled_from_background.remove(ix);
369                state.poll_history.push(background_runnable.id);
370                drop(state);
371                background_runnable.runnable.run();
372            } else {
373                drop(state);
374                if let Poll::Ready(result) = future.poll(&mut cx) {
375                    return Some(result);
376                }
377                let mut state = self.state.lock();
378                if state.scheduled_from_background.is_empty() {
379                    state.will_park();
380                    drop(state);
381                    self.parker.lock().park();
382                }
383
384                continue;
385            }
386        }
387
388        None
389    }
390
391    pub fn timer(&self, duration: Duration) -> Timer {
392        let (tx, rx) = postage::barrier::channel();
393        let mut state = self.state.lock();
394        let wakeup_at = state.now + duration;
395        let id = util::post_inc(&mut state.next_timer_id);
396        match state
397            .pending_timers
398            .binary_search_by_key(&wakeup_at, |e| e.1)
399        {
400            Ok(ix) | Err(ix) => state.pending_timers.insert(ix, (id, wakeup_at, tx)),
401        }
402        let state = self.state.clone();
403        Timer::Deterministic(DeterministicTimer { rx, id, state })
404    }
405
406    pub fn now(&self) -> std::time::Instant {
407        let state = self.state.lock();
408        state.now
409    }
410
411    pub fn advance_clock(&self, duration: Duration) {
412        let new_now = self.state.lock().now + duration;
413        loop {
414            self.run_until_parked();
415            let mut state = self.state.lock();
416
417            if let Some((_, wakeup_time, _)) = state.pending_timers.first() {
418                let wakeup_time = *wakeup_time;
419                if wakeup_time <= new_now {
420                    let timer_count = state
421                        .pending_timers
422                        .iter()
423                        .take_while(|(_, t, _)| *t == wakeup_time)
424                        .count();
425                    state.now = wakeup_time;
426                    let timers_to_wake = state
427                        .pending_timers
428                        .drain(0..timer_count)
429                        .collect::<Vec<_>>();
430                    drop(state);
431                    drop(timers_to_wake);
432                    continue;
433                }
434            }
435
436            break;
437        }
438
439        self.state.lock().now = new_now;
440    }
441
442    pub fn start_waiting(&self) {
443        self.state.lock().waiting_backtrace = Some(backtrace::Backtrace::new_unresolved());
444    }
445
446    pub fn finish_waiting(&self) {
447        self.state.lock().waiting_backtrace.take();
448    }
449
450    pub fn forbid_parking(&self) {
451        use rand::prelude::*;
452
453        let mut state = self.state.lock();
454        state.forbid_parking = true;
455        state.rng = StdRng::seed_from_u64(state.seed);
456    }
457
458    pub async fn simulate_random_delay(&self) {
459        use rand::prelude::*;
460        use smol::future::yield_now;
461        if self.state.lock().rng.gen_bool(0.2) {
462            let yields = self.state.lock().rng.gen_range(1..=10);
463            for _ in 0..yields {
464                yield_now().await;
465            }
466        }
467    }
468}
469
470impl Drop for Timer {
471    fn drop(&mut self) {
472        #[cfg(any(test, feature = "test-support"))]
473        if let Timer::Deterministic(DeterministicTimer { state, id, .. }) = self {
474            state
475                .lock()
476                .pending_timers
477                .retain(|(timer_id, _, _)| timer_id != id)
478        }
479    }
480}
481
482impl Future for Timer {
483    type Output = ();
484
485    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
486        match &mut *self {
487            #[cfg(any(test, feature = "test-support"))]
488            Self::Deterministic(DeterministicTimer { rx, .. }) => {
489                use postage::stream::{PollRecv, Stream as _};
490                smol::pin!(rx);
491                match rx.poll_recv(&mut postage::Context::from_waker(cx.waker())) {
492                    PollRecv::Ready(()) | PollRecv::Closed => Poll::Ready(()),
493                    PollRecv::Pending => Poll::Pending,
494                }
495            }
496            Self::Production(timer) => {
497                smol::pin!(timer);
498                match timer.poll(cx) {
499                    Poll::Ready(_) => Poll::Ready(()),
500                    Poll::Pending => Poll::Pending,
501                }
502            }
503        }
504    }
505}
506
507#[cfg(any(test, feature = "test-support"))]
508impl DeterministicState {
509    fn will_park(&mut self) {
510        if self.forbid_parking {
511            let mut backtrace_message = String::new();
512            #[cfg(any(test, feature = "test-support"))]
513            if let Some(backtrace) = self.waiting_backtrace.as_mut() {
514                backtrace.resolve();
515                backtrace_message = format!(
516                    "\nbacktrace of waiting future:\n{:?}",
517                    util::CwdBacktrace(backtrace)
518                );
519            }
520
521            panic!(
522                "deterministic executor parked after a call to forbid_parking{}",
523                backtrace_message
524            );
525        }
526    }
527}
528
529impl Foreground {
530    pub fn platform(dispatcher: Arc<dyn platform::Dispatcher>) -> Result<Self> {
531        if dispatcher.is_main_thread() {
532            Ok(Self::Platform {
533                dispatcher,
534                _not_send_or_sync: PhantomData,
535            })
536        } else {
537            Err(anyhow!("must be constructed on main thread"))
538        }
539    }
540
541    pub fn spawn<T: 'static>(&self, future: impl Future<Output = T> + 'static) -> Task<T> {
542        let future = any_local_future(future);
543        let any_task = match self {
544            #[cfg(any(test, feature = "test-support"))]
545            Self::Deterministic { cx_id, executor } => {
546                executor.spawn_from_foreground(*cx_id, future, false)
547            }
548            Self::Platform { dispatcher, .. } => {
549                fn spawn_inner(
550                    future: AnyLocalFuture,
551                    dispatcher: &Arc<dyn Dispatcher>,
552                ) -> AnyLocalTask {
553                    let dispatcher = dispatcher.clone();
554                    let schedule =
555                        move |runnable: Runnable| dispatcher.run_on_main_thread(runnable);
556                    let (runnable, task) = async_task::spawn_local(future, schedule);
557                    runnable.schedule();
558                    task
559                }
560                spawn_inner(future, dispatcher)
561            }
562        };
563        Task::local(any_task)
564    }
565
566    #[cfg(any(test, feature = "test-support"))]
567    pub fn run<T: 'static>(&self, future: impl Future<Output = T>) -> T {
568        let future = async move { Box::new(future.await) as Box<dyn Any> }.boxed_local();
569        let result = match self {
570            Self::Deterministic { cx_id, executor } => executor.run(*cx_id, future),
571            Self::Platform { .. } => panic!("you can't call run on a platform foreground executor"),
572        };
573        *result.downcast().unwrap()
574    }
575
576    #[cfg(any(test, feature = "test-support"))]
577    pub fn run_until_parked(&self) {
578        match self {
579            Self::Deterministic { executor, .. } => executor.run_until_parked(),
580            _ => panic!("this method can only be called on a deterministic executor"),
581        }
582    }
583
584    #[cfg(any(test, feature = "test-support"))]
585    pub fn parking_forbidden(&self) -> bool {
586        match self {
587            Self::Deterministic { executor, .. } => executor.state.lock().forbid_parking,
588            _ => panic!("this method can only be called on a deterministic executor"),
589        }
590    }
591
592    #[cfg(any(test, feature = "test-support"))]
593    pub fn start_waiting(&self) {
594        match self {
595            Self::Deterministic { executor, .. } => executor.start_waiting(),
596            _ => panic!("this method can only be called on a deterministic executor"),
597        }
598    }
599
600    #[cfg(any(test, feature = "test-support"))]
601    pub fn finish_waiting(&self) {
602        match self {
603            Self::Deterministic { executor, .. } => executor.finish_waiting(),
604            _ => panic!("this method can only be called on a deterministic executor"),
605        }
606    }
607
608    #[cfg(any(test, feature = "test-support"))]
609    pub fn forbid_parking(&self) {
610        match self {
611            Self::Deterministic { executor, .. } => executor.forbid_parking(),
612            _ => panic!("this method can only be called on a deterministic executor"),
613        }
614    }
615
616    #[cfg(any(test, feature = "test-support"))]
617    pub fn advance_clock(&self, duration: Duration) {
618        match self {
619            Self::Deterministic { executor, .. } => executor.advance_clock(duration),
620            _ => panic!("this method can only be called on a deterministic executor"),
621        }
622    }
623
624    #[cfg(any(test, feature = "test-support"))]
625    pub fn set_block_on_ticks(&self, range: std::ops::RangeInclusive<usize>) {
626        match self {
627            Self::Deterministic { executor, .. } => executor.state.lock().block_on_ticks = range,
628            _ => panic!("this method can only be called on a deterministic executor"),
629        }
630    }
631}
632
633impl Background {
634    pub fn new() -> Self {
635        let executor = Arc::new(Executor::new());
636        let stop = channel::unbounded::<()>();
637
638        for i in 0..2 * num_cpus::get() {
639            let executor = executor.clone();
640            let stop = stop.1.clone();
641            thread::Builder::new()
642                .name(format!("background-executor-{}", i))
643                .spawn(move || smol::block_on(executor.run(stop.recv())))
644                .unwrap();
645        }
646
647        Self::Production {
648            executor,
649            _stop: stop.0,
650        }
651    }
652
653    pub fn num_cpus(&self) -> usize {
654        num_cpus::get()
655    }
656
657    pub fn spawn<T, F>(&self, future: F) -> Task<T>
658    where
659        T: 'static + Send,
660        F: Send + Future<Output = T> + 'static,
661    {
662        let future = any_future(future);
663        let any_task = match self {
664            Self::Production { executor, .. } => executor.spawn(future),
665            #[cfg(any(test, feature = "test-support"))]
666            Self::Deterministic { executor } => executor.spawn(future),
667        };
668        Task::send(any_task)
669    }
670
671    pub fn block<F, T>(&self, future: F) -> T
672    where
673        F: Future<Output = T>,
674    {
675        smol::pin!(future);
676        match self {
677            Self::Production { .. } => smol::block_on(&mut future),
678            #[cfg(any(test, feature = "test-support"))]
679            Self::Deterministic { executor, .. } => {
680                executor.block(&mut future, usize::MAX).unwrap()
681            }
682        }
683    }
684
685    pub fn block_with_timeout<F, T>(
686        &self,
687        timeout: Duration,
688        future: F,
689    ) -> Result<T, impl Future<Output = T>>
690    where
691        T: 'static,
692        F: 'static + Unpin + Future<Output = T>,
693    {
694        let mut future = any_local_future(future);
695        if !timeout.is_zero() {
696            let output = match self {
697                Self::Production { .. } => smol::block_on(util::timeout(timeout, &mut future)).ok(),
698                #[cfg(any(test, feature = "test-support"))]
699                Self::Deterministic { executor, .. } => {
700                    use rand::prelude::*;
701                    let max_ticks = {
702                        let mut state = executor.state.lock();
703                        let range = state.block_on_ticks.clone();
704                        state.rng.gen_range(range)
705                    };
706                    executor.block(&mut future, max_ticks)
707                }
708            };
709            if let Some(output) = output {
710                return Ok(*output.downcast().unwrap());
711            }
712        }
713        Err(async { *future.await.downcast().unwrap() })
714    }
715
716    pub async fn scoped<'scope, F>(self: &Arc<Self>, scheduler: F)
717    where
718        F: FnOnce(&mut Scope<'scope>),
719    {
720        let mut scope = Scope::new(self.clone());
721        (scheduler)(&mut scope);
722        let spawned = mem::take(&mut scope.futures)
723            .into_iter()
724            .map(|f| self.spawn(f))
725            .collect::<Vec<_>>();
726        for task in spawned {
727            task.await;
728        }
729    }
730
731    pub fn timer(&self, duration: Duration) -> Timer {
732        match self {
733            Background::Production { .. } => Timer::Production(smol::Timer::after(duration)),
734            #[cfg(any(test, feature = "test-support"))]
735            Background::Deterministic { executor } => executor.timer(duration),
736        }
737    }
738
739    pub fn now(&self) -> std::time::Instant {
740        match self {
741            Background::Production { .. } => std::time::Instant::now(),
742            #[cfg(any(test, feature = "test-support"))]
743            Background::Deterministic { executor } => executor.now(),
744        }
745    }
746
747    #[cfg(any(test, feature = "test-support"))]
748    pub async fn simulate_random_delay(&self) {
749        match self {
750            Self::Deterministic { executor, .. } => {
751                executor.simulate_random_delay().await;
752            }
753            _ => {
754                panic!("this method can only be called on a deterministic executor")
755            }
756        }
757    }
758}
759
760impl Default for Background {
761    fn default() -> Self {
762        Self::new()
763    }
764}
765
766pub struct Scope<'a> {
767    executor: Arc<Background>,
768    futures: Vec<Pin<Box<dyn Future<Output = ()> + Send + 'static>>>,
769    tx: Option<mpsc::Sender<()>>,
770    rx: mpsc::Receiver<()>,
771    _phantom: PhantomData<&'a ()>,
772}
773
774impl<'a> Scope<'a> {
775    fn new(executor: Arc<Background>) -> Self {
776        let (tx, rx) = mpsc::channel(1);
777        Self {
778            executor,
779            tx: Some(tx),
780            rx,
781            futures: Default::default(),
782            _phantom: PhantomData,
783        }
784    }
785
786    pub fn spawn<F>(&mut self, f: F)
787    where
788        F: Future<Output = ()> + Send + 'a,
789    {
790        let tx = self.tx.clone().unwrap();
791
792        // Safety: The 'a lifetime is guaranteed to outlive any of these futures because
793        // dropping this `Scope` blocks until all of the futures have resolved.
794        let f = unsafe {
795            mem::transmute::<
796                Pin<Box<dyn Future<Output = ()> + Send + 'a>>,
797                Pin<Box<dyn Future<Output = ()> + Send + 'static>>,
798            >(Box::pin(async move {
799                f.await;
800                drop(tx);
801            }))
802        };
803        self.futures.push(f);
804    }
805}
806
807impl<'a> Drop for Scope<'a> {
808    fn drop(&mut self) {
809        self.tx.take().unwrap();
810
811        // Wait until the channel is closed, which means that all of the spawned
812        // futures have resolved.
813        self.executor.block(self.rx.next());
814    }
815}
816
817impl<T> Task<T> {
818    pub fn ready(value: T) -> Self {
819        Self::Ready(Some(value))
820    }
821
822    fn local(any_task: AnyLocalTask) -> Self {
823        Self::Local {
824            any_task,
825            result_type: PhantomData,
826        }
827    }
828
829    pub fn detach(self) {
830        match self {
831            Task::Ready(_) => {}
832            Task::Local { any_task, .. } => any_task.detach(),
833            Task::Send { any_task, .. } => any_task.detach(),
834        }
835    }
836}
837
838impl<T: 'static, E: 'static + Display> Task<Result<T, E>> {
839    pub fn detach_and_log_err(self, cx: &mut MutableAppContext) {
840        cx.spawn(|_| async move {
841            if let Err(err) = self.await {
842                log::error!("{}", err);
843            }
844        })
845        .detach();
846    }
847}
848
849impl<T: Send> Task<T> {
850    fn send(any_task: AnyTask) -> Self {
851        Self::Send {
852            any_task,
853            result_type: PhantomData,
854        }
855    }
856}
857
858impl<T: fmt::Debug> fmt::Debug for Task<T> {
859    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
860        match self {
861            Task::Ready(value) => value.fmt(f),
862            Task::Local { any_task, .. } => any_task.fmt(f),
863            Task::Send { any_task, .. } => any_task.fmt(f),
864        }
865    }
866}
867
868impl<T: 'static> Future for Task<T> {
869    type Output = T;
870
871    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
872        match unsafe { self.get_unchecked_mut() } {
873            Task::Ready(value) => Poll::Ready(value.take().unwrap()),
874            Task::Local { any_task, .. } => {
875                any_task.poll(cx).map(|value| *value.downcast().unwrap())
876            }
877            Task::Send { any_task, .. } => {
878                any_task.poll(cx).map(|value| *value.downcast().unwrap())
879            }
880        }
881    }
882}
883
884fn any_future<T, F>(future: F) -> AnyFuture
885where
886    T: 'static + Send,
887    F: Future<Output = T> + Send + 'static,
888{
889    async { Box::new(future.await) as Box<dyn Any + Send> }.boxed()
890}
891
892fn any_local_future<T, F>(future: F) -> AnyLocalFuture
893where
894    T: 'static,
895    F: Future<Output = T> + 'static,
896{
897    async { Box::new(future.await) as Box<dyn Any> }.boxed_local()
898}