1use crate::{App, PlatformDispatcher, PlatformScheduler};
2use futures::channel::mpsc;
3use futures::prelude::*;
4use gpui_util::{TryFutureExt, TryFutureExtBacktrace};
5use scheduler::Instant;
6use scheduler::Scheduler;
7use std::{
8 fmt::Debug, future::Future, marker::PhantomData, mem, pin::Pin, rc::Rc, sync::Arc,
9 time::Duration,
10};
11
12pub use scheduler::{FallibleTask, ForegroundExecutor as SchedulerForegroundExecutor, Priority};
13
/// A pointer to the executor that is currently running,
/// for spawning background tasks.
#[derive(Clone)]
pub struct BackgroundExecutor {
    // The scheduler-backed executor that actually runs spawned futures.
    inner: scheduler::BackgroundExecutor,
    // Platform hook used for thread dispatch; in tests it also provides
    // fake time, seeded randomness, and deterministic scheduling.
    dispatcher: Arc<dyn PlatformDispatcher>,
}
21
/// A pointer to the executor that is currently running,
/// for spawning tasks on the main thread.
#[derive(Clone)]
pub struct ForegroundExecutor {
    // Session-scoped executor that runs futures on the main thread.
    inner: scheduler::ForegroundExecutor,
    // Platform hook backing the scheduler; exposed via `dispatcher()`.
    dispatcher: Arc<dyn PlatformDispatcher>,
    // `Rc` is !Send, so this marker makes `ForegroundExecutor` !Send,
    // preventing it from escaping the main thread.
    not_send: PhantomData<Rc<()>>,
}
30
/// Task is a primitive that allows work to happen in the background.
///
/// It implements [`Future`] so you can `.await` on it.
///
/// If you drop a task it will be cancelled immediately. Calling [`Task::detach`] allows
/// the task to continue running, but with no way to return a value.
// Newtype over `scheduler::Task` so gpui can present its own API surface
// (e.g. `detach_and_log_err`) without exposing the scheduler crate directly.
#[must_use]
#[derive(Debug)]
pub struct Task<T>(scheduler::Task<T>);
40
41impl<T> Task<T> {
42 /// Creates a new task that will resolve with the value.
43 pub fn ready(val: T) -> Self {
44 Task(scheduler::Task::ready(val))
45 }
46
47 /// Returns true if the task has completed or was created with `Task::ready`.
48 pub fn is_ready(&self) -> bool {
49 self.0.is_ready()
50 }
51
52 /// Detaching a task runs it to completion in the background.
53 pub fn detach(self) {
54 self.0.detach()
55 }
56
57 /// Wraps a scheduler::Task.
58 pub fn from_scheduler(task: scheduler::Task<T>) -> Self {
59 Task(task)
60 }
61
62 /// Converts this task into a fallible task that returns `Option<T>`.
63 ///
64 /// Unlike the standard `Task<T>`, a [`FallibleTask`] will return `None`
65 /// if the task was cancelled.
66 ///
67 /// # Example
68 ///
69 /// ```ignore
70 /// // Background task that gracefully handles cancellation:
71 /// cx.background_spawn(async move {
72 /// let result = foreground_task.fallible().await;
73 /// if let Some(value) = result {
74 /// // Process the value
75 /// }
76 /// // If None, task was cancelled - just exit gracefully
77 /// }).detach();
78 /// ```
79 pub fn fallible(self) -> FallibleTask<T> {
80 self.0.fallible()
81 }
82}
83
84impl<T, E> Task<Result<T, E>>
85where
86 T: 'static,
87 E: 'static + std::fmt::Display,
88{
89 /// Run the task to completion in the background and log any errors that occur.
90 #[track_caller]
91 pub fn detach_and_log_err(self, cx: &App) {
92 let location = core::panic::Location::caller();
93 cx.foreground_executor()
94 .spawn(self.log_tracked_err(*location))
95 .detach();
96 }
97}
98
impl<T, E> Task<Result<T, E>>
where
    T: 'static,
    E: 'static + std::fmt::Debug,
{
    /// Like [`Self::detach_and_log_err`], but uses `{:?}` formatting on failure so `anyhow::Error`
    /// values emit their full backtrace. Prefer `detach_and_log_err` unless a backtrace is wanted.
    #[track_caller]
    pub fn detach_and_log_err_with_backtrace(self, cx: &App) {
        // Capture the caller's location eagerly so the log line points at the
        // call site rather than at executor internals.
        let location = *core::panic::Location::caller();
        cx.foreground_executor()
            .spawn(self.log_tracked_err_with_backtrace(location))
            .detach();
    }
}
114
impl<T> std::future::Future for Task<T> {
    type Output = T;

