1use crate::{App, PlatformDispatcher, PlatformScheduler};
2use futures::channel::mpsc;
3use futures::prelude::*;
4use gpui_util::TryFutureExt;
5use scheduler::Instant;
6use scheduler::Scheduler;
7use std::{
8 fmt::Debug, future::Future, marker::PhantomData, mem, pin::Pin, rc::Rc, sync::Arc,
9 time::Duration,
10};
11
12pub use scheduler::{
13 FallibleTask, ForegroundExecutor as SchedulerForegroundExecutor, Priority, Task,
14};
15
16/// A pointer to the executor that is currently running,
17/// for spawning background tasks.
18#[derive(Clone)]
19pub struct BackgroundExecutor {
20 inner: scheduler::BackgroundExecutor,
21 dispatcher: Arc<dyn PlatformDispatcher>,
22}
23
24/// A pointer to the executor that is currently running,
25/// for spawning tasks on the main thread.
26#[derive(Clone)]
27pub struct ForegroundExecutor {
28 inner: scheduler::ForegroundExecutor,
29 dispatcher: Arc<dyn PlatformDispatcher>,
30 not_send: PhantomData<Rc<()>>,
31}
32
33/// Extension trait for `Task<Result<T, E>>` that adds `detach_and_log_err` with an `&App` context.
34///
35/// This trait is automatically implemented for all `Task<Result<T, E>>` types.
36pub trait TaskExt<T, E> {
37 /// Run the task to completion in the background and log any errors that occur.
38 fn detach_and_log_err(self, cx: &App);
39}
40
41impl<T, E> TaskExt<T, E> for Task<Result<T, E>>
42where
43 T: 'static,
44 E: 'static + Debug,
45{
46 #[track_caller]
47 fn detach_and_log_err(self, cx: &App) {
48 let location = core::panic::Location::caller();
49 cx.foreground_executor()
50 .spawn(self.log_tracked_err(*location))
51 .detach();
52 }
53}
54
55impl BackgroundExecutor {
56 /// Creates a new BackgroundExecutor from the given PlatformDispatcher.
57 pub fn new(dispatcher: Arc<dyn PlatformDispatcher>) -> Self {
58 #[cfg(any(test, feature = "test-support"))]
59 let scheduler: Arc<dyn Scheduler> = if let Some(test_dispatcher) = dispatcher.as_test() {
60 test_dispatcher.scheduler().clone()
61 } else {
62 Arc::new(PlatformScheduler::new(dispatcher.clone()))
63 };
64
65 #[cfg(not(any(test, feature = "test-support")))]
66 let scheduler: Arc<dyn Scheduler> = Arc::new(PlatformScheduler::new(dispatcher.clone()));
67
68 Self {
69 inner: scheduler::BackgroundExecutor::new(scheduler),
70 dispatcher,
71 }
72 }
73
74 /// Returns the underlying scheduler::BackgroundExecutor.
75 ///
76 /// This is used by Ex to pass the executor to thread/worktree code.
77 pub fn scheduler_executor(&self) -> scheduler::BackgroundExecutor {
78 self.inner.clone()
79 }
80
81 /// Enqueues the given future to be run to completion on a background thread.
82 #[track_caller]
83 pub fn spawn<R>(&self, future: impl Future<Output = R> + Send + 'static) -> Task<R>
84 where
85 R: Send + 'static,
86 {
87 self.spawn_with_priority(Priority::default(), future.boxed())
88 }
89
90 /// Enqueues the given future to be run to completion on a background thread with the given priority.
91 ///
92 /// When `Priority::RealtimeAudio` is used, the task runs on a dedicated thread with
93 /// realtime scheduling priority, suitable for audio processing.
94 #[track_caller]
95 pub fn spawn_with_priority<R>(
96 &self,
97 priority: Priority,
98 future: impl Future<Output = R> + Send + 'static,
99 ) -> Task<R>
100 where
101 R: Send + 'static,
102 {
103 if priority == Priority::RealtimeAudio {
104 self.inner.spawn_realtime(future)
105 } else {
106 self.inner.spawn_with_priority(priority, future)
107 }
108 }
109
110 /// Enqueues the given future to be run to completion on a background thread and blocking the current task on it.
111 ///
112 /// This allows to spawn background work that borrows from its scope. Note that the supplied future will run to
113 /// completion before the current task is resumed, even if the current task is slated for cancellation.
114 pub async fn await_on_background<R>(&self, future: impl Future<Output = R> + Send) -> R
115 where
116 R: Send,
117 {
118 use crate::RunnableMeta;
119 use parking_lot::{Condvar, Mutex};
120
121 struct NotifyOnDrop<'a>(&'a (Condvar, Mutex<bool>));
122
123 impl Drop for NotifyOnDrop<'_> {
124 fn drop(&mut self) {
125 *self.0.1.lock() = true;
126 self.0.0.notify_all();
127 }
128 }
129
130 struct WaitOnDrop<'a>(&'a (Condvar, Mutex<bool>));
131
132 impl Drop for WaitOnDrop<'_> {
133 fn drop(&mut self) {
134 let mut done = self.0.1.lock();
135 if !*done {
136 self.0.0.wait(&mut done);
137 }
138 }
139 }
140
141 let dispatcher = self.dispatcher.clone();
142 let location = core::panic::Location::caller();
143
144 let pair = &(Condvar::new(), Mutex::new(false));
145 let _wait_guard = WaitOnDrop(pair);
146
147 let (runnable, task) = unsafe {
148 async_task::Builder::new()
149 .metadata(RunnableMeta { location })
150 .spawn_unchecked(
151 move |_| async {
152 let _notify_guard = NotifyOnDrop(pair);
153 future.await
154 },
155 move |runnable| {
156 dispatcher.dispatch(runnable, Priority::default());
157 },
158 )
159 };
160 runnable.schedule();
161 task.await
162 }
163
164 /// Scoped lets you start a number of tasks and waits
165 /// for all of them to complete before returning.
166 pub async fn scoped<'scope, F>(&self, scheduler: F)
167 where
168 F: FnOnce(&mut Scope<'scope>),
169 {
170 let mut scope = Scope::new(self.clone(), Priority::default());
171 (scheduler)(&mut scope);
172 let spawned = mem::take(&mut scope.futures)
173 .into_iter()
174 .map(|f| self.spawn_with_priority(scope.priority, f))
175 .collect::<Vec<_>>();
176 for task in spawned {
177 task.await;
178 }
179 }
180
181 /// Scoped lets you start a number of tasks and waits
182 /// for all of them to complete before returning.
183 pub async fn scoped_priority<'scope, F>(&self, priority: Priority, scheduler: F)
184 where
185 F: FnOnce(&mut Scope<'scope>),
186 {
187 let mut scope = Scope::new(self.clone(), priority);
188 (scheduler)(&mut scope);
189 let spawned = mem::take(&mut scope.futures)
190 .into_iter()
191 .map(|f| self.spawn_with_priority(scope.priority, f))
192 .collect::<Vec<_>>();
193 for task in spawned {
194 task.await;
195 }
196 }
197
198 /// Get the current time.
199 ///
200 /// Calling this instead of `std::time::Instant::now` allows the use
201 /// of fake timers in tests.
202 pub fn now(&self) -> Instant {
203 self.inner.scheduler().clock().now()
204 }
205
206 /// Returns a task that will complete after the given duration.
207 /// Depending on other concurrent tasks the elapsed duration may be longer
208 /// than requested.
209 #[track_caller]
210 pub fn timer(&self, duration: Duration) -> Task<()> {
211 if duration.is_zero() {
212 return Task::ready(());
213 }
214 self.spawn(self.inner.scheduler().timer(duration))
215 }
216
217 /// In tests, run an arbitrary number of tasks (determined by the SEED environment variable)
218 #[cfg(any(test, feature = "test-support"))]
219 pub fn simulate_random_delay(&self) -> impl Future<Output = ()> + use<> {
220 self.dispatcher.as_test().unwrap().simulate_random_delay()
221 }
222
223 /// In tests, move time forward. This does not run any tasks, but does make `timer`s ready.
224 #[cfg(any(test, feature = "test-support"))]
225 pub fn advance_clock(&self, duration: Duration) {
226 self.dispatcher.as_test().unwrap().advance_clock(duration)
227 }
228
229 /// In tests, run one task.
230 #[cfg(any(test, feature = "test-support"))]
231 pub fn tick(&self) -> bool {
232 self.dispatcher.as_test().unwrap().scheduler().tick()
233 }
234
235 /// In tests, run tasks until the scheduler would park.
236 ///
237 /// Under the scheduler-backed test dispatcher, `tick()` will not advance the clock, so a pending
238 /// timer can keep `has_pending_tasks()` true even after all currently-runnable tasks have been
239 /// drained. To preserve the historical semantics that tests relied on (drain all work that can
240 /// make progress), we advance the clock to the next timer when no runnable tasks remain.
241 #[cfg(any(test, feature = "test-support"))]
242 pub fn run_until_parked(&self) {
243 let scheduler = self.dispatcher.as_test().unwrap().scheduler();
244 scheduler.run();
245 }
246
247 /// In tests, prevents `run_until_parked` from panicking if there are outstanding tasks.
248 #[cfg(any(test, feature = "test-support"))]
249 pub fn allow_parking(&self) {
250 self.dispatcher
251 .as_test()
252 .unwrap()
253 .scheduler()
254 .allow_parking();
255
256 if std::env::var("GPUI_RUN_UNTIL_PARKED_LOG").ok().as_deref() == Some("1") {
257 log::warn!("[gpui::executor] allow_parking: enabled");
258 }
259 }
260
261 /// Sets the range of ticks to run before timing out in block_on.
262 #[cfg(any(test, feature = "test-support"))]
263 pub fn set_block_on_ticks(&self, range: std::ops::RangeInclusive<usize>) {
264 self.dispatcher
265 .as_test()
266 .unwrap()
267 .scheduler()
268 .set_timeout_ticks(range);
269 }
270
271 /// Undoes the effect of [`Self::allow_parking`].
272 #[cfg(any(test, feature = "test-support"))]
273 pub fn forbid_parking(&self) {
274 self.dispatcher
275 .as_test()
276 .unwrap()
277 .scheduler()
278 .forbid_parking();
279 }
280
281 /// In tests, returns the rng used by the dispatcher.
282 #[cfg(any(test, feature = "test-support"))]
283 pub fn rng(&self) -> scheduler::SharedRng {
284 self.dispatcher.as_test().unwrap().scheduler().rng()
285 }
286
287 /// How many CPUs are available to the dispatcher.
288 pub fn num_cpus(&self) -> usize {
289 #[cfg(any(test, feature = "test-support"))]
290 if let Some(test) = self.dispatcher.as_test() {
291 return test.num_cpus_override().unwrap_or(4);
292 }
293 num_cpus::get()
294 }
295
296 /// Override the number of CPUs reported by this executor in tests.
297 /// Panics if not called on a test executor.
298 #[cfg(any(test, feature = "test-support"))]
299 pub fn set_num_cpus(&self, count: usize) {
300 self.dispatcher
301 .as_test()
302 .expect("set_num_cpus can only be called on a test executor")
303 .set_num_cpus(count);
304 }
305
306 /// Whether we're on the main thread.
307 pub fn is_main_thread(&self) -> bool {
308 self.dispatcher.is_main_thread()
309 }
310
311 #[doc(hidden)]
312 pub fn dispatcher(&self) -> &Arc<dyn PlatformDispatcher> {
313 &self.dispatcher
314 }
315}
316
317impl ForegroundExecutor {
318 /// Creates a new ForegroundExecutor from the given PlatformDispatcher.
319 pub fn new(dispatcher: Arc<dyn PlatformDispatcher>) -> Self {
320 #[cfg(any(test, feature = "test-support"))]
321 let (scheduler, session_id): (Arc<dyn Scheduler>, _) =
322 if let Some(test_dispatcher) = dispatcher.as_test() {
323 (
324 test_dispatcher.scheduler().clone(),
325 test_dispatcher.session_id(),
326 )
327 } else {
328 let platform_scheduler = Arc::new(PlatformScheduler::new(dispatcher.clone()));
329 let session_id = platform_scheduler.allocate_session_id();
330 (platform_scheduler, session_id)
331 };
332
333 #[cfg(not(any(test, feature = "test-support")))]
334 let (scheduler, session_id): (Arc<dyn Scheduler>, _) = {
335 let platform_scheduler = Arc::new(PlatformScheduler::new(dispatcher.clone()));
336 let session_id = platform_scheduler.allocate_session_id();
337 (platform_scheduler, session_id)
338 };
339
340 let inner = scheduler::ForegroundExecutor::new(session_id, scheduler);
341
342 Self {
343 inner,
344 dispatcher,
345 not_send: PhantomData,
346 }
347 }
348
349 /// Enqueues the given Task to run on the main thread.
350 #[track_caller]
351 pub fn spawn<R>(&self, future: impl Future<Output = R> + 'static) -> Task<R>
352 where
353 R: 'static,
354 {
355 self.inner.spawn(future.boxed_local())
356 }
357
358 /// Enqueues the given Task to run on the main thread with the given priority.
359 #[track_caller]
360 pub fn spawn_with_priority<R>(
361 &self,
362 _priority: Priority,
363 future: impl Future<Output = R> + 'static,
364 ) -> Task<R>
365 where
366 R: 'static,
367 {
368 // Priority is ignored for foreground tasks - they run in order on the main thread
369 self.inner.spawn(future)
370 }
371
372 /// Used by the test harness to run an async test in a synchronous fashion.
373 #[cfg(any(test, feature = "test-support"))]
374 #[track_caller]
375 pub fn block_test<R>(&self, future: impl Future<Output = R>) -> R {
376 use std::cell::Cell;
377
378 let scheduler = self.inner.scheduler();
379
380 let output = Cell::new(None);
381 let future = async {
382 output.set(Some(future.await));
383 };
384 let mut future = std::pin::pin!(future);
385
386 // In async GPUI tests, we must allow foreground tasks scheduled by the test itself
387 // (which are associated with the test session) to make progress while we block.
388 // Otherwise, awaiting futures that depend on same-session foreground work can deadlock.
389 scheduler.block(None, future.as_mut(), None);
390
391 output.take().expect("block_test future did not complete")
392 }
393
394 /// Block the current thread until the given future resolves.
395 /// Consider using `block_with_timeout` instead.
396 pub fn block_on<R>(&self, future: impl Future<Output = R>) -> R {
397 self.inner.block_on(future)
398 }
399
400 /// Block the current thread until the given future resolves or the timeout elapses.
401 pub fn block_with_timeout<R, Fut: Future<Output = R>>(
402 &self,
403 duration: Duration,
404 future: Fut,
405 ) -> Result<R, impl Future<Output = R> + use<R, Fut>> {
406 self.inner.block_with_timeout(duration, future)
407 }
408
409 #[doc(hidden)]
410 pub fn dispatcher(&self) -> &Arc<dyn PlatformDispatcher> {
411 &self.dispatcher
412 }
413
414 #[doc(hidden)]
415 pub fn scheduler_executor(&self) -> SchedulerForegroundExecutor {
416 self.inner.clone()
417 }
418}
419
420/// Scope manages a set of tasks that are enqueued and waited on together. See [`BackgroundExecutor::scoped`].
421pub struct Scope<'a> {
422 executor: BackgroundExecutor,
423 priority: Priority,
424 futures: Vec<Pin<Box<dyn Future<Output = ()> + Send + 'static>>>,
425 tx: Option<mpsc::Sender<()>>,
426 rx: mpsc::Receiver<()>,
427 lifetime: PhantomData<&'a ()>,
428}
429
430impl<'a> Scope<'a> {
431 fn new(executor: BackgroundExecutor, priority: Priority) -> Self {
432 let (tx, rx) = mpsc::channel(1);
433 Self {
434 executor,
435 priority,
436 tx: Some(tx),
437 rx,
438 futures: Default::default(),
439 lifetime: PhantomData,
440 }
441 }
442
443 /// How many CPUs are available to the dispatcher.
444 pub fn num_cpus(&self) -> usize {
445 self.executor.num_cpus()
446 }
447
448 /// Spawn a future into this scope.
449 #[track_caller]
450 pub fn spawn<F>(&mut self, f: F)
451 where
452 F: Future<Output = ()> + Send + 'a,
453 {
454 let tx = self.tx.clone().unwrap();
455
456 // SAFETY: The 'a lifetime is guaranteed to outlive any of these futures because
457 // dropping this `Scope` blocks until all of the futures have resolved.
458 let f = unsafe {
459 mem::transmute::<
460 Pin<Box<dyn Future<Output = ()> + Send + 'a>>,
461 Pin<Box<dyn Future<Output = ()> + Send + 'static>>,
462 >(Box::pin(async move {
463 f.await;
464 drop(tx);
465 }))
466 };
467 self.futures.push(f);
468 }
469}
470
471impl Drop for Scope<'_> {
472 fn drop(&mut self) {
473 self.tx.take().unwrap();
474
475 // Wait until the channel is closed, which means that all of the spawned
476 // futures have resolved.
477 let future = async {
478 self.rx.next().await;
479 };
480 let mut future = std::pin::pin!(future);
481 self.executor
482 .inner
483 .scheduler()
484 .block(None, future.as_mut(), None);
485 }
486}
487
488#[cfg(test)]
489mod test {
490 use super::*;
491 use crate::{App, TestDispatcher, TestPlatform};
492 use std::cell::RefCell;
493
494 /// Helper to create test infrastructure.
495 /// Returns (dispatcher, background_executor, app).
496 fn create_test_app() -> (TestDispatcher, BackgroundExecutor, Rc<crate::AppCell>) {
497 let dispatcher = TestDispatcher::new(0);
498 let arc_dispatcher = Arc::new(dispatcher.clone());
499 let background_executor = BackgroundExecutor::new(arc_dispatcher.clone());
500 let foreground_executor = ForegroundExecutor::new(arc_dispatcher);
501
502 let platform = TestPlatform::new(background_executor.clone(), foreground_executor);
503 let asset_source = Arc::new(());
504 let http_client = http_client::FakeHttpClient::with_404_response();
505
506 let app = App::new_app(platform, asset_source, http_client);
507 (dispatcher, background_executor, app)
508 }
509
510 #[test]
511 fn sanity_test_tasks_run() {
512 let (dispatcher, _background_executor, app) = create_test_app();
513 let foreground_executor = app.borrow().foreground_executor.clone();
514
515 let task_ran = Rc::new(RefCell::new(false));
516
517 foreground_executor
518 .spawn({
519 let task_ran = Rc::clone(&task_ran);
520 async move {
521 *task_ran.borrow_mut() = true;
522 }
523 })
524 .detach();
525
526 // Run dispatcher while app is still alive
527 dispatcher.run_until_parked();
528
529 // Task should have run
530 assert!(
531 *task_ran.borrow(),
532 "Task should run normally when app is alive"
533 );
534 }
535}