1use crate::{App, PlatformDispatcher, PlatformScheduler};
2use futures::channel::mpsc;
3use futures::prelude::*;
4use gpui_util::TryFutureExt;
5use scheduler::Instant;
6use scheduler::Scheduler;
7use std::{
8 fmt::Debug, future::Future, marker::PhantomData, mem, pin::Pin, rc::Rc, sync::Arc,
9 time::Duration,
10};
11
12pub use scheduler::{FallibleTask, ForegroundExecutor as SchedulerForegroundExecutor, Priority};
13
14/// A pointer to the executor that is currently running,
15/// for spawning background tasks.
16#[derive(Clone)]
17pub struct BackgroundExecutor {
18 inner: scheduler::BackgroundExecutor,
19 dispatcher: Arc<dyn PlatformDispatcher>,
20}
21
22/// A pointer to the executor that is currently running,
23/// for spawning tasks on the main thread.
24#[derive(Clone)]
25pub struct ForegroundExecutor {
26 inner: scheduler::ForegroundExecutor,
27 dispatcher: Arc<dyn PlatformDispatcher>,
28 not_send: PhantomData<Rc<()>>,
29}
30
31/// Task is a primitive that allows work to happen in the background.
32///
33/// It implements [`Future`] so you can `.await` on it.
34///
35/// If you drop a task it will be cancelled immediately. Calling [`Task::detach`] allows
36/// the task to continue running, but with no way to return a value.
37#[must_use]
38#[derive(Debug)]
39pub struct Task<T>(scheduler::Task<T>);
40
41impl<T> Task<T> {
42 /// Creates a new task that will resolve with the value.
43 pub fn ready(val: T) -> Self {
44 Task(scheduler::Task::ready(val))
45 }
46
47 /// Returns true if the task has completed or was created with `Task::ready`.
48 pub fn is_ready(&self) -> bool {
49 self.0.is_ready()
50 }
51
52 /// Detaching a task runs it to completion in the background.
53 pub fn detach(self) {
54 self.0.detach()
55 }
56
57 /// Wraps a scheduler::Task.
58 pub fn from_scheduler(task: scheduler::Task<T>) -> Self {
59 Task(task)
60 }
61
62 /// Converts this task into a fallible task that returns `Option<T>`.
63 ///
64 /// Unlike the standard `Task<T>`, a [`FallibleTask`] will return `None`
65 /// if the task was cancelled.
66 ///
67 /// # Example
68 ///
69 /// ```ignore
70 /// // Background task that gracefully handles cancellation:
71 /// cx.background_spawn(async move {
72 /// let result = foreground_task.fallible().await;
73 /// if let Some(value) = result {
74 /// // Process the value
75 /// }
76 /// // If None, task was cancelled - just exit gracefully
77 /// }).detach();
78 /// ```
79 pub fn fallible(self) -> FallibleTask<T> {
80 self.0.fallible()
81 }
82}
83
84impl<T, E> Task<Result<T, E>>
85where
86 T: 'static,
87 E: 'static + Debug,
88{
89 /// Run the task to completion in the background and log any errors that occur.
90 #[track_caller]
91 pub fn detach_and_log_err(self, cx: &App) {
92 let location = core::panic::Location::caller();
93 cx.foreground_executor()
94 .spawn(self.log_tracked_err(*location))
95 .detach();
96 }
97}
98
99impl<T> std::future::Future for Task<T> {
100 type Output = T;
101
102 fn poll(
103 self: std::pin::Pin<&mut Self>,
104 cx: &mut std::task::Context<'_>,
105 ) -> std::task::Poll<Self::Output> {
106 // SAFETY: Task is a repr(transparent) wrapper around scheduler::Task,
107 // and we're just projecting the pin through to the inner task.
108 let inner = unsafe { self.map_unchecked_mut(|t| &mut t.0) };
109 inner.poll(cx)
110 }
111}
112
113impl BackgroundExecutor {
114 /// Creates a new BackgroundExecutor from the given PlatformDispatcher.
115 pub fn new(dispatcher: Arc<dyn PlatformDispatcher>) -> Self {
116 #[cfg(any(test, feature = "test-support"))]
117 let scheduler: Arc<dyn Scheduler> = if let Some(test_dispatcher) = dispatcher.as_test() {
118 test_dispatcher.scheduler().clone()
119 } else {
120 Arc::new(PlatformScheduler::new(dispatcher.clone()))
121 };
122
123 #[cfg(not(any(test, feature = "test-support")))]
124 let scheduler: Arc<dyn Scheduler> = Arc::new(PlatformScheduler::new(dispatcher.clone()));
125
126 Self {
127 inner: scheduler::BackgroundExecutor::new(scheduler),
128 dispatcher,
129 }
130 }
131
132 /// Enqueues the given future to be run to completion on a background thread.
133 #[track_caller]
134 pub fn spawn<R>(&self, future: impl Future<Output = R> + Send + 'static) -> Task<R>
135 where
136 R: Send + 'static,
137 {
138 self.spawn_with_priority(Priority::default(), future.boxed())
139 }
140
141 /// Enqueues the given future to be run to completion on a background thread with the given priority.
142 ///
143 /// When `Priority::RealtimeAudio` is used, the task runs on a dedicated thread with
144 /// realtime scheduling priority, suitable for audio processing.
145 #[track_caller]
146 pub fn spawn_with_priority<R>(
147 &self,
148 priority: Priority,
149 future: impl Future<Output = R> + Send + 'static,
150 ) -> Task<R>
151 where
152 R: Send + 'static,
153 {
154 if priority == Priority::RealtimeAudio {
155 Task::from_scheduler(self.inner.spawn_realtime(future))
156 } else {
157 Task::from_scheduler(self.inner.spawn_with_priority(priority, future))
158 }
159 }
160
161 /// Enqueues the given future to be run to completion on a background thread and blocking the current task on it.
162 ///
163 /// This allows to spawn background work that borrows from its scope. Note that the supplied future will run to
164 /// completion before the current task is resumed, even if the current task is slated for cancellation.
165 pub async fn await_on_background<R>(&self, future: impl Future<Output = R> + Send) -> R
166 where
167 R: Send,
168 {
169 use crate::RunnableMeta;
170 use parking_lot::{Condvar, Mutex};
171
172 struct NotifyOnDrop<'a>(&'a (Condvar, Mutex<bool>));
173
174 impl Drop for NotifyOnDrop<'_> {
175 fn drop(&mut self) {
176 *self.0.1.lock() = true;
177 self.0.0.notify_all();
178 }
179 }
180
181 struct WaitOnDrop<'a>(&'a (Condvar, Mutex<bool>));
182
183 impl Drop for WaitOnDrop<'_> {
184 fn drop(&mut self) {
185 let mut done = self.0.1.lock();
186 if !*done {
187 self.0.0.wait(&mut done);
188 }
189 }
190 }
191
192 let dispatcher = self.dispatcher.clone();
193 let location = core::panic::Location::caller();
194
195 let pair = &(Condvar::new(), Mutex::new(false));
196 let _wait_guard = WaitOnDrop(pair);
197
198 let (runnable, task) = unsafe {
199 async_task::Builder::new()
200 .metadata(RunnableMeta { location })
201 .spawn_unchecked(
202 move |_| async {
203 let _notify_guard = NotifyOnDrop(pair);
204 future.await
205 },
206 move |runnable| {
207 dispatcher.dispatch(runnable, Priority::default());
208 },
209 )
210 };
211 runnable.schedule();
212 task.await
213 }
214
215 /// Scoped lets you start a number of tasks and waits
216 /// for all of them to complete before returning.
217 pub async fn scoped<'scope, F>(&self, scheduler: F)
218 where
219 F: FnOnce(&mut Scope<'scope>),
220 {
221 let mut scope = Scope::new(self.clone(), Priority::default());
222 (scheduler)(&mut scope);
223 let spawned = mem::take(&mut scope.futures)
224 .into_iter()
225 .map(|f| self.spawn_with_priority(scope.priority, f))
226 .collect::<Vec<_>>();
227 for task in spawned {
228 task.await;
229 }
230 }
231
232 /// Scoped lets you start a number of tasks and waits
233 /// for all of them to complete before returning.
234 pub async fn scoped_priority<'scope, F>(&self, priority: Priority, scheduler: F)
235 where
236 F: FnOnce(&mut Scope<'scope>),
237 {
238 let mut scope = Scope::new(self.clone(), priority);
239 (scheduler)(&mut scope);
240 let spawned = mem::take(&mut scope.futures)
241 .into_iter()
242 .map(|f| self.spawn_with_priority(scope.priority, f))
243 .collect::<Vec<_>>();
244 for task in spawned {
245 task.await;
246 }
247 }
248
249 /// Get the current time.
250 ///
251 /// Calling this instead of `std::time::Instant::now` allows the use
252 /// of fake timers in tests.
253 pub fn now(&self) -> Instant {
254 self.inner.scheduler().clock().now()
255 }
256
257 /// Returns a task that will complete after the given duration.
258 /// Depending on other concurrent tasks the elapsed duration may be longer
259 /// than requested.
260 #[track_caller]
261 pub fn timer(&self, duration: Duration) -> Task<()> {
262 if duration.is_zero() {
263 return Task::ready(());
264 }
265 self.spawn(self.inner.scheduler().timer(duration))
266 }
267
268 /// In tests, run an arbitrary number of tasks (determined by the SEED environment variable)
269 #[cfg(any(test, feature = "test-support"))]
270 pub fn simulate_random_delay(&self) -> impl Future<Output = ()> + use<> {
271 self.dispatcher.as_test().unwrap().simulate_random_delay()
272 }
273
274 /// In tests, move time forward. This does not run any tasks, but does make `timer`s ready.
275 #[cfg(any(test, feature = "test-support"))]
276 pub fn advance_clock(&self, duration: Duration) {
277 self.dispatcher.as_test().unwrap().advance_clock(duration)
278 }
279
280 /// In tests, run one task.
281 #[cfg(any(test, feature = "test-support"))]
282 pub fn tick(&self) -> bool {
283 self.dispatcher.as_test().unwrap().scheduler().tick()
284 }
285
286 /// In tests, run tasks until the scheduler would park.
287 ///
288 /// Under the scheduler-backed test dispatcher, `tick()` will not advance the clock, so a pending
289 /// timer can keep `has_pending_tasks()` true even after all currently-runnable tasks have been
290 /// drained. To preserve the historical semantics that tests relied on (drain all work that can
291 /// make progress), we advance the clock to the next timer when no runnable tasks remain.
292 #[cfg(any(test, feature = "test-support"))]
293 pub fn run_until_parked(&self) {
294 let scheduler = self.dispatcher.as_test().unwrap().scheduler();
295 scheduler.run();
296 }
297
298 /// In tests, prevents `run_until_parked` from panicking if there are outstanding tasks.
299 #[cfg(any(test, feature = "test-support"))]
300 pub fn allow_parking(&self) {
301 self.dispatcher
302 .as_test()
303 .unwrap()
304 .scheduler()
305 .allow_parking();
306
307 if std::env::var("GPUI_RUN_UNTIL_PARKED_LOG").ok().as_deref() == Some("1") {
308 log::warn!("[gpui::executor] allow_parking: enabled");
309 }
310 }
311
312 /// Sets the range of ticks to run before timing out in block_on.
313 #[cfg(any(test, feature = "test-support"))]
314 pub fn set_block_on_ticks(&self, range: std::ops::RangeInclusive<usize>) {
315 self.dispatcher
316 .as_test()
317 .unwrap()
318 .scheduler()
319 .set_timeout_ticks(range);
320 }
321
322 /// Undoes the effect of [`Self::allow_parking`].
323 #[cfg(any(test, feature = "test-support"))]
324 pub fn forbid_parking(&self) {
325 self.dispatcher
326 .as_test()
327 .unwrap()
328 .scheduler()
329 .forbid_parking();
330 }
331
332 /// In tests, returns the rng used by the dispatcher.
333 #[cfg(any(test, feature = "test-support"))]
334 pub fn rng(&self) -> scheduler::SharedRng {
335 self.dispatcher.as_test().unwrap().scheduler().rng()
336 }
337
338 /// How many CPUs are available to the dispatcher.
339 pub fn num_cpus(&self) -> usize {
340 #[cfg(any(test, feature = "test-support"))]
341 if let Some(test) = self.dispatcher.as_test() {
342 return test.num_cpus_override().unwrap_or(4);
343 }
344 num_cpus::get()
345 }
346
347 /// Override the number of CPUs reported by this executor in tests.
348 /// Panics if not called on a test executor.
349 #[cfg(any(test, feature = "test-support"))]
350 pub fn set_num_cpus(&self, count: usize) {
351 self.dispatcher
352 .as_test()
353 .expect("set_num_cpus can only be called on a test executor")
354 .set_num_cpus(count);
355 }
356
357 /// Whether we're on the main thread.
358 pub fn is_main_thread(&self) -> bool {
359 self.dispatcher.is_main_thread()
360 }
361
362 #[doc(hidden)]
363 pub fn dispatcher(&self) -> &Arc<dyn PlatformDispatcher> {
364 &self.dispatcher
365 }
366}
367
368impl ForegroundExecutor {
369 /// Creates a new ForegroundExecutor from the given PlatformDispatcher.
370 pub fn new(dispatcher: Arc<dyn PlatformDispatcher>) -> Self {
371 #[cfg(any(test, feature = "test-support"))]
372 let (scheduler, session_id): (Arc<dyn Scheduler>, _) =
373 if let Some(test_dispatcher) = dispatcher.as_test() {
374 (
375 test_dispatcher.scheduler().clone(),
376 test_dispatcher.session_id(),
377 )
378 } else {
379 let platform_scheduler = Arc::new(PlatformScheduler::new(dispatcher.clone()));
380 let session_id = platform_scheduler.allocate_session_id();
381 (platform_scheduler, session_id)
382 };
383
384 #[cfg(not(any(test, feature = "test-support")))]
385 let (scheduler, session_id): (Arc<dyn Scheduler>, _) = {
386 let platform_scheduler = Arc::new(PlatformScheduler::new(dispatcher.clone()));
387 let session_id = platform_scheduler.allocate_session_id();
388 (platform_scheduler, session_id)
389 };
390
391 let inner = scheduler::ForegroundExecutor::new(session_id, scheduler);
392
393 Self {
394 inner,
395 dispatcher,
396 not_send: PhantomData,
397 }
398 }
399
400 /// Enqueues the given Task to run on the main thread.
401 #[track_caller]
402 pub fn spawn<R>(&self, future: impl Future<Output = R> + 'static) -> Task<R>
403 where
404 R: 'static,
405 {
406 Task::from_scheduler(self.inner.spawn(future.boxed_local()))
407 }
408
409 /// Enqueues the given Task to run on the main thread with the given priority.
410 #[track_caller]
411 pub fn spawn_with_priority<R>(
412 &self,
413 _priority: Priority,
414 future: impl Future<Output = R> + 'static,
415 ) -> Task<R>
416 where
417 R: 'static,
418 {
419 // Priority is ignored for foreground tasks - they run in order on the main thread
420 Task::from_scheduler(self.inner.spawn(future))
421 }
422
423 /// Used by the test harness to run an async test in a synchronous fashion.
424 #[cfg(any(test, feature = "test-support"))]
425 #[track_caller]
426 pub fn block_test<R>(&self, future: impl Future<Output = R>) -> R {
427 use std::cell::Cell;
428
429 let scheduler = self.inner.scheduler();
430
431 let output = Cell::new(None);
432 let future = async {
433 output.set(Some(future.await));
434 };
435 let mut future = std::pin::pin!(future);
436
437 // In async GPUI tests, we must allow foreground tasks scheduled by the test itself
438 // (which are associated with the test session) to make progress while we block.
439 // Otherwise, awaiting futures that depend on same-session foreground work can deadlock.
440 scheduler.block(None, future.as_mut(), None);
441
442 output.take().expect("block_test future did not complete")
443 }
444
445 /// Block the current thread until the given future resolves.
446 /// Consider using `block_with_timeout` instead.
447 pub fn block_on<R>(&self, future: impl Future<Output = R>) -> R {
448 self.inner.block_on(future)
449 }
450
451 /// Block the current thread until the given future resolves or the timeout elapses.
452 pub fn block_with_timeout<R, Fut: Future<Output = R>>(
453 &self,
454 duration: Duration,
455 future: Fut,
456 ) -> Result<R, impl Future<Output = R> + use<R, Fut>> {
457 self.inner.block_with_timeout(duration, future)
458 }
459
460 #[doc(hidden)]
461 pub fn dispatcher(&self) -> &Arc<dyn PlatformDispatcher> {
462 &self.dispatcher
463 }
464
465 #[doc(hidden)]
466 pub fn scheduler_executor(&self) -> SchedulerForegroundExecutor {
467 self.inner.clone()
468 }
469}
470
471/// Scope manages a set of tasks that are enqueued and waited on together. See [`BackgroundExecutor::scoped`].
472pub struct Scope<'a> {
473 executor: BackgroundExecutor,
474 priority: Priority,
475 futures: Vec<Pin<Box<dyn Future<Output = ()> + Send + 'static>>>,
476 tx: Option<mpsc::Sender<()>>,
477 rx: mpsc::Receiver<()>,
478 lifetime: PhantomData<&'a ()>,
479}
480
481impl<'a> Scope<'a> {
482 fn new(executor: BackgroundExecutor, priority: Priority) -> Self {
483 let (tx, rx) = mpsc::channel(1);
484 Self {
485 executor,
486 priority,
487 tx: Some(tx),
488 rx,
489 futures: Default::default(),
490 lifetime: PhantomData,
491 }
492 }
493
494 /// How many CPUs are available to the dispatcher.
495 pub fn num_cpus(&self) -> usize {
496 self.executor.num_cpus()
497 }
498
499 /// Spawn a future into this scope.
500 #[track_caller]
501 pub fn spawn<F>(&mut self, f: F)
502 where
503 F: Future<Output = ()> + Send + 'a,
504 {
505 let tx = self.tx.clone().unwrap();
506
507 // SAFETY: The 'a lifetime is guaranteed to outlive any of these futures because
508 // dropping this `Scope` blocks until all of the futures have resolved.
509 let f = unsafe {
510 mem::transmute::<
511 Pin<Box<dyn Future<Output = ()> + Send + 'a>>,
512 Pin<Box<dyn Future<Output = ()> + Send + 'static>>,
513 >(Box::pin(async move {
514 f.await;
515 drop(tx);
516 }))
517 };
518 self.futures.push(f);
519 }
520}
521
522impl Drop for Scope<'_> {
523 fn drop(&mut self) {
524 self.tx.take().unwrap();
525
526 // Wait until the channel is closed, which means that all of the spawned
527 // futures have resolved.
528 let future = async {
529 self.rx.next().await;
530 };
531 let mut future = std::pin::pin!(future);
532 self.executor
533 .inner
534 .scheduler()
535 .block(None, future.as_mut(), None);
536 }
537}
538
539#[cfg(test)]
540mod test {
541 use super::*;
542 use crate::{App, TestDispatcher, TestPlatform};
543 use std::cell::RefCell;
544
545 /// Helper to create test infrastructure.
546 /// Returns (dispatcher, background_executor, app).
547 fn create_test_app() -> (TestDispatcher, BackgroundExecutor, Rc<crate::AppCell>) {
548 let dispatcher = TestDispatcher::new(0);
549 let arc_dispatcher = Arc::new(dispatcher.clone());
550 let background_executor = BackgroundExecutor::new(arc_dispatcher.clone());
551 let foreground_executor = ForegroundExecutor::new(arc_dispatcher);
552
553 let platform = TestPlatform::new(background_executor.clone(), foreground_executor);
554 let asset_source = Arc::new(());
555 let http_client = http_client::FakeHttpClient::with_404_response();
556
557 let app = App::new_app(platform, asset_source, http_client);
558 (dispatcher, background_executor, app)
559 }
560
561 #[test]
562 fn sanity_test_tasks_run() {
563 let (dispatcher, _background_executor, app) = create_test_app();
564 let foreground_executor = app.borrow().foreground_executor.clone();
565
566 let task_ran = Rc::new(RefCell::new(false));
567
568 foreground_executor
569 .spawn({
570 let task_ran = Rc::clone(&task_ran);
571 async move {
572 *task_ran.borrow_mut() = true;
573 }
574 })
575 .detach();
576
577 // Run dispatcher while app is still alive
578 dispatcher.run_until_parked();
579
580 // Task should have run
581 assert!(
582 *task_ran.borrow(),
583 "Task should run normally when app is alive"
584 );
585 }
586}