// executor2.rs

  1use crate::App;
  2use futures::channel::mpsc;
  3use scheduler::Timer;
  4use smol::prelude::*;
  5use std::sync::Arc;
  6use std::{
  7    fmt::Debug,
  8    marker::PhantomData,
  9    mem,
 10    pin::Pin,
 11    task::{Context, Poll},
 12    time::{Duration, Instant},
 13};
 14use util::TryFutureExt;
 15
 16pub use scheduler::{Scheduler, Yield};
 17
 18#[cfg(any(test, feature = "test-support"))]
 19use rand::rngs::StdRng;
 20
/// A pointer to the executor that is currently running,
/// for spawning background tasks.
///
/// This is a cheap, cloneable handle wrapping [`scheduler::BackgroundExecutor`].
#[derive(Clone)]
pub struct BackgroundExecutor(scheduler::BackgroundExecutor);
 25
/// A pointer to the executor that is currently running,
/// for spawning tasks on the main thread.
///
/// This type is intentionally `!Send`. NOTE(review): the original comment
/// mentioned a `not_send` marker field, but no such field exists here —
/// presumably the inner `scheduler::ForegroundExecutor` is itself `!Send`;
/// confirm against the `scheduler` crate. The `!Send`-ness matters because
/// `ForegroundExecutor::spawn` does not require `Send` but checks at runtime
/// that the future is only polled from the same thread it was spawned from.
/// These checks would fail when spawning foreground tasks from background
/// threads.
#[derive(Clone)]
pub struct ForegroundExecutor(scheduler::ForegroundExecutor);
 35
/// Task is a primitive that allows work to happen in the background.
///
/// It implements [`Future`] so you can `.await` on it.
///
/// If you drop a task it will be cancelled immediately. Calling [`Task::detach`] allows
/// the task to continue running, but with no way to return a value.
///
/// `#[must_use]` because silently discarding a task cancels it, which is
/// almost never what the caller intends.
#[must_use]
#[derive(Debug)]
pub struct Task<T>(scheduler::Task<T>);
 45
 46impl<T> Task<T> {
 47    /// Creates a new task that will resolve with the value
 48    pub fn ready(val: T) -> Self {
 49        Task(scheduler::Task::ready(val))
 50    }
 51
 52    /// Detaching a task runs it to completion in the background
 53    pub fn detach(self) {
 54        self.0.detach()
 55    }
 56}
 57
 58impl<E, T> Task<Result<T, E>>
 59where
 60    T: 'static,
 61    E: 'static + Debug,
 62{
 63    /// Run the task to completion in the background and log any
 64    /// errors that occur.
 65    #[track_caller]
 66    pub fn detach_and_log_err(self, cx: &App) {
 67        let location = core::panic::Location::caller();
 68        cx.foreground_executor()
 69            .spawn(self.log_tracked_err(*location))
 70            .detach();
 71    }
 72}
 73
impl<T> Future for Task<T> {
    type Output = T;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
        // SAFETY: structural pin projection onto the sole tuple field. We
        // never move the inner task out of the pinned wrapper, so the
        // pinning invariant is preserved.
        unsafe { self.map_unchecked_mut(|t| &mut t.0) }.poll(cx)
    }
}
 81
// A boxed future with no `Send` bound; only usable on the spawning thread.
type AnyLocalFuture<R> = Pin<Box<dyn 'static + Future<Output = R>>>;