    fn poll(
        self: std::pin::Pin<&mut Self>,
        cx: &mut std::task::Context<'_>,
    ) -> std::task::Poll<Self::Output> {
        // SAFETY: structural pin projection to the sole field. Pinning `Task`
        // pins the inner `scheduler::Task` because nothing here moves the
        // field out of a pinned `Task` (there is no `Drop` impl on `Task` and
        // no method takes the field out of `&mut self`).
        // NOTE(review): `Task` is *not* declared `#[repr(transparent)]` (see
        // the struct definition), so the soundness argument is the structural
        // pinning above, not layout equivalence.
        let inner = unsafe { self.map_unchecked_mut(|t| &mut t.0) };
        inner.poll(cx)
    }
}
128
129impl BackgroundExecutor {
130 /// Creates a new BackgroundExecutor from the given PlatformDispatcher.
131 pub fn new(dispatcher: Arc<dyn PlatformDispatcher>) -> Self {
132 #[cfg(any(test, feature = "test-support"))]
133 let scheduler: Arc<dyn Scheduler> = if let Some(test_dispatcher) = dispatcher.as_test() {
134 test_dispatcher.scheduler().clone()
135 } else {
136 Arc::new(PlatformScheduler::new(dispatcher.clone()))
137 };
138
139 #[cfg(not(any(test, feature = "test-support")))]
140 let scheduler: Arc<dyn Scheduler> = Arc::new(PlatformScheduler::new(dispatcher.clone()));
141
142 Self {
143 inner: scheduler::BackgroundExecutor::new(scheduler),
144 dispatcher,
145 }
146 }
147
148 /// Returns the underlying scheduler::BackgroundExecutor.
149 ///
150 /// This is used by Ex to pass the executor to thread/worktree code.
151 pub fn scheduler_executor(&self) -> scheduler::BackgroundExecutor {
152 self.inner.clone()
153 }
154
155 /// Enqueues the given future to be run to completion on a background thread.
156 #[track_caller]
157 pub fn spawn<R>(&self, future: impl Future<Output = R> + Send + 'static) -> Task<R>
158 where
159 R: Send + 'static,
160 {
161 self.spawn_with_priority(Priority::default(), future.boxed())
162 }
163
164 /// Enqueues the given future to be run to completion on a background thread with the given priority.
165 ///
166 /// When `Priority::RealtimeAudio` is used, the task runs on a dedicated thread with
167 /// realtime scheduling priority, suitable for audio processing.
168 #[track_caller]
169 pub fn spawn_with_priority<R>(
170 &self,
171 priority: Priority,
172 future: impl Future<Output = R> + Send + 'static,
173 ) -> Task<R>
174 where
175 R: Send + 'static,
176 {
177 if priority == Priority::RealtimeAudio {
178 Task::from_scheduler(self.inner.spawn_realtime(future))
179 } else {
180 Task::from_scheduler(self.inner.spawn_with_priority(priority, future))
181 }
182 }
183
184 /// Enqueues the given future to be run to completion on a background thread and blocking the current task on it.
185 ///
186 /// This allows to spawn background work that borrows from its scope. Note that the supplied future will run to
187 /// completion before the current task is resumed, even if the current task is slated for cancellation.
188 pub async fn await_on_background<R>(&self, future: impl Future<Output = R> + Send) -> R
189 where
190 R: Send,
191 {
192 use crate::RunnableMeta;
193 use parking_lot::{Condvar, Mutex};
194
195 struct NotifyOnDrop<'a>(&'a (Condvar, Mutex<bool>));
196
197 impl Drop for NotifyOnDrop<'_> {
198 fn drop(&mut self) {
199 *self.0.1.lock() = true;
200 self.0.0.notify_all();
201 }
202 }
203
204 struct WaitOnDrop<'a>(&'a (Condvar, Mutex<bool>));
205
206 impl Drop for WaitOnDrop<'_> {
207 fn drop(&mut self) {
208 let mut done = self.0.1.lock();
209 if !*done {
210 self.0.0.wait(&mut done);
211 }
212 }
213 }
214
215 let dispatcher = self.dispatcher.clone();
216 let location = core::panic::Location::caller();
217
218 let pair = &(Condvar::new(), Mutex::new(false));
219 let _wait_guard = WaitOnDrop(pair);
220
221 let (runnable, task) = unsafe {
222 async_task::Builder::new()
223 .metadata(RunnableMeta { location })
224 .spawn_unchecked(
225 move |_| async {
226 let _notify_guard = NotifyOnDrop(pair);
227 future.await
228 },
229 move |runnable| {
230 dispatcher.dispatch(runnable, Priority::default());
231 },
232 )
233 };
234 runnable.schedule();
235 task.await
236 }
237
238 /// Scoped lets you start a number of tasks and waits
239 /// for all of them to complete before returning.
240 pub async fn scoped<'scope, F>(&self, scheduler: F)
241 where
242 F: FnOnce(&mut Scope<'scope>),
243 {
244 let mut scope = Scope::new(self.clone(), Priority::default());
245 (scheduler)(&mut scope);
246 let spawned = mem::take(&mut scope.futures)
247 .into_iter()
248 .map(|f| self.spawn_with_priority(scope.priority, f))
249 .collect::<Vec<_>>();
250 for task in spawned {
251 task.await;
252 }
253 }
254
255 /// Scoped lets you start a number of tasks and waits
256 /// for all of them to complete before returning.
257 pub async fn scoped_priority<'scope, F>(&self, priority: Priority, scheduler: F)
258 where
259 F: FnOnce(&mut Scope<'scope>),
260 {
261 let mut scope = Scope::new(self.clone(), priority);
262 (scheduler)(&mut scope);
263 let spawned = mem::take(&mut scope.futures)
264 .into_iter()
265 .map(|f| self.spawn_with_priority(scope.priority, f))
266 .collect::<Vec<_>>();
267 for task in spawned {
268 task.await;
269 }
270 }
271
272 /// Get the current time.
273 ///
274 /// Calling this instead of `std::time::Instant::now` allows the use
275 /// of fake timers in tests.
276 pub fn now(&self) -> Instant {
277 self.inner.scheduler().clock().now()
278 }
279
280 /// Returns a task that will complete after the given duration.
281 /// Depending on other concurrent tasks the elapsed duration may be longer
282 /// than requested.
283 #[track_caller]
284 pub fn timer(&self, duration: Duration) -> Task<()> {
285 if duration.is_zero() {
286 return Task::ready(());
287 }
288 self.spawn(self.inner.scheduler().timer(duration))
289 }
290
291 /// In tests, run an arbitrary number of tasks (determined by the SEED environment variable)
292 #[cfg(any(test, feature = "test-support"))]
293 pub fn simulate_random_delay(&self) -> impl Future<Output = ()> + use<> {
294 self.dispatcher.as_test().unwrap().simulate_random_delay()
295 }
296
297 /// In tests, move time forward. This does not run any tasks, but does make `timer`s ready.
298 #[cfg(any(test, feature = "test-support"))]
299 pub fn advance_clock(&self, duration: Duration) {
300 self.dispatcher.as_test().unwrap().advance_clock(duration)
301 }
302
303 /// In tests, run one task.
304 #[cfg(any(test, feature = "test-support"))]
305 pub fn tick(&self) -> bool {
306 self.dispatcher.as_test().unwrap().scheduler().tick()
307 }
308
309 /// In tests, run tasks until the scheduler would park.
310 ///
311 /// Under the scheduler-backed test dispatcher, `tick()` will not advance the clock, so a pending
312 /// timer can keep `has_pending_tasks()` true even after all currently-runnable tasks have been
313 /// drained. To preserve the historical semantics that tests relied on (drain all work that can
314 /// make progress), we advance the clock to the next timer when no runnable tasks remain.
315 #[cfg(any(test, feature = "test-support"))]
316 pub fn run_until_parked(&self) {
317 let scheduler = self.dispatcher.as_test().unwrap().scheduler();
318 scheduler.run();
319 }
320
321 /// In tests, prevents `run_until_parked` from panicking if there are outstanding tasks.
322 #[cfg(any(test, feature = "test-support"))]
323 pub fn allow_parking(&self) {
324 self.dispatcher
325 .as_test()
326 .unwrap()
327 .scheduler()
328 .allow_parking();
329
330 if std::env::var("GPUI_RUN_UNTIL_PARKED_LOG").ok().as_deref() == Some("1") {
331 log::warn!("[gpui::executor] allow_parking: enabled");
332 }
333 }
334
335 /// Sets the range of ticks to run before timing out in block_on.
336 #[cfg(any(test, feature = "test-support"))]
337 pub fn set_block_on_ticks(&self, range: std::ops::RangeInclusive<usize>) {
338 self.dispatcher
339 .as_test()
340 .unwrap()
341 .scheduler()
342 .set_timeout_ticks(range);
343 }
344
345 /// Undoes the effect of [`Self::allow_parking`].
346 #[cfg(any(test, feature = "test-support"))]
347 pub fn forbid_parking(&self) {
348 self.dispatcher
349 .as_test()
350 .unwrap()
351 .scheduler()
352 .forbid_parking();
353 }
354
355 /// In tests, returns the rng used by the dispatcher.
356 #[cfg(any(test, feature = "test-support"))]
357 pub fn rng(&self) -> scheduler::SharedRng {
358 self.dispatcher.as_test().unwrap().scheduler().rng()
359 }
360
361 /// How many CPUs are available to the dispatcher.
362 pub fn num_cpus(&self) -> usize {
363 #[cfg(any(test, feature = "test-support"))]
364 if let Some(test) = self.dispatcher.as_test() {
365 return test.num_cpus_override().unwrap_or(4);
366 }
367 num_cpus::get()
368 }
369
370 /// Override the number of CPUs reported by this executor in tests.
371 /// Panics if not called on a test executor.
372 #[cfg(any(test, feature = "test-support"))]
373 pub fn set_num_cpus(&self, count: usize) {
374 self.dispatcher
375 .as_test()
376 .expect("set_num_cpus can only be called on a test executor")
377 .set_num_cpus(count);
378 }
379
380 /// Whether we're on the main thread.
381 pub fn is_main_thread(&self) -> bool {
382 self.dispatcher.is_main_thread()
383 }
384
385 #[doc(hidden)]
386 pub fn dispatcher(&self) -> &Arc<dyn PlatformDispatcher> {
387 &self.dispatcher
388 }
389}
390
impl ForegroundExecutor {
    /// Creates a new ForegroundExecutor from the given PlatformDispatcher.
    pub fn new(dispatcher: Arc<dyn PlatformDispatcher>) -> Self {
        // In test builds, reuse the test dispatcher's deterministic scheduler
        // and its session id; otherwise allocate a fresh session on a
        // platform scheduler.
        #[cfg(any(test, feature = "test-support"))]
        let (scheduler, session_id): (Arc<dyn Scheduler>, _) =
            if let Some(test_dispatcher) = dispatcher.as_test() {
                (
                    test_dispatcher.scheduler().clone(),
                    test_dispatcher.session_id(),
                )
            } else {
                let platform_scheduler = Arc::new(PlatformScheduler::new(dispatcher.clone()));
                let session_id = platform_scheduler.allocate_session_id();
                (platform_scheduler, session_id)
            };

        #[cfg(not(any(test, feature = "test-support")))]
        let (scheduler, session_id): (Arc<dyn Scheduler>, _) = {
            let platform_scheduler = Arc::new(PlatformScheduler::new(dispatcher.clone()));
            let session_id = platform_scheduler.allocate_session_id();
            (platform_scheduler, session_id)
        };

        let inner = scheduler::ForegroundExecutor::new(session_id, scheduler);

        Self {
            inner,
            dispatcher,
            not_send: PhantomData,
        }
    }

    /// Enqueues the given Task to run on the main thread.
    #[track_caller]
    pub fn spawn<R>(&self, future: impl Future<Output = R> + 'static) -> Task<R>
    where
        R: 'static,
    {
        Task::from_scheduler(self.inner.spawn(future.boxed_local()))
    }

    /// Enqueues the given Task to run on the main thread with the given priority.
    #[track_caller]
    pub fn spawn_with_priority<R>(
        &self,
        _priority: Priority,
        future: impl Future<Output = R> + 'static,
    ) -> Task<R>
    where
        R: 'static,
    {
        // Priority is ignored for foreground tasks - they run in order on the main thread
        Task::from_scheduler(self.inner.spawn(future))
    }

    /// Used by the test harness to run an async test in a synchronous fashion.
    #[cfg(any(test, feature = "test-support"))]
    #[track_caller]
    pub fn block_test<R>(&self, future: impl Future<Output = R>) -> R {
        use std::cell::Cell;

        let scheduler = self.inner.scheduler();

        // Capture the test future's output through a Cell so the wrapping
        // `async` block can have `()` output for `scheduler.block`.
        let output = Cell::new(None);
        let future = async {
            output.set(Some(future.await));
        };
        let mut future = std::pin::pin!(future);

        // In async GPUI tests, we must allow foreground tasks scheduled by the test itself
        // (which are associated with the test session) to make progress while we block.
        // Otherwise, awaiting futures that depend on same-session foreground work can deadlock.
        scheduler.block(None, future.as_mut(), None);

        output.take().expect("block_test future did not complete")
    }

    /// Block the current thread until the given future resolves.
    /// Consider using `block_with_timeout` instead.
    pub fn block_on<R>(&self, future: impl Future<Output = R>) -> R {
        self.inner.block_on(future)
    }

    /// Block the current thread until the given future resolves or the timeout elapses.
    /// On timeout, returns `Err` with the still-pending future so the caller
    /// can resume or drop it.
    pub fn block_with_timeout<R, Fut: Future<Output = R>>(
        &self,
        duration: Duration,
        future: Fut,
    ) -> Result<R, impl Future<Output = R> + use<R, Fut>> {
        self.inner.block_with_timeout(duration, future)
    }

    #[doc(hidden)]
    pub fn dispatcher(&self) -> &Arc<dyn PlatformDispatcher> {
        &self.dispatcher
    }

    #[doc(hidden)]
    pub fn scheduler_executor(&self) -> SchedulerForegroundExecutor {
        self.inner.clone()
    }
}
493
/// Scope manages a set of tasks that are enqueued and waited on together. See [`BackgroundExecutor::scoped`].
pub struct Scope<'a> {
    // Executor used to block on completion in `Drop`.
    executor: BackgroundExecutor,
    // Priority applied to every future spawned into this scope.
    priority: Priority,
    // Futures registered via `spawn`, lifetime-erased to 'static; sound only
    // because `Drop` blocks until all of them have resolved.
    futures: Vec<Pin<Box<dyn Future<Output = ()> + Send + 'static>>>,
    // Each spawned future holds a clone of this sender; when every clone has
    // been dropped, `rx` closes, signalling that all futures finished.
    tx: Option<mpsc::Sender<()>>,
    rx: mpsc::Receiver<()>,
    // Ties the scope to the 'a borrows captured by its futures.
    lifetime: PhantomData<&'a ()>,
}
503
impl<'a> Scope<'a> {
    // Creates an empty scope; the channel is only ever used for its
    // closed/open state, never for the `()` payload.
    fn new(executor: BackgroundExecutor, priority: Priority) -> Self {
        let (tx, rx) = mpsc::channel(1);
        Self {
            executor,
            priority,
            tx: Some(tx),
            rx,
            futures: Default::default(),
            lifetime: PhantomData,
        }
    }

    /// How many CPUs are available to the dispatcher.
    pub fn num_cpus(&self) -> usize {
        self.executor.num_cpus()
    }

    /// Spawn a future into this scope.
    #[track_caller]
    pub fn spawn<F>(&mut self, f: F)
    where
        F: Future<Output = ()> + Send + 'a,
    {
        // Each future carries its own sender clone; dropping it (on
        // completion or panic-unwind) is how the scope learns it finished.
        let tx = self.tx.clone().unwrap();

        // SAFETY: The 'a lifetime is guaranteed to outlive any of these futures because
        // dropping this `Scope` blocks until all of the futures have resolved.
        let f = unsafe {
            mem::transmute::<
                Pin<Box<dyn Future<Output = ()> + Send + 'a>>,
                Pin<Box<dyn Future<Output = ()> + Send + 'static>>,
            >(Box::pin(async move {
                f.await;
                drop(tx);
            }))
        };
        self.futures.push(f);
    }
}
544
impl Drop for Scope<'_> {
    fn drop(&mut self) {
        // Drop our own sender so the channel can close once every spawned
        // future has dropped its clone.
        self.tx.take().unwrap();

        // Wait until the channel is closed, which means that all of the spawned
        // futures have resolved. This wait is what makes the lifetime-erasing
        // transmute in `Scope::spawn` sound: no borrowed data outlives the work.
        let future = async {
            self.rx.next().await;
        };
        let mut future = std::pin::pin!(future);
        self.executor
            .inner
            .scheduler()
            .block(None, future.as_mut(), None);
    }
}
561
#[cfg(test)]
mod test {
    use super::*;
    use crate::{App, TestDispatcher, TestPlatform};
    use std::cell::RefCell;

    /// Builds the test infrastructure shared by the tests below.
    /// Returns (dispatcher, background_executor, app).
    fn create_test_app() -> (TestDispatcher, BackgroundExecutor, Rc<crate::AppCell>) {
        let dispatcher = TestDispatcher::new(0);
        let shared = Arc::new(dispatcher.clone());
        let background = BackgroundExecutor::new(shared.clone());
        let foreground = ForegroundExecutor::new(shared);

        let platform = TestPlatform::new(background.clone(), foreground);
        let app = App::new_app(
            platform,
            Arc::new(()),
            http_client::FakeHttpClient::with_404_response(),
        );
        (dispatcher, background, app)
    }

    #[test]
    fn sanity_test_tasks_run() {
        let (dispatcher, _background, app) = create_test_app();
        let executor = app.borrow().foreground_executor.clone();

        let did_run = Rc::new(RefCell::new(false));
        let flag = Rc::clone(&did_run);

        executor
            .spawn(async move {
                *flag.borrow_mut() = true;
            })
            .detach();

        // Drain all runnable work while the app is still alive.
        dispatcher.run_until_parked();

        // The spawned foreground task must have executed.
        assert!(
            *did_run.borrow(),
            "Task should run normally when app is alive"
        );
    }
}