executor.rs

  1use crate::{AppContext, PlatformDispatcher};
  2use futures::channel::mpsc;
  3use smol::prelude::*;
  4use std::{
  5    fmt::Debug,
  6    marker::PhantomData,
  7    mem,
  8    num::NonZeroUsize,
  9    pin::Pin,
 10    rc::Rc,
 11    sync::{
 12        atomic::{AtomicUsize, Ordering::SeqCst},
 13        Arc,
 14    },
 15    task::{Context, Poll},
 16    time::Duration,
 17};
 18use util::TryFutureExt;
 19use waker_fn::waker_fn;
 20
 21#[cfg(any(test, feature = "test-support"))]
 22use rand::rngs::StdRng;
 23
 24/// A pointer to the executor that is currently running,
 25/// for spawning background tasks.
 26#[derive(Clone)]
 27pub struct BackgroundExecutor {
 28    dispatcher: Arc<dyn PlatformDispatcher>,
 29}
 30
 31/// A pointer to the executor that is currently running,
 32/// for spawning tasks on the main thread.
 33#[derive(Clone)]
 34pub struct ForegroundExecutor {
 35    dispatcher: Arc<dyn PlatformDispatcher>,
 36    not_send: PhantomData<Rc<()>>,
 37}
 38
 39/// Task is a primitive that allows work to happen in the background.
 40///
 41/// It implements [`Future`] so you can `.await` on it.
 42///
 43/// If you drop a task it will be cancelled immediately. Calling [`Task::detach`] allows
 44/// the task to continue running, but with no way to return a value.
 45#[must_use]
 46#[derive(Debug)]
 47pub enum Task<T> {
 48    /// A task that is ready to return a value
 49    Ready(Option<T>),
 50
 51    /// A task that is currently running.
 52    Spawned(async_task::Task<T>),
 53}
 54
 55impl<T> Task<T> {
 56    /// Creates a new task that will resolve with the value
 57    pub fn ready(val: T) -> Self {
 58        Task::Ready(Some(val))
 59    }
 60
 61    /// Detaching a task runs it to completion in the background
 62    pub fn detach(self) {
 63        match self {
 64            Task::Ready(_) => {}
 65            Task::Spawned(task) => task.detach(),
 66        }
 67    }
 68}
 69
 70impl<E, T> Task<Result<T, E>>
 71where
 72    T: 'static,
 73    E: 'static + Debug,
 74{
 75    /// Run the task to completion in the background and log any
 76    /// errors that occur.
 77    #[track_caller]
 78    pub fn detach_and_log_err(self, cx: &AppContext) {
 79        let location = core::panic::Location::caller();
 80        cx.foreground_executor()
 81            .spawn(self.log_tracked_err(*location))
 82            .detach();
 83    }
 84}
 85
 86impl<T> Future for Task<T> {
 87    type Output = T;
 88
 89    fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
 90        match unsafe { self.get_unchecked_mut() } {
 91            Task::Ready(val) => Poll::Ready(val.take().unwrap()),
 92            Task::Spawned(task) => task.poll(cx),
 93        }
 94    }
 95}
 96
 97/// A task label is an opaque identifier that you can use to
 98/// refer to a task in tests.
 99#[derive(Clone, Copy, PartialEq, Eq, Hash, Debug)]
