executor.rs

  1use anyhow::{anyhow, Result};
  2use async_task::Runnable;
  3use futures::channel::mpsc;
  4use smol::{channel, prelude::*, Executor};
  5use std::{
  6    any::Any,
  7    fmt::{self, Display},
  8    marker::PhantomData,
  9    mem,
 10    pin::Pin,
 11    rc::Rc,
 12    sync::Arc,
 13    task::{Context, Poll},
 14    thread,
 15    time::Duration,
 16};
 17
 18use crate::{
 19    platform::{self, Dispatcher},
 20    util, AppContext,
 21};
 22
 23pub enum Foreground {
 24    Platform {
 25        dispatcher: Arc<dyn platform::Dispatcher>,
 26        _not_send_or_sync: PhantomData<Rc<()>>,
 27    },
 28    #[cfg(any(test, feature = "test-support"))]
 29    Deterministic {
 30        cx_id: usize,
 31        executor: Arc<Deterministic>,
 32    },
 33}
 34
 35pub enum Background {
 36    #[cfg(any(test, feature = "test-support"))]
 37    Deterministic { executor: Arc<Deterministic> },
 38    Production {
 39        executor: Arc<smol::Executor<'static>>,
 40        _stop: channel::Sender<()>,
 41    },
 42}
 43
 44type AnyLocalFuture = Pin<Box<dyn 'static + Future<Output = Box<dyn Any + 'static>>>>;
 45type AnyFuture = Pin<Box<dyn 'static + Send + Future<Output = Box<dyn Any + Send + 'static>>>>;
 46type AnyTask = async_task::Task<Box<dyn Any + Send + 'static>>;
 47type AnyLocalTask = async_task::Task<Box<dyn Any + 'static>>;
 48
 49#[must_use]
 50pub enum Task<T> {
 51    Ready(Option<T>),
 52    Local {
 53        any_task: AnyLocalTask,
 54        result_type: PhantomData<T>,
 55    },
 56    Send {
 57        any_task: AnyTask,
 58        result_type: PhantomData<T>,
 59    },
 60}
 61
 62unsafe impl<T: Send> Send for Task<T> {}
 63
 64#[cfg(any(test, feature = "test-support"))]
 65struct DeterministicState {
 66    rng: rand::prelude::StdRng,
 67    seed: u64,
 68    scheduled_from_foreground: collections::HashMap<usize, Vec<ForegroundRunnable>>,
 69    scheduled_from_background: Vec<BackgroundRunnable>,
 70    forbid_parking: bool,
 71    block_on_ticks: std::ops::RangeInclusive<usize>,
 72    now: std::time::Instant,
 73    next_timer_id: usize,
 74    pending_timers: Vec<(usize, std::time::Instant, postage::barrier::Sender)>,
 75    waiting_backtrace: Option<backtrace::Backtrace>,
 76    next_runnable_id: usize,
 77    poll_history: Vec<ExecutorEvent>,
 78    previous_poll_history: Option<Vec<ExecutorEvent>>,
 79    enable_runnable_backtraces: bool,
 80    runnable_backtraces: collections::HashMap<usize, backtrace::Backtrace>,
 81}
 82
 83#[derive(Copy, Clone, Debug, PartialEq, Eq)]
 84pub enum ExecutorEvent {
 85    PollRunnable { id: usize },
 86    EnqueuRunnable { id: usize },
 87}
 88
 89#[cfg(any(test, feature = "test-support"))]
 90struct ForegroundRunnable {
 91    id: usize,
 92    runnable: Runnable,
 93    main: bool,
 94}
 95
 96#[cfg(any(test, feature = "test-support"))]
 97struct BackgroundRunnable {
 98    id: usize,
 99    runnable: Runnable,
100}
101
102#[cfg(any(test, feature = "test-support"))]
103pub struct Deterministic {
104    state: Arc<parking_lot::Mutex<DeterministicState>>,
105    parker: parking_lot::Mutex<parking::Parker>,
106}
107
108pub enum Timer {
109    Production(smol::Timer),
110    #[cfg(any(test, feature = "test-support"))]
111    Deterministic(DeterministicTimer),
112}
113
114#[cfg(any(test, feature = "test-support"))]
115pub struct DeterministicTimer {
116    rx: postage::barrier::Receiver,
117    id: usize,
118    state: Arc<parking_lot::Mutex<DeterministicState>>,
119}
120
121#[cfg(any(test, feature = "test-support"))]
122impl Deterministic {
123    pub fn new(seed: u64) -> Arc<Self> {
124        use rand::prelude::*;
125
126        Arc::new(Self {
127            state: Arc::new(parking_lot::Mutex::new(DeterministicState {
128                rng: StdRng::seed_from_u64(seed),
129                seed,
130                scheduled_from_foreground: Default::default(),
131                scheduled_from_background: Default::default(),
132                forbid_parking: false,
133                block_on_ticks: 0..=1000,
134                now: std::time::Instant::now(),
135                next_timer_id: Default::default(),
136                pending_timers: Default::default(),
137                waiting_backtrace: None,
138                next_runnable_id: 0,
139                poll_history: Default::default(),
140                previous_poll_history: Default::default(),
141                enable_runnable_backtraces: false,
142                runnable_backtraces: Default::default(),
143            })),
144            parker: Default::default(),
145        })
146    }
147
148    pub fn execution_history(&self) -> Vec<ExecutorEvent> {
149        self.state.lock().poll_history.clone()
150    }
151
152    pub fn set_previous_execution_history(&self, history: Option<Vec<ExecutorEvent>>) {
153        self.state.lock().previous_poll_history = history;
154    }
155
156    pub fn enable_runnable_backtrace(&self) {
157        self.state.lock().enable_runnable_backtraces = true;
158    }
159
160    pub fn runnable_backtrace(&self, runnable_id: usize) -> backtrace::Backtrace {
161        let mut backtrace = self.state.lock().runnable_backtraces[&runnable_id].clone();
162        backtrace.resolve();
163        backtrace
164    }
165
166    pub fn build_background(self: &Arc<Self>) -> Arc<Background> {
167        Arc::new(Background::Deterministic {
168            executor: self.clone(),
169        })
170    }
171
172    pub fn build_foreground(self: &Arc<Self>, id: usize) -> Rc<Foreground> {
173        Rc::new(Foreground::Deterministic {
174            cx_id: id,
175            executor: self.clone(),
176        })
177    }
178
179    fn spawn_from_foreground(
180        &self,
181        cx_id: usize,
182        future: AnyLocalFuture,
183        main: bool,
184    ) -> AnyLocalTask {
185        let state = self.state.clone();
186        let id;
187        {
188            let mut state = state.lock();
189            id = util::post_inc(&mut state.next_runnable_id);
190            if state.enable_runnable_backtraces {
191                state
192                    .runnable_backtraces
193                    .insert(id, backtrace::Backtrace::new_unresolved());
194            }
195        }
196
197        let unparker = self.parker.lock().unparker();
198        let (runnable, task) = async_task::spawn_local(future, move |runnable| {
199            let mut state = state.lock();
200            state.push_to_history(ExecutorEvent::EnqueuRunnable { id });
201            state
202                .scheduled_from_foreground
203                .entry(cx_id)
204                .or_default()
205                .push(ForegroundRunnable { id, runnable, main });
206            unparker.unpark();
207        });
208        runnable.schedule();
209        task
210    }
211
212    fn spawn(&self, future: AnyFuture) -> AnyTask {
213        let state = self.state.clone();
214        let id;
215        {
216            let mut state = state.lock();
217            id = util::post_inc(&mut state.next_runnable_id);
218            if state.enable_runnable_backtraces {
219                state
220                    .runnable_backtraces
221                    .insert(id, backtrace::Backtrace::new_unresolved());
222            }
223        }
224
225        let unparker = self.parker.lock().unparker();
226        let (runnable, task) = async_task::spawn(future, move |runnable| {
227            let mut state = state.lock();
228            state
229                .poll_history
230                .push(ExecutorEvent::EnqueuRunnable { id });
231            state
232                .scheduled_from_background
233                .push(BackgroundRunnable { id, runnable });
234            unparker.unpark();
235        });
236        runnable.schedule();
237        task
238    }
239
240    fn run<'a>(
241        &self,
242        cx_id: usize,
243        main_future: Pin<Box<dyn 'a + Future<Output = Box<dyn Any>>>>,
244    ) -> Box<dyn Any> {
245        use std::sync::atomic::{AtomicBool, Ordering::SeqCst};
246
247        let woken = Arc::new(AtomicBool::new(false));
248
249        let state = self.state.clone();
250        let id;
251        {
252            let mut state = state.lock();
253            id = util::post_inc(&mut state.next_runnable_id);
254            if state.enable_runnable_backtraces {
255                state
256                    .runnable_backtraces
257                    .insert(id, backtrace::Backtrace::new_unresolved());
258            }
259        }
260
261        let unparker = self.parker.lock().unparker();
262        let (runnable, mut main_task) = unsafe {
263            async_task::spawn_unchecked(main_future, move |runnable| {
264                let state = &mut *state.lock();
265                state
266                    .scheduled_from_foreground
267                    .entry(cx_id)
268                    .or_default()
269                    .push(ForegroundRunnable {
270                        id: util::post_inc(&mut state.next_runnable_id),
271                        runnable,
272                        main: true,
273                    });
274                unparker.unpark();
275            })
276        };
277        runnable.schedule();
278
279        loop {
280            if let Some(result) = self.run_internal(woken.clone(), Some(&mut main_task)) {
281                return result;
282            }
283
284            if !woken.load(SeqCst) {
285                self.state.lock().will_park();
286            }
287
288            woken.store(false, SeqCst);
289            self.parker.lock().park();
290        }
291    }
292
293    pub fn run_until_parked(&self) {
294        use std::sync::atomic::AtomicBool;
295        let woken = Arc::new(AtomicBool::new(false));
296        self.run_internal(woken, None);
297    }
298
299    fn run_internal(
300        &self,
301        woken: Arc<std::sync::atomic::AtomicBool>,
302        mut main_task: Option<&mut AnyLocalTask>,
303    ) -> Option<Box<dyn Any>> {
304        use rand::prelude::*;
305        use std::sync::atomic::Ordering::SeqCst;
306
307        let unparker = self.parker.lock().unparker();
308        let waker = waker_fn::waker_fn(move || {
309            woken.store(true, SeqCst);
310            unparker.unpark();
311        });
312
313        let mut cx = Context::from_waker(&waker);
314        loop {
315            let mut state = self.state.lock();
316
317            if state.scheduled_from_foreground.is_empty()
318                && state.scheduled_from_background.is_empty()
319            {
320                if let Some(main_task) = main_task {
321                    if let Poll::Ready(result) = main_task.poll(&mut cx) {
322                        return Some(result);
323                    }
324                }
325
326                return None;
327            }
328
329            if !state.scheduled_from_background.is_empty() && state.rng.gen() {
330                let background_len = state.scheduled_from_background.len();
331                let ix = state.rng.gen_range(0..background_len);
332                let background_runnable = state.scheduled_from_background.remove(ix);
333                state.push_to_history(ExecutorEvent::PollRunnable {
334                    id: background_runnable.id,
335                });
336                drop(state);
337                background_runnable.runnable.run();
338            } else if !state.scheduled_from_foreground.is_empty() {
339                let available_cx_ids = state
340                    .scheduled_from_foreground
341                    .keys()
342                    .copied()
343                    .collect::<Vec<_>>();
344                let cx_id_to_run = *available_cx_ids.iter().choose(&mut state.rng).unwrap();
345                let scheduled_from_cx = state
346                    .scheduled_from_foreground
347                    .get_mut(&cx_id_to_run)
348                    .unwrap();
349                let foreground_runnable = scheduled_from_cx.remove(0);
350                if scheduled_from_cx.is_empty() {
351                    state.scheduled_from_foreground.remove(&cx_id_to_run);
352                }
353                state.push_to_history(ExecutorEvent::PollRunnable {
354                    id: foreground_runnable.id,
355                });
356
357                drop(state);
358
359                foreground_runnable.runnable.run();
360                if let Some(main_task) = main_task.as_mut() {
361                    if foreground_runnable.main {
362                        if let Poll::Ready(result) = main_task.poll(&mut cx) {
363                            return Some(result);
364                        }
365                    }
366                }
367            }
368        }
369    }
370
371    fn block<F, T>(&self, future: &mut F, max_ticks: usize) -> Option<T>
372    where
373        F: Unpin + Future<Output = T>,
374    {
375        use rand::prelude::*;
376
377        let unparker = self.parker.lock().unparker();
378        let waker = waker_fn::waker_fn(move || {
379            unparker.unpark();
380        });
381
382        let mut cx = Context::from_waker(&waker);
383        for _ in 0..max_ticks {
384            let mut state = self.state.lock();
385            let runnable_count = state.scheduled_from_background.len();
386            let ix = state.rng.gen_range(0..=runnable_count);
387            if ix < state.scheduled_from_background.len() {
388                let background_runnable = state.scheduled_from_background.remove(ix);
389                state.push_to_history(ExecutorEvent::PollRunnable {
390                    id: background_runnable.id,
391                });
392                drop(state);
393                background_runnable.runnable.run();
394            } else {
395                drop(state);
396                if let Poll::Ready(result) = future.poll(&mut cx) {
397                    return Some(result);
398                }
399                let mut state = self.state.lock();
400                if state.scheduled_from_background.is_empty() {
401                    state.will_park();
402                    drop(state);
403                    self.parker.lock().park();
404                }
405
406                continue;
407            }
408        }
409
410        None
411    }
412
413    pub fn timer(&self, duration: Duration) -> Timer {
414        let (tx, rx) = postage::barrier::channel();
415        let mut state = self.state.lock();
416        let wakeup_at = state.now + duration;
417        let id = util::post_inc(&mut state.next_timer_id);
418        match state
419            .pending_timers
420            .binary_search_by_key(&wakeup_at, |e| e.1)
421        {
422            Ok(ix) | Err(ix) => state.pending_timers.insert(ix, (id, wakeup_at, tx)),
423        }
424        let state = self.state.clone();
425        Timer::Deterministic(DeterministicTimer { rx, id, state })
426    }
427
428    pub fn now(&self) -> std::time::Instant {
429        let state = self.state.lock();
430        state.now
431    }
432
433    pub fn advance_clock(&self, duration: Duration) {
434        let new_now = self.state.lock().now + duration;
435        loop {
436            self.run_until_parked();
437            let mut state = self.state.lock();
438
439            if let Some((_, wakeup_time, _)) = state.pending_timers.first() {
440                let wakeup_time = *wakeup_time;
441                if wakeup_time <= new_now {
442                    let timer_count = state
443                        .pending_timers
444                        .iter()
445                        .take_while(|(_, t, _)| *t == wakeup_time)
446                        .count();
447                    state.now = wakeup_time;
448                    let timers_to_wake = state
449                        .pending_timers
450                        .drain(0..timer_count)
451                        .collect::<Vec<_>>();
452                    drop(state);
453                    drop(timers_to_wake);
454                    continue;
455                }
456            }
457
458            break;
459        }
460
461        self.state.lock().now = new_now;
462    }
463
464    pub fn start_waiting(&self) {
465        self.state.lock().waiting_backtrace = Some(backtrace::Backtrace::new_unresolved());
466    }
467
468    pub fn finish_waiting(&self) {
469        self.state.lock().waiting_backtrace.take();
470    }
471
472    pub fn forbid_parking(&self) {
473        use rand::prelude::*;
474
475        let mut state = self.state.lock();
476        state.forbid_parking = true;
477        state.rng = StdRng::seed_from_u64(state.seed);
478    }
479
480    pub async fn simulate_random_delay(&self) {
481        use rand::prelude::*;
482        use smol::future::yield_now;
483        if self.state.lock().rng.gen_bool(0.2) {
484            let yields = self.state.lock().rng.gen_range(1..=10);
485            for _ in 0..yields {
486                yield_now().await;
487            }
488        }
489    }
490
491    pub fn record_backtrace(&self) {
492        let mut state = self.state.lock();
493        if state.enable_runnable_backtraces {
494            let current_id = state
495                .poll_history
496                .iter()
497                .rev()
498                .find_map(|event| match event {
499                    ExecutorEvent::PollRunnable { id } => Some(*id),
500                    _ => None,
501                });
502            if let Some(id) = current_id {
503                state
504                    .runnable_backtraces
505                    .insert(id, backtrace::Backtrace::new_unresolved());
506            }
507        }
508    }
509}
510
511impl Drop for Timer {
512    fn drop(&mut self) {
513        #[cfg(any(test, feature = "test-support"))]
514        if let Timer::Deterministic(DeterministicTimer { state, id, .. }) = self {
515            state
516                .lock()
517                .pending_timers
518                .retain(|(timer_id, _, _)| timer_id != id)
519        }
520    }
521}
522
523impl Future for Timer {
524    type Output = ();
525
526    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
527        match &mut *self {
528            #[cfg(any(test, feature = "test-support"))]
529            Self::Deterministic(DeterministicTimer { rx, .. }) => {
530                use postage::stream::{PollRecv, Stream as _};
531                smol::pin!(rx);
532                match rx.poll_recv(&mut postage::Context::from_waker(cx.waker())) {
533                    PollRecv::Ready(()) | PollRecv::Closed => Poll::Ready(()),
534                    PollRecv::Pending => Poll::Pending,
535                }
536            }
537            Self::Production(timer) => {
538                smol::pin!(timer);
539                match timer.poll(cx) {
540                    Poll::Ready(_) => Poll::Ready(()),
541                    Poll::Pending => Poll::Pending,
542                }
543            }
544        }
545    }
546}
547
548#[cfg(any(test, feature = "test-support"))]
549impl DeterministicState {
550    fn push_to_history(&mut self, event: ExecutorEvent) {
551        use std::fmt::Write as _;
552
553        self.poll_history.push(event);
554        if let Some(prev_history) = &self.previous_poll_history {
555            let ix = self.poll_history.len() - 1;
556            let prev_event = prev_history[ix];
557            if event != prev_event {
558                let mut message = String::new();
559                writeln!(
560                    &mut message,
561                    "current runnable backtrace:\n{:?}",
562                    self.runnable_backtraces.get_mut(&event.id()).map(|trace| {
563                        trace.resolve();
564                        util::CwdBacktrace(trace)
565                    })
566                )
567                .unwrap();
568                writeln!(
569                    &mut message,
570                    "previous runnable backtrace:\n{:?}",
571                    self.runnable_backtraces
572                        .get_mut(&prev_event.id())
573                        .map(|trace| {
574                            trace.resolve();
575                            util::CwdBacktrace(trace)
576                        })
577                )
578                .unwrap();
579                panic!("detected non-determinism after {ix}. {message}");
580            }
581        }
582    }
583
584    fn will_park(&mut self) {
585        if self.forbid_parking {
586            let mut backtrace_message = String::new();
587            #[cfg(any(test, feature = "test-support"))]
588            if let Some(backtrace) = self.waiting_backtrace.as_mut() {
589                backtrace.resolve();
590                backtrace_message = format!(
591                    "\nbacktrace of waiting future:\n{:?}",
592                    util::CwdBacktrace(backtrace)
593                );
594            }
595
596            panic!(
597                "deterministic executor parked after a call to forbid_parking{}",
598                backtrace_message
599            );
600        }
601    }
602}
603
604#[cfg(any(test, feature = "test-support"))]
605impl ExecutorEvent {
606    pub fn id(&self) -> usize {
607        match self {
608            ExecutorEvent::PollRunnable { id } => *id,
609            ExecutorEvent::EnqueuRunnable { id } => *id,
610        }
611    }
612}
613
614impl Foreground {
615    pub fn platform(dispatcher: Arc<dyn platform::Dispatcher>) -> Result<Self> {
616        if dispatcher.is_main_thread() {
617            Ok(Self::Platform {
618                dispatcher,
619                _not_send_or_sync: PhantomData,
620            })
621        } else {
622            Err(anyhow!("must be constructed on main thread"))
623        }
624    }
625
626    pub fn spawn<T: 'static>(&self, future: impl Future<Output = T> + 'static) -> Task<T> {
627        let future = any_local_future(future);
628        let any_task = match self {
629            #[cfg(any(test, feature = "test-support"))]
630            Self::Deterministic { cx_id, executor } => {
631                executor.spawn_from_foreground(*cx_id, future, false)
632            }
633            Self::Platform { dispatcher, .. } => {
634                fn spawn_inner(
635                    future: AnyLocalFuture,
636                    dispatcher: &Arc<dyn Dispatcher>,
637                ) -> AnyLocalTask {
638                    let dispatcher = dispatcher.clone();
639                    let schedule =
640                        move |runnable: Runnable| dispatcher.run_on_main_thread(runnable);
641                    let (runnable, task) = async_task::spawn_local(future, schedule);
642                    runnable.schedule();
643                    task
644                }
645                spawn_inner(future, dispatcher)
646            }
647        };
648        Task::local(any_task)
649    }
650
651    #[cfg(any(test, feature = "test-support"))]
652    pub fn run<T: 'static>(&self, future: impl Future<Output = T>) -> T {
653        let future = async move { Box::new(future.await) as Box<dyn Any> }.boxed_local();
654        let result = match self {
655            Self::Deterministic { cx_id, executor } => executor.run(*cx_id, future),
656            Self::Platform { .. } => panic!("you can't call run on a platform foreground executor"),
657        };
658        *result.downcast().unwrap()
659    }
660
661    #[cfg(any(test, feature = "test-support"))]
662    pub fn run_until_parked(&self) {
663        match self {
664            Self::Deterministic { executor, .. } => executor.run_until_parked(),
665            _ => panic!("this method can only be called on a deterministic executor"),
666        }
667    }
668
669    #[cfg(any(test, feature = "test-support"))]
670    pub fn parking_forbidden(&self) -> bool {
671        match self {
672            Self::Deterministic { executor, .. } => executor.state.lock().forbid_parking,
673            _ => panic!("this method can only be called on a deterministic executor"),
674        }
675    }
676
677    #[cfg(any(test, feature = "test-support"))]
678    pub fn start_waiting(&self) {
679        match self {
680            Self::Deterministic { executor, .. } => executor.start_waiting(),
681            _ => panic!("this method can only be called on a deterministic executor"),
682        }
683    }
684
685    #[cfg(any(test, feature = "test-support"))]
686    pub fn finish_waiting(&self) {
687        match self {
688            Self::Deterministic { executor, .. } => executor.finish_waiting(),
689            _ => panic!("this method can only be called on a deterministic executor"),
690        }
691    }
692
693    #[cfg(any(test, feature = "test-support"))]
694    pub fn forbid_parking(&self) {
695        match self {
696            Self::Deterministic { executor, .. } => executor.forbid_parking(),
697            _ => panic!("this method can only be called on a deterministic executor"),
698        }
699    }
700
701    #[cfg(any(test, feature = "test-support"))]
702    pub fn advance_clock(&self, duration: Duration) {
703        match self {
704            Self::Deterministic { executor, .. } => executor.advance_clock(duration),
705            _ => panic!("this method can only be called on a deterministic executor"),
706        }
707    }
708
709    #[cfg(any(test, feature = "test-support"))]
710    pub fn set_block_on_ticks(&self, range: std::ops::RangeInclusive<usize>) {
711        match self {
712            Self::Deterministic { executor, .. } => executor.state.lock().block_on_ticks = range,
713            _ => panic!("this method can only be called on a deterministic executor"),
714        }
715    }
716}
717
718impl Background {
719    pub fn new() -> Self {
720        let executor = Arc::new(Executor::new());
721        let stop = channel::unbounded::<()>();
722
723        for i in 0..2 * num_cpus::get() {
724            let executor = executor.clone();
725            let stop = stop.1.clone();
726            thread::Builder::new()
727                .name(format!("background-executor-{}", i))
728                .spawn(move || smol::block_on(executor.run(stop.recv())))
729                .unwrap();
730        }
731
732        Self::Production {
733            executor,
734            _stop: stop.0,
735        }
736    }
737
738    pub fn num_cpus(&self) -> usize {
739        num_cpus::get()
740    }
741
742    pub fn spawn<T, F>(&self, future: F) -> Task<T>
743    where
744        T: 'static + Send,
745        F: Send + Future<Output = T> + 'static,
746    {
747        let future = any_future(future);
748        let any_task = match self {
749            Self::Production { executor, .. } => executor.spawn(future),
750            #[cfg(any(test, feature = "test-support"))]
751            Self::Deterministic { executor } => executor.spawn(future),
752        };
753        Task::send(any_task)
754    }
755
756    pub fn block<F, T>(&self, future: F) -> T
757    where
758        F: Future<Output = T>,
759    {
760        smol::pin!(future);
761        match self {
762            Self::Production { .. } => smol::block_on(&mut future),
763            #[cfg(any(test, feature = "test-support"))]
764            Self::Deterministic { executor, .. } => {
765                executor.block(&mut future, usize::MAX).unwrap()
766            }
767        }
768    }
769
770    pub fn block_with_timeout<F, T>(
771        &self,
772        timeout: Duration,
773        future: F,
774    ) -> Result<T, impl Future<Output = T>>
775    where
776        T: 'static,
777        F: 'static + Unpin + Future<Output = T>,
778    {
779        let mut future = any_local_future(future);
780        if !timeout.is_zero() {
781            let output = match self {
782                Self::Production { .. } => smol::block_on(util::timeout(timeout, &mut future)).ok(),
783                #[cfg(any(test, feature = "test-support"))]
784                Self::Deterministic { executor, .. } => {
785                    use rand::prelude::*;
786                    let max_ticks = {
787                        let mut state = executor.state.lock();
788                        let range = state.block_on_ticks.clone();
789                        state.rng.gen_range(range)
790                    };
791                    executor.block(&mut future, max_ticks)
792                }
793            };
794            if let Some(output) = output {
795                return Ok(*output.downcast().unwrap());
796            }
797        }
798        Err(async { *future.await.downcast().unwrap() })
799    }
800
801    pub async fn scoped<'scope, F>(self: &Arc<Self>, scheduler: F)
802    where
803        F: FnOnce(&mut Scope<'scope>),
804    {
805        let mut scope = Scope::new(self.clone());
806        (scheduler)(&mut scope);
807        let spawned = mem::take(&mut scope.futures)
808            .into_iter()
809            .map(|f| self.spawn(f))
810            .collect::<Vec<_>>();
811        for task in spawned {
812            task.await;
813        }
814    }
815
816    pub fn timer(&self, duration: Duration) -> Timer {
817        match self {
818            Background::Production { .. } => Timer::Production(smol::Timer::after(duration)),
819            #[cfg(any(test, feature = "test-support"))]
820            Background::Deterministic { executor } => executor.timer(duration),
821        }
822    }
823
824    pub fn now(&self) -> std::time::Instant {
825        match self {
826            Background::Production { .. } => std::time::Instant::now(),
827            #[cfg(any(test, feature = "test-support"))]
828            Background::Deterministic { executor } => executor.now(),
829        }
830    }
831
832    #[cfg(any(test, feature = "test-support"))]
833    pub async fn simulate_random_delay(&self) {
834        match self {
835            Self::Deterministic { executor, .. } => {
836                executor.simulate_random_delay().await;
837            }
838            _ => {
839                panic!("this method can only be called on a deterministic executor")
840            }
841        }
842    }
843
844    #[cfg(any(test, feature = "test-support"))]
845    pub fn record_backtrace(&self) {
846        match self {
847            Self::Deterministic { executor, .. } => executor.record_backtrace(),
848            _ => {
849                panic!("this method can only be called on a deterministic executor")
850            }
851        }
852    }
853}
854
855impl Default for Background {
856    fn default() -> Self {
857        Self::new()
858    }
859}
860
861pub struct Scope<'a> {
862    executor: Arc<Background>,
863    futures: Vec<Pin<Box<dyn Future<Output = ()> + Send + 'static>>>,
864    tx: Option<mpsc::Sender<()>>,
865    rx: mpsc::Receiver<()>,
866    _phantom: PhantomData<&'a ()>,
867}
868
869impl<'a> Scope<'a> {
870    fn new(executor: Arc<Background>) -> Self {
871        let (tx, rx) = mpsc::channel(1);
872        Self {
873            executor,
874            tx: Some(tx),
875            rx,
876            futures: Default::default(),
877            _phantom: PhantomData,
878        }
879    }
880
881    pub fn spawn<F>(&mut self, f: F)
882    where
883        F: Future<Output = ()> + Send + 'a,
884    {
885        let tx = self.tx.clone().unwrap();
886
887        // Safety: The 'a lifetime is guaranteed to outlive any of these futures because
888        // dropping this `Scope` blocks until all of the futures have resolved.
889        let f = unsafe {
890            mem::transmute::<
891                Pin<Box<dyn Future<Output = ()> + Send + 'a>>,
892                Pin<Box<dyn Future<Output = ()> + Send + 'static>>,
893            >(Box::pin(async move {
894                f.await;
895                drop(tx);
896            }))
897        };
898        self.futures.push(f);
899    }
900}
901
902impl<'a> Drop for Scope<'a> {
903    fn drop(&mut self) {
904        self.tx.take().unwrap();
905
906        // Wait until the channel is closed, which means that all of the spawned
907        // futures have resolved.
908        self.executor.block(self.rx.next());
909    }
910}
911
912impl<T> Task<T> {
913    pub fn ready(value: T) -> Self {
914        Self::Ready(Some(value))
915    }
916
917    fn local(any_task: AnyLocalTask) -> Self {
918        Self::Local {
919            any_task,
920            result_type: PhantomData,
921        }
922    }
923
924    pub fn detach(self) {
925        match self {
926            Task::Ready(_) => {}
927            Task::Local { any_task, .. } => any_task.detach(),
928            Task::Send { any_task, .. } => any_task.detach(),
929        }
930    }
931}
932
933impl<T: 'static, E: 'static + Display> Task<Result<T, E>> {
934    pub fn detach_and_log_err(self, cx: &mut AppContext) {
935        cx.spawn(|_| async move {
936            if let Err(err) = self.await {
937                log::error!("{}", err);
938            }
939        })
940        .detach();
941    }
942}
943
944impl<T: Send> Task<T> {
945    fn send(any_task: AnyTask) -> Self {
946        Self::Send {
947            any_task,
948            result_type: PhantomData,
949        }
950    }
951}
952
953impl<T: fmt::Debug> fmt::Debug for Task<T> {
954    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
955        match self {
956            Task::Ready(value) => value.fmt(f),
957            Task::Local { any_task, .. } => any_task.fmt(f),
958            Task::Send { any_task, .. } => any_task.fmt(f),
959        }
960    }
961}
962
963impl<T: 'static> Future for Task<T> {
964    type Output = T;
965
966    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
967        match unsafe { self.get_unchecked_mut() } {
968            Task::Ready(value) => Poll::Ready(value.take().unwrap()),
969            Task::Local { any_task, .. } => {
970                any_task.poll(cx).map(|value| *value.downcast().unwrap())
971            }
972            Task::Send { any_task, .. } => {
973                any_task.poll(cx).map(|value| *value.downcast().unwrap())
974            }
975        }
976    }
977}
978
979fn any_future<T, F>(future: F) -> AnyFuture
980where
981    T: 'static + Send,
982    F: Future<Output = T> + Send + 'static,
983{
984    async { Box::new(future.await) as Box<dyn Any + Send> }.boxed()
985}
986
987fn any_local_future<T, F>(future: F) -> AnyLocalFuture
988where
989    T: 'static,
990    F: Future<Output = T> + 'static,
991{
992    async { Box::new(future.await) as Box<dyn Any> }.boxed_local()
993}