// A boxed `Send` future, suitable for execution on any background thread.
type AnyFuture<R> = Pin<Box<dyn 'static + Send + Future<Output = R>>>;
 85
 86/// BackgroundExecutor lets you run things on background threads.
 87/// In production this is a thread pool with no ordering guarantees.
 88/// In tests this is simulated by running tasks one by one in a deterministic
 89/// (but arbitrary) order controlled by the `SEED` environment variable.
 90impl BackgroundExecutor {
 91    /// Create a new BackgroundExecutor
 92    pub fn new(executor: scheduler::BackgroundExecutor) -> Self {
 93        Self(executor)
 94    }
 95
 96    /// Enqueues the given future to be run to completion on a background thread.
 97    pub fn spawn<R>(&self, future: impl Future<Output = R> + Send + 'static) -> Task<R>
 98    where
 99        R: Send + 'static,
100    {
101        fn inner<R: Send + 'static>(
102            executor: &scheduler::BackgroundExecutor,
103            future: AnyFuture<R>,
104        ) -> Task<R> {
105            Task(executor.spawn(future))
106        }
107
108        inner(&self.0, Box::pin(future))
109    }
110
111    /// Scoped lets you start a number of tasks and waits
112    /// for all of them to complete before returning.
113    pub async fn scoped<'scope, F>(&self, f: F)
114    where
115        F: FnOnce(&mut Scope<'scope>),
116    {
117        let mut scope = Scope::new(self.clone());
118        (f)(&mut scope);
119        let spawned = mem::take(&mut scope.futures)
120            .into_iter()
121            .map(|f| self.spawn(f))
122            .collect::<Vec<_>>();
123        for task in spawned {
124            task.await;
125        }
126    }
127
128    /// Get the current time.
129    ///
130    /// Calling this instead of `std::time::Instant::now` allows the use
131    /// of fake timers in tests.
132    pub fn now(&self) -> Instant {
133        self.scheduler().clock().now()
134    }
135
136    /// Returns a task that will complete after the given duration.
137    /// Depending on other concurrent tasks the elapsed duration may be longer
138    /// than requested.
139    pub fn timer(&self, duration: Duration) -> Timer {
140        self.0.timer(duration)
141    }
142
143    /// Get the underlying scheduler.
144    pub fn scheduler(&self) -> &Arc<dyn Scheduler> {
145        &self.0.scheduler()
146    }
147
148    /// in tests, run an arbitrary number of tasks (determined by the SEED environment variable)
149    #[cfg(any(test, feature = "test-support"))]
150    pub fn simulate_random_delay(&self) -> Yield {
151        self.scheduler().as_test().yield_random()
152    }
153
154    /// in tests, move time forward. This does not run any tasks, but does make `timer`s ready.
155    #[cfg(any(test, feature = "test-support"))]
156    pub fn advance_clock(&self, duration: Duration) {
157        self.scheduler().as_test().advance_clock(duration);
158    }
159
160    /// in tests, run all tasks that are ready to run. If after doing so
161    /// the test still has outstanding tasks, this will panic. (See also [`Self::allow_parking`])
162    #[cfg(any(test, feature = "test-support"))]
163    pub fn run_until_parked(&self) {
164        self.scheduler().as_test().run();
165    }
166
167    /// in tests, prevents `run_until_parked` from panicking if there are outstanding tasks.
168    /// This is useful when you are integrating other (non-GPUI) futures, like disk access, that
169    /// do take real async time to run.
170    #[cfg(any(test, feature = "test-support"))]
171    pub fn allow_parking(&self) {
172        self.scheduler().as_test().allow_parking();
173    }
174
175    /// undoes the effect of [`Self::allow_parking`].
176    #[cfg(any(test, feature = "test-support"))]
177    pub fn forbid_parking(&self) {
178        self.scheduler().as_test().forbid_parking();
179    }
180
181    /// in tests, returns the rng used by the dispatcher and seeded by the `SEED` environment variable
182    #[cfg(any(test, feature = "test-support"))]
183    pub fn rng(&self) -> StdRng {
184        self.scheduler().as_test().rng().lock().clone()
185    }
186
187    /// How many CPUs are available for this executor.
188    pub fn num_cpus(&self) -> usize {
189        #[cfg(any(test, feature = "test-support"))]
190        return 4;
191
192        #[cfg(not(any(test, feature = "test-support")))]
193        return num_cpus::get();
194    }
195
196    #[cfg(any(test, feature = "test-support"))]
197    /// in tests, control the number of ticks that `block_with_timeout` will run before timing out.
198    pub fn set_block_on_ticks(&self, range: std::ops::RangeInclusive<usize>) {
199        self.scheduler().as_test().set_timeout_ticks(range)
200    }
201}
202
203/// ForegroundExecutor runs things on the main thread.
204impl ForegroundExecutor {
205    /// Create a new ForegroundExecutor
206    pub fn new(executor: scheduler::ForegroundExecutor) -> Self {
207        Self(executor)
208    }
209
210    /// Enqueues the given Task to run on the main thread at some point in the future.
211    #[track_caller]
212    pub fn spawn<R>(&self, future: impl Future<Output = R> + 'static) -> Task<R>
213    where
214        R: 'static,
215    {
216        #[track_caller]
217        fn inner<R: 'static>(
218            executor: &scheduler::ForegroundExecutor,
219            future: AnyLocalFuture<R>,
220        ) -> Task<R> {
221            Task(executor.spawn(future))
222        }
223
224        inner::<R>(&self.0, Box::pin(future))
225    }
226
227    /// Block the current thread until the given future resolves.
228    /// Consider using `block_with_timeout` instead.
229    pub fn block_on<F: Future>(&self, future: F) -> F::Output {
230        self.0.block_on(future)
231    }
232
233    /// Block the current thread until the given future resolves
234    /// or `timeout` has elapsed.
235    pub fn block_with_timeout<Fut: Unpin + Future>(
236        &self,
237        timeout: Duration,
238        future: Fut,
239    ) -> Result<Fut::Output, Fut> {
240        self.0.block_with_timeout(timeout, future)
241    }
242}
243
/// Scope manages a set of tasks that are enqueued and waited on together. See [`BackgroundExecutor::scoped`].
pub struct Scope<'a> {
    // Executor handle; used by `num_cpus` and to block in `Drop`.
    executor: BackgroundExecutor,
    // Futures registered via `Scope::spawn`, lifetime-erased to `'static`
    // (sound because `Drop` blocks until they all resolve).
    futures: Vec<Pin<Box<dyn Future<Output = ()> + Send + 'static>>>,
    // A clone of this sender is moved into every spawned future and dropped
    // when the future finishes; taken (dropped) in `Drop` so the channel
    // closes once all spawned futures are done.
    tx: Option<mpsc::Sender<()>>,
    // Yields `None` once every `tx` clone has been dropped.
    rx: mpsc::Receiver<()>,
    // Ties the scope to the `'a` borrows captured by spawned futures.
    lifetime: PhantomData<&'a ()>,
}
252
impl<'a> Scope<'a> {
    /// Create an empty scope tied to `executor`.
    ///
    /// Channel capacity 1 is sufficient: nothing is ever sent — the channel
    /// exists only so the receiver can observe all senders being dropped.
    fn new(executor: BackgroundExecutor) -> Self {
        let (tx, rx) = mpsc::channel(1);
        Self {
            executor,
            tx: Some(tx),
            rx,
            futures: Default::default(),
            lifetime: PhantomData,
        }
    }