100pub struct TaskLabel(NonZeroUsize);
101
102impl Default for TaskLabel {
103    fn default() -> Self {
104        Self::new()
105    }
106}
107
108impl TaskLabel {
109    /// Construct a new task label.
110    pub fn new() -> Self {
111        static NEXT_TASK_LABEL: AtomicUsize = AtomicUsize::new(1);
112        Self(NEXT_TASK_LABEL.fetch_add(1, SeqCst).try_into().unwrap())
113    }
114}
115
116type AnyLocalFuture<R> = Pin<Box<dyn 'static + Future<Output = R>>>;
117
118type AnyFuture<R> = Pin<Box<dyn 'static + Send + Future<Output = R>>>;
119
120/// BackgroundExecutor lets you run things on background threads.
121/// In production this is a thread pool with no ordering guarantees.
122/// In tests this is simulated by running tasks one by one in a deterministic
123/// (but arbitrary) order controlled by the `SEED` environment variable.
124impl BackgroundExecutor {
125    #[doc(hidden)]
126    pub fn new(dispatcher: Arc<dyn PlatformDispatcher>) -> Self {
127        Self { dispatcher }
128    }
129
130    /// Enqueues the given future to be run to completion on a background thread.
131    pub fn spawn<R>(&self, future: impl Future<Output = R> + Send + 'static) -> Task<R>
132    where
133        R: Send + 'static,
134    {
135        self.spawn_internal::<R>(Box::pin(future), None)
136    }
137
138    /// Enqueues the given future to be run to completion on a background thread.
139    /// The given label can be used to control the priority of the task in tests.
140    pub fn spawn_labeled<R>(
141        &self,
142        label: TaskLabel,
143        future: impl Future<Output = R> + Send + 'static,
144    ) -> Task<R>
145    where
146        R: Send + 'static,
147    {
148        self.spawn_internal::<R>(Box::pin(future), Some(label))
149    }
150
151    fn spawn_internal<R: Send + 'static>(
152        &self,
153        future: AnyFuture<R>,
154        label: Option<TaskLabel>,
155    ) -> Task<R> {
156        let dispatcher = self.dispatcher.clone();
157        let (runnable, task) =
158            async_task::spawn(future, move |runnable| dispatcher.dispatch(runnable, label));
159        runnable.schedule();
160        Task::Spawned(task)
161    }
162
163    /// Used by the test harness to run an async test in a synchronous fashion.
164    #[cfg(any(test, feature = "test-support"))]
165    #[track_caller]
166    pub fn block_test<R>(&self, future: impl Future<Output = R>) -> R {
167        if let Ok(value) = self.block_internal(false, future, None) {
168            value
169        } else {
170            unreachable!()
171        }
172    }
173
174    /// Block the current thread until the given future resolves.
175    /// Consider using `block_with_timeout` instead.
176    pub fn block<R>(&self, future: impl Future<Output = R>) -> R {
177        if let Ok(value) = self.block_internal(true, future, None) {
178            value
179        } else {
180            unreachable!()
181        }
182    }
183
184    #[cfg(not(any(test, feature = "test-support")))]
185    pub(crate) fn block_internal<R>(
186        &self,
187        _background_only: bool,
188        future: impl Future<Output = R>,
189        timeout: Option<Duration>,
190    ) -> Result<R, impl Future<Output = R>> {
191        use std::time::Instant;
192
193        let mut future = Box::pin(future);
194        if timeout == Some(Duration::ZERO) {
195            return Err(future);
196        }
197        let deadline = timeout.map(|timeout| Instant::now() + timeout);
198
199        let unparker = self.dispatcher.unparker();
200        let waker = waker_fn(move || {
201            unparker.unpark();
202        });
203        let mut cx = std::task::Context::from_waker(&waker);
204
205        loop {
206            match future.as_mut().poll(&mut cx) {
207                Poll::Ready(result) => return Ok(result),
208                Poll::Pending => {
209                    let timeout =
210                        deadline.map(|deadline| deadline.saturating_duration_since(Instant::now()));
211                    if !self.dispatcher.park(timeout) {
212                        if deadline.is_some_and(|deadline| deadline < Instant::now()) {
213                            return Err(future);
214                        }
215                    }
216                }
217            }
218        }
219    }
220
221    #[cfg(any(test, feature = "test-support"))]
222    #[track_caller]
223    pub(crate) fn block_internal<R>(
224        &self,
225        background_only: bool,
226        future: impl Future<Output = R>,
227        timeout: Option<Duration>,
228    ) -> Result<R, impl Future<Output = R>> {
229        use std::sync::atomic::AtomicBool;
230
231        let mut future = Box::pin(future);
232        if timeout == Some(Duration::ZERO) {
233            return Err(future);
234        }
235        let Some(dispatcher) = self.dispatcher.as_test() else {
236            return Err(future);
237        };
238
239        let mut max_ticks = if timeout.is_some() {
240            dispatcher.gen_block_on_ticks()
241        } else {
242            usize::MAX
243        };
244        let unparker = self.dispatcher.unparker();
245        let awoken = Arc::new(AtomicBool::new(false));
246        let waker = waker_fn({
247            let awoken = awoken.clone();
248            move || {
249                awoken.store(true, SeqCst);
250                unparker.unpark();
251            }
252        });
253        let mut cx = std::task::Context::from_waker(&waker);
254
255        loop {
256            match future.as_mut().poll(&mut cx) {
257                Poll::Ready(result) => return Ok(result),
258                Poll::Pending => {
259                    if max_ticks == 0 {
260                        return Err(future);
261                    }
262                    max_ticks -= 1;
263
264                    if !dispatcher.tick(background_only) {
265                        if awoken.swap(false, SeqCst) {
266                            continue;
267                        }
268
269                        if !dispatcher.parking_allowed() {
270                            let mut backtrace_message = String::new();
271                            let mut waiting_message = String::new();
272                            if let Some(backtrace) = dispatcher.waiting_backtrace() {
273                                backtrace_message =
274                                    format!("\nbacktrace of waiting future:\n{:?}", backtrace);
275                            }
276                            if let Some(waiting_hint) = dispatcher.waiting_hint() {
277                                waiting_message = format!("\n  waiting on: {}\n", waiting_hint);
278                            }
279                            panic!(
280                                    "parked with nothing left to run{waiting_message}{backtrace_message}",
281                                )
282                        }
283                        self.dispatcher.park(None);
284                    }
285                }
286            }
287        }
288    }
289
290    /// Block the current thread until the given future resolves
291    /// or `duration` has elapsed.
292    pub fn block_with_timeout<R>(
293        &self,
294        duration: Duration,
295        future: impl Future<Output = R>,
296    ) -> Result<R, impl Future<Output = R>> {
297        self.block_internal(true, future, Some(duration))
298    }
299
300    /// Scoped lets you start a number of tasks and waits
301    /// for all of them to complete before returning.
302    pub async fn scoped<'scope, F>(&self, scheduler: F)
303    where
304        F: FnOnce(&mut Scope<'scope>),
305    {
306        let mut scope = Scope::new(self.clone());
307        (scheduler)(&mut scope);
308        let spawned = mem::take(&mut scope.futures)
309            .into_iter()
310            .map(|f| self.spawn(f))
311            .collect::<Vec<_>>();
312        for task in spawned {
313            task.await;
314        }
315    }
316
317    /// Returns a task that will complete after the given duration.
318    /// Depending on other concurrent tasks the elapsed duration may be longer
319    /// than requested.
320    pub fn timer(&self, duration: Duration) -> Task<()> {
321        let (runnable, task) = async_task::spawn(async move {}, {
322            let dispatcher = self.dispatcher.clone();
323            move |runnable| dispatcher.dispatch_after(duration, runnable)
324        });
325        runnable.schedule();
326        Task::Spawned(task)
327    }
328
329    /// in tests, start_waiting lets you indicate which task is waiting (for debugging only)
330    #[cfg(any(test, feature = "test-support"))]
331    pub fn start_waiting(&self) {
332        self.dispatcher.as_test().unwrap().start_waiting();
333    }
334
335    /// in tests, removes the debugging data added by start_waiting
336    #[cfg(any(test, feature = "test-support"))]
337    pub fn finish_waiting(&self) {
338        self.dispatcher.as_test().unwrap().finish_waiting();
339    }
340
341    /// in tests, run an arbitrary number of tasks (determined by the SEED environment variable)
342    #[cfg(any(test, feature = "test-support"))]
343    pub fn simulate_random_delay(&self) -> impl Future<Output = ()> {
344        self.dispatcher.as_test().unwrap().simulate_random_delay()
345    }
346
347    /// in tests, indicate that a given task from `spawn_labeled` should run after everything else
348    #[cfg(any(test, feature = "test-support"))]
349    pub fn deprioritize(&self, task_label: TaskLabel) {
350        self.dispatcher.as_test().unwrap().deprioritize(task_label)
351    }
352
353    /// in tests, move time forward. This does not run any tasks, but does make `timer`s ready.
354    #[cfg(any(test, feature = "test-support"))]
355    pub fn advance_clock(&self, duration: Duration) {
356        self.dispatcher.as_test().unwrap().advance_clock(duration)
357    }
358
359    /// in tests, run one task.
360    #[cfg(any(test, feature = "test-support"))]
361    pub fn tick(&self) -> bool {
362        self.dispatcher.as_test().unwrap().tick(false)
363    }
364
365    /// in tests, run all tasks that are ready to run. If after doing so
366    /// the test still has outstanding tasks, this will panic. (See also `allow_parking`)
367    #[cfg(any(test, feature = "test-support"))]
368    pub fn run_until_parked(&self) {
369        self.dispatcher.as_test().unwrap().run_until_parked()
370    }
371
372    /// in tests, prevents `run_until_parked` from panicking if there are outstanding tasks.
373    /// This is useful when you are integrating other (non-GPUI) futures, like disk access, that
374    /// do take real async time to run.
375    #[cfg(any(test, feature = "test-support"))]
376    pub fn allow_parking(&self) {
377        self.dispatcher.as_test().unwrap().allow_parking();
378    }
379
380    /// undoes the effect of [`allow_parking`].
381    #[cfg(any(test, feature = "test-support"))]
382    pub fn forbid_parking(&self) {
383        self.dispatcher.as_test().unwrap().forbid_parking();
384    }
385
386    /// adds detail to the "parked with nothing let to run" message.
387    #[cfg(any(test, feature = "test-support"))]
388    pub fn set_waiting_hint(&self, msg: Option<String>) {
389        self.dispatcher.as_test().unwrap().set_waiting_hint(msg);
390    }
391
392    /// in tests, returns the rng used by the dispatcher and seeded by the `SEED` environment variable
393    #[cfg(any(test, feature = "test-support"))]
394    pub fn rng(&self) -> StdRng {
395        self.dispatcher.as_test().unwrap().rng()
396    }
397
398    /// How many CPUs are available to the dispatcher.
399    pub fn num_cpus(&self) -> usize {
400        num_cpus::get()
401    }
402
403    /// Whether we're on the main thread.
404    pub fn is_main_thread(&self) -> bool {
405        self.dispatcher.is_main_thread()
406    }
407
408    #[cfg(any(test, feature = "test-support"))]
409    /// in tests, control the number of ticks that `block_with_timeout` will run before timing out.
410    pub fn set_block_on_ticks(&self, range: std::ops::RangeInclusive<usize>) {
411        self.dispatcher.as_test().unwrap().set_block_on_ticks(range);
412    }
413}
414
415/// ForegroundExecutor runs things on the main thread.
416impl ForegroundExecutor {
417    /// Creates a new ForegroundExecutor from the given PlatformDispatcher.
418    pub fn new(dispatcher: Arc<dyn PlatformDispatcher>) -> Self {
419        Self {
420            dispatcher,
421            not_send: PhantomData,
422        }
423    }
424
425    /// Enqueues the given Task to run on the main thread at some point in the future.
426    pub fn spawn<R>(&self, future: impl Future<Output = R> + 'static) -> Task<R>
427    where
428        R: 'static,
429    {
430        let dispatcher = self.dispatcher.clone();
431        fn inner<R: 'static>(
432            dispatcher: Arc<dyn PlatformDispatcher>,
433            future: AnyLocalFuture<R>,
434        ) -> Task<R> {
435            let (runnable, task) = async_task::spawn_local(future, move |runnable| {
436                dispatcher.dispatch_on_main_thread(runnable)
437            });
438            runnable.schedule();
439            Task::Spawned(task)
440        }
441        inner::<R>(dispatcher, Box::pin(future))
442    }
443}
444
445/// Scope manages a set of tasks that are enqueued and waited on together. See [`BackgroundExecutor::scoped`].
446pub struct Scope<'a> {
447    executor: BackgroundExecutor,
448    futures: Vec<Pin<Box<dyn Future<Output = ()> + Send + 'static>>>,
449    tx: Option<mpsc::Sender<()>>,
450    rx: mpsc::Receiver<()>,
451    lifetime: PhantomData<&'a ()>,
452}
453
454impl<'a> Scope<'a> {
455    fn new(executor: BackgroundExecutor) -> Self {
456        let (tx, rx) = mpsc::channel(1);
457        Self {
458            executor,
459            tx: Some(tx),
460            rx,
461            futures: Default::default(),
462            lifetime: PhantomData,
463        }
464    }
465
466    /// How many CPUs are available to the dispatcher.
467    pub fn num_cpus(&self) -> usize {
468        self.executor.num_cpus()
469    }
470
471    /// Spawn a future into this scope.
472    pub fn spawn<F>(&mut self, f: F)
473    where
474        F: Future<Output = ()> + Send + 'a,
475    {
476        let tx = self.tx.clone().unwrap();
477
478        // SAFETY: The 'a lifetime is guaranteed to outlive any of these futures because
479        // dropping this `Scope` blocks until all of the futures have resolved.
480        let f = unsafe {
481            mem::transmute::<
482                Pin<Box<dyn Future<Output = ()> + Send + 'a>>,
483                Pin<Box<dyn Future<Output = ()> + Send + 'static>>,
484            >(Box::pin(async move {
485                f.await;
486                drop(tx);
487            }))
488        };
489        self.futures.push(f);
490    }
491}
492
493impl<'a> Drop for Scope<'a> {
494    fn drop(&mut self) {
495        self.tx.take().unwrap();
496
497        // Wait until the channel is closed, which means that all of the spawned
498        // futures have resolved.
499        self.executor.block(self.rx.next());
500    }
501}