    /// How many CPUs are available to the dispatcher.
    pub fn num_cpus(&self) -> usize {
        self.executor.num_cpus()
    }

    /// Spawn a future into this scope.
    pub fn spawn<F>(&mut self, f: F)
    where
        F: Future<Output = ()> + Send + 'a,
    {
        // Each spawned future owns a clone of `tx`; `Drop` waits for the
        // channel to close, which happens only after every clone (and hence
        // every spawned future) is gone.
        let tx = self.tx.clone().unwrap();

        // SAFETY: The 'a lifetime is guaranteed to outlive any of these futures because
        // dropping this `Scope` blocks until all of the futures have resolved.
        let f = unsafe {
            mem::transmute::<
                Pin<Box<dyn Future<Output = ()> + Send + 'a>>,
                Pin<Box<dyn Future<Output = ()> + Send + 'static>>,
            >(Box::pin(async move {
                f.await;
                drop(tx);
            }))
        };
        self.futures.push(f);
    }
}
291
impl Drop for Scope<'_> {
    fn drop(&mut self) {
        // Drop our own sender so that, once the clones held by spawned
        // futures are also dropped, the channel closes.
        self.tx.take().unwrap();

        // Wait until the channel is closed, which means that all of the spawned
        // futures have resolved. This blocking wait is what makes the `'a` ->
        // `'static` transmute in `Scope::spawn` sound: no spawned future can
        // outlive the borrows it captured.
        self.executor.scheduler().block(
            None,
            async {
                self.rx.next().await;
            }
            .boxed(),
            None,
        );
    }
}