1use anyhow::{anyhow, Result};
2use async_task::Runnable;
3use smol::{channel, prelude::*, Executor, Timer};
4use std::{
5 any::Any,
6 fmt::{self, Display},
7 marker::PhantomData,
8 mem,
9 pin::Pin,
10 rc::Rc,
11 sync::Arc,
12 task::{Context, Poll},
13 thread,
14 time::Duration,
15};
16
17use crate::{
18 platform::{self, Dispatcher},
19 util, MutableAppContext,
20};
21
22pub enum Foreground {
23 Platform {
24 dispatcher: Arc<dyn platform::Dispatcher>,
25 _not_send_or_sync: PhantomData<Rc<()>>,
26 },
27 #[cfg(any(test, feature = "test-support"))]
28 Deterministic {
29 cx_id: usize,
30 executor: Arc<Deterministic>,
31 },
32}
33
34pub enum Background {
35 #[cfg(any(test, feature = "test-support"))]
36 Deterministic { executor: Arc<Deterministic> },
37 Production {
38 executor: Arc<smol::Executor<'static>>,
39 _stop: channel::Sender<()>,
40 },
41}
42
43type AnyLocalFuture = Pin<Box<dyn 'static + Future<Output = Box<dyn Any + 'static>>>>;
44type AnyFuture = Pin<Box<dyn 'static + Send + Future<Output = Box<dyn Any + Send + 'static>>>>;
45type AnyTask = async_task::Task<Box<dyn Any + Send + 'static>>;
46type AnyLocalTask = async_task::Task<Box<dyn Any + 'static>>;
47
48#[must_use]
49pub enum Task<T> {
50 Ready(Option<T>),
51 Local {
52 any_task: AnyLocalTask,
53 result_type: PhantomData<T>,
54 },
55 Send {
56 any_task: AnyTask,
57 result_type: PhantomData<T>,
58 },
59}
60
61unsafe impl<T: Send> Send for Task<T> {}
62
/// Mutable state of the `Deterministic` executor, kept behind a mutex.
#[cfg(any(test, feature = "test-support"))]
struct DeterministicState {
    /// Random source for every scheduling decision.
    rng: rand::prelude::StdRng,
    /// Seed the RNG was built from; `forbid_parking` reseeds `rng` with it.
    seed: u64,
    /// Runnables scheduled by foreground tasks, queued per context id.
    scheduled_from_foreground: collections::HashMap<usize, Vec<ForegroundRunnable>>,
    /// Runnables scheduled by background tasks.
    scheduled_from_background: Vec<Runnable>,
    /// When set, `will_park` panics instead of returning.
    forbid_parking: bool,
    /// Range from which `block_with_timeout` draws its tick budget.
    block_on_ticks: std::ops::RangeInclusive<usize>,
    /// Simulated clock, moved forward by `advance_clock`.
    now: std::time::Instant,
    /// Timers not yet fired, stored as `(wakeup time, barrier sender)`.
    pending_timers: Vec<(std::time::Instant, postage::barrier::Sender)>,
    /// Backtrace recorded by `start_waiting` and cleared by `finish_waiting`;
    /// included in the panic message produced by `will_park`.
    waiting_backtrace: Option<backtrace::Backtrace>,
}
75
/// A runnable queued by a foreground task, tagged with whether it belongs to
/// the main future passed to `Deterministic::run`.
#[cfg(any(test, feature = "test-support"))]
struct ForegroundRunnable {
    runnable: Runnable,
    /// `true` for runnables scheduled by the main task; `run_internal` polls
    /// the main task handle only after running one of these.
    main: bool,
}
81
/// Test executor that picks the next runnable using a seeded RNG.
#[cfg(any(test, feature = "test-support"))]
pub struct Deterministic {
    /// Scheduling state; also cloned into the schedule callbacks of spawned tasks.
    state: Arc<parking_lot::Mutex<DeterministicState>>,
    /// Parks the driving thread; its unparkers are handed to wakers and to
    /// schedule callbacks.
    parker: parking_lot::Mutex<parking::Parker>,
}
87
88#[cfg(any(test, feature = "test-support"))]
89impl Deterministic {
90 pub fn new(seed: u64) -> Arc<Self> {
91 use rand::prelude::*;
92
93 Arc::new(Self {
94 state: Arc::new(parking_lot::Mutex::new(DeterministicState {
95 rng: StdRng::seed_from_u64(seed),
96 seed,
97 scheduled_from_foreground: Default::default(),
98 scheduled_from_background: Default::default(),
99 forbid_parking: false,
100 block_on_ticks: 0..=1000,
101 now: std::time::Instant::now(),
102 pending_timers: Default::default(),
103 waiting_backtrace: None,
104 })),
105 parker: Default::default(),
106 })
107 }
108
109 pub fn build_background(self: &Arc<Self>) -> Arc<Background> {
110 Arc::new(Background::Deterministic {
111 executor: self.clone(),
112 })
113 }
114
115 pub fn build_foreground(self: &Arc<Self>, id: usize) -> Rc<Foreground> {
116 Rc::new(Foreground::Deterministic {
117 cx_id: id,
118 executor: self.clone(),
119 })
120 }
121
122 fn spawn_from_foreground(
123 &self,
124 cx_id: usize,
125 future: AnyLocalFuture,
126 main: bool,
127 ) -> AnyLocalTask {
128 let state = self.state.clone();
129 let unparker = self.parker.lock().unparker();
130 let (runnable, task) = async_task::spawn_local(future, move |runnable| {
131 let mut state = state.lock();
132 state
133 .scheduled_from_foreground
134 .entry(cx_id)
135 .or_default()
136 .push(ForegroundRunnable { runnable, main });
137 unparker.unpark();
138 });
139 runnable.schedule();
140 task
141 }
142
143 fn spawn(&self, future: AnyFuture) -> AnyTask {
144 let state = self.state.clone();
145 let unparker = self.parker.lock().unparker();
146 let (runnable, task) = async_task::spawn(future, move |runnable| {
147 let mut state = state.lock();
148 state.scheduled_from_background.push(runnable);
149 unparker.unpark();
150 });
151 runnable.schedule();
152 task
153 }
154
155 fn run<'a>(
156 &self,
157 cx_id: usize,
158 main_future: Pin<Box<dyn 'a + Future<Output = Box<dyn Any>>>>,
159 ) -> Box<dyn Any> {
160 use std::sync::atomic::{AtomicBool, Ordering::SeqCst};
161
162 let woken = Arc::new(AtomicBool::new(false));
163
164 let state = self.state.clone();
165 let unparker = self.parker.lock().unparker();
166 let (runnable, mut main_task) = unsafe {
167 async_task::spawn_unchecked(main_future, move |runnable| {
168 let mut state = state.lock();
169 state
170 .scheduled_from_foreground
171 .entry(cx_id)
172 .or_default()
173 .push(ForegroundRunnable {
174 runnable,
175 main: true,
176 });
177 unparker.unpark();
178 })
179 };
180 runnable.schedule();
181
182 loop {
183 if let Some(result) = self.run_internal(woken.clone(), Some(&mut main_task)) {
184 return result;
185 }
186
187 if !woken.load(SeqCst) {
188 self.state.lock().will_park();
189 }
190
191 woken.store(false, SeqCst);
192 self.parker.lock().park();
193 }
194 }
195
196 pub fn run_until_parked(&self) {
197 use std::sync::atomic::AtomicBool;
198 let woken = Arc::new(AtomicBool::new(false));
199 self.run_internal(woken, None);
200 }
201
202 fn run_internal(
203 &self,
204 woken: Arc<std::sync::atomic::AtomicBool>,
205 mut main_task: Option<&mut AnyLocalTask>,
206 ) -> Option<Box<dyn Any>> {
207 use rand::prelude::*;
208 use std::sync::atomic::Ordering::SeqCst;
209
210 let unparker = self.parker.lock().unparker();
211 let waker = waker_fn::waker_fn(move || {
212 woken.store(true, SeqCst);
213 unparker.unpark();
214 });
215
216 let mut cx = Context::from_waker(&waker);
217 loop {
218 let mut state = self.state.lock();
219
220 if state.scheduled_from_foreground.is_empty()
221 && state.scheduled_from_background.is_empty()
222 {
223 return None;
224 }
225
226 if !state.scheduled_from_background.is_empty() && state.rng.gen() {
227 let background_len = state.scheduled_from_background.len();
228 let ix = state.rng.gen_range(0..background_len);
229 let runnable = state.scheduled_from_background.remove(ix);
230 drop(state);
231 runnable.run();
232 } else if !state.scheduled_from_foreground.is_empty() {
233 let available_cx_ids = state
234 .scheduled_from_foreground
235 .keys()
236 .copied()
237 .collect::<Vec<_>>();
238 let cx_id_to_run = *available_cx_ids.iter().choose(&mut state.rng).unwrap();
239 let scheduled_from_cx = state
240 .scheduled_from_foreground
241 .get_mut(&cx_id_to_run)
242 .unwrap();
243 let foreground_runnable = scheduled_from_cx.remove(0);
244 if scheduled_from_cx.is_empty() {
245 state.scheduled_from_foreground.remove(&cx_id_to_run);
246 }
247
248 drop(state);
249
250 foreground_runnable.runnable.run();
251 if let Some(main_task) = main_task.as_mut() {
252 if foreground_runnable.main {
253 if let Poll::Ready(result) = main_task.poll(&mut cx) {
254 return Some(result);
255 }
256 }
257 }
258 }
259 }
260 }
261
262 fn block<F, T>(&self, future: &mut F, max_ticks: usize) -> Option<T>
263 where
264 F: Unpin + Future<Output = T>,
265 {
266 use rand::prelude::*;
267
268 let unparker = self.parker.lock().unparker();
269 let waker = waker_fn::waker_fn(move || {
270 unparker.unpark();
271 });
272
273 let mut cx = Context::from_waker(&waker);
274 for _ in 0..max_ticks {
275 let mut state = self.state.lock();
276 let runnable_count = state.scheduled_from_background.len();
277 let ix = state.rng.gen_range(0..=runnable_count);
278 if ix < state.scheduled_from_background.len() {
279 let runnable = state.scheduled_from_background.remove(ix);
280 drop(state);
281 runnable.run();
282 } else {
283 drop(state);
284 if let Poll::Ready(result) = future.poll(&mut cx) {
285 return Some(result);
286 }
287 let mut state = self.state.lock();
288 if state.scheduled_from_background.is_empty() {
289 state.will_park();
290 drop(state);
291 self.parker.lock().park();
292 }
293
294 continue;
295 }
296 }
297
298 None
299 }
300}
301
#[cfg(any(test, feature = "test-support"))]
impl DeterministicState {
    /// Called right before the executor parks its thread.
    ///
    /// Does nothing unless parking has been forbidden.
    ///
    /// # Panics
    ///
    /// Panics when `forbid_parking` is set. If a waiting backtrace was
    /// recorded, it is resolved and appended to the panic message.
    fn will_park(&mut self) {
        if !self.forbid_parking {
            return;
        }

        let backtrace_message = match self.waiting_backtrace.as_mut() {
            Some(backtrace) => {
                backtrace.resolve();
                format!(
                    "\nbacktrace of waiting future:\n{:?}",
                    util::CwdBacktrace(backtrace)
                )
            }
            None => String::new(),
        };

        panic!(
            "deterministic executor parked after a call to forbid_parking{}",
            backtrace_message
        );
    }
}
323
324impl Foreground {
325 pub fn platform(dispatcher: Arc<dyn platform::Dispatcher>) -> Result<Self> {
326 if dispatcher.is_main_thread() {
327 Ok(Self::Platform {
328 dispatcher,
329 _not_send_or_sync: PhantomData,
330 })
331 } else {
332 Err(anyhow!("must be constructed on main thread"))
333 }
334 }
335
336 pub fn spawn<T: 'static>(&self, future: impl Future<Output = T> + 'static) -> Task<T> {
337 let future = any_local_future(future);
338 let any_task = match self {
339 #[cfg(any(test, feature = "test-support"))]
340 Self::Deterministic { cx_id, executor } => {
341 executor.spawn_from_foreground(*cx_id, future, false)
342 }
343 Self::Platform { dispatcher, .. } => {
344 fn spawn_inner(
345 future: AnyLocalFuture,
346 dispatcher: &Arc<dyn Dispatcher>,
347 ) -> AnyLocalTask {
348 let dispatcher = dispatcher.clone();
349 let schedule =
350 move |runnable: Runnable| dispatcher.run_on_main_thread(runnable);
351 let (runnable, task) = async_task::spawn_local(future, schedule);
352 runnable.schedule();
353 task
354 }
355 spawn_inner(future, dispatcher)
356 }
357 };
358 Task::local(any_task)
359 }
360
361 #[cfg(any(test, feature = "test-support"))]
362 pub fn run<T: 'static>(&self, future: impl Future<Output = T>) -> T {
363 let future = async move { Box::new(future.await) as Box<dyn Any> }.boxed_local();
364 let result = match self {
365 Self::Deterministic { cx_id, executor } => executor.run(*cx_id, future),
366 Self::Platform { .. } => panic!("you can't call run on a platform foreground executor"),
367 };
368 *result.downcast().unwrap()
369 }
370
371 #[cfg(any(test, feature = "test-support"))]
372 pub fn run_until_parked(&self) {
373 match self {
374 Self::Deterministic { executor, .. } => executor.run_until_parked(),
375 _ => panic!("this method can only be called on a deterministic executor"),
376 }
377 }
378
379 #[cfg(any(test, feature = "test-support"))]
380 pub fn parking_forbidden(&self) -> bool {
381 match self {
382 Self::Deterministic { executor, .. } => executor.state.lock().forbid_parking,
383 _ => panic!("this method can only be called on a deterministic executor"),
384 }
385 }
386
387 #[cfg(any(test, feature = "test-support"))]
388 pub fn start_waiting(&self) {
389 match self {
390 Self::Deterministic { executor, .. } => {
391 executor.state.lock().waiting_backtrace =
392 Some(backtrace::Backtrace::new_unresolved());
393 }
394 _ => panic!("this method can only be called on a deterministic executor"),
395 }
396 }
397
398 #[cfg(any(test, feature = "test-support"))]
399 pub fn finish_waiting(&self) {
400 match self {
401 Self::Deterministic { executor, .. } => {
402 executor.state.lock().waiting_backtrace.take();
403 }
404 _ => panic!("this method can only be called on a deterministic executor"),
405 }
406 }
407
408 #[cfg(any(test, feature = "test-support"))]
409 pub fn forbid_parking(&self) {
410 use rand::prelude::*;
411
412 match self {
413 Self::Deterministic { executor, .. } => {
414 let mut state = executor.state.lock();
415 state.forbid_parking = true;
416 state.rng = StdRng::seed_from_u64(state.seed);
417 }
418 _ => panic!("this method can only be called on a deterministic executor"),
419 }
420 }
421
422 pub async fn timer(&self, duration: Duration) {
423 match self {
424 #[cfg(any(test, feature = "test-support"))]
425 Self::Deterministic { executor, .. } => {
426 use postage::prelude::Stream as _;
427
428 let (tx, mut rx) = postage::barrier::channel();
429 {
430 let mut state = executor.state.lock();
431 let wakeup_at = state.now + duration;
432 state.pending_timers.push((wakeup_at, tx));
433 }
434 rx.recv().await;
435 }
436 _ => {
437 Timer::after(duration).await;
438 }
439 }
440 }
441
442 #[cfg(any(test, feature = "test-support"))]
443 pub fn advance_clock(&self, duration: Duration) {
444 match self {
445 Self::Deterministic { executor, .. } => {
446 executor.run_until_parked();
447
448 let mut state = executor.state.lock();
449 state.now += duration;
450 let now = state.now;
451 let mut pending_timers = mem::take(&mut state.pending_timers);
452 drop(state);
453
454 pending_timers.retain(|(wakeup, _)| *wakeup > now);
455 executor.state.lock().pending_timers.extend(pending_timers);
456 }
457 _ => panic!("this method can only be called on a deterministic executor"),
458 }
459 }
460
461 #[cfg(any(test, feature = "test-support"))]
462 pub fn set_block_on_ticks(&self, range: std::ops::RangeInclusive<usize>) {
463 match self {
464 Self::Deterministic { executor, .. } => executor.state.lock().block_on_ticks = range,
465 _ => panic!("this method can only be called on a deterministic executor"),
466 }
467 }
468}
469
470impl Background {
471 pub fn new() -> Self {
472 let executor = Arc::new(Executor::new());
473 let stop = channel::unbounded::<()>();
474
475 for i in 0..2 * num_cpus::get() {
476 let executor = executor.clone();
477 let stop = stop.1.clone();
478 thread::Builder::new()
479 .name(format!("background-executor-{}", i))
480 .spawn(move || smol::block_on(executor.run(stop.recv())))
481 .unwrap();
482 }
483
484 Self::Production {
485 executor,
486 _stop: stop.0,
487 }
488 }
489
490 pub fn num_cpus(&self) -> usize {
491 num_cpus::get()
492 }
493
494 pub fn spawn<T, F>(&self, future: F) -> Task<T>
495 where
496 T: 'static + Send,
497 F: Send + Future<Output = T> + 'static,
498 {
499 let future = any_future(future);
500 let any_task = match self {
501 Self::Production { executor, .. } => executor.spawn(future),
502 #[cfg(any(test, feature = "test-support"))]
503 Self::Deterministic { executor } => executor.spawn(future),
504 };
505 Task::send(any_task)
506 }
507
508 pub fn block<F, T>(&self, future: F) -> T
509 where
510 F: Future<Output = T>,
511 {
512 smol::pin!(future);
513 match self {
514 Self::Production { .. } => smol::block_on(&mut future),
515 #[cfg(any(test, feature = "test-support"))]
516 Self::Deterministic { executor, .. } => {
517 executor.block(&mut future, usize::MAX).unwrap()
518 }
519 }
520 }
521
522 pub fn block_with_timeout<F, T>(
523 &self,
524 timeout: Duration,
525 future: F,
526 ) -> Result<T, impl Future<Output = T>>
527 where
528 T: 'static,
529 F: 'static + Unpin + Future<Output = T>,
530 {
531 let mut future = any_local_future(future);
532 if !timeout.is_zero() {
533 let output = match self {
534 Self::Production { .. } => smol::block_on(util::timeout(timeout, &mut future)).ok(),
535 #[cfg(any(test, feature = "test-support"))]
536 Self::Deterministic { executor, .. } => {
537 use rand::prelude::*;
538 let max_ticks = {
539 let mut state = executor.state.lock();
540 let range = state.block_on_ticks.clone();
541 state.rng.gen_range(range)
542 };
543 executor.block(&mut future, max_ticks)
544 }
545 };
546 if let Some(output) = output {
547 return Ok(*output.downcast().unwrap());
548 }
549 }
550 Err(async { *future.await.downcast().unwrap() })
551 }
552
553 pub async fn scoped<'scope, F>(&self, scheduler: F)
554 where
555 F: FnOnce(&mut Scope<'scope>),
556 {
557 let mut scope = Scope {
558 futures: Default::default(),
559 _phantom: PhantomData,
560 };
561 (scheduler)(&mut scope);
562 let spawned = scope
563 .futures
564 .into_iter()
565 .map(|f| self.spawn(f))
566 .collect::<Vec<_>>();
567 for task in spawned {
568 task.await;
569 }
570 }
571
572 #[cfg(any(test, feature = "test-support"))]
573 pub async fn simulate_random_delay(&self) {
574 use rand::prelude::*;
575 use smol::future::yield_now;
576
577 match self {
578 Self::Deterministic { executor, .. } => {
579 if executor.state.lock().rng.gen_bool(0.2) {
580 let yields = executor.state.lock().rng.gen_range(1..=10);
581 for _ in 0..yields {
582 yield_now().await;
583 }
584 }
585 }
586 _ => panic!("this method can only be called on a deterministic executor"),
587 }
588 }
589}
590
/// Collects futures that may borrow data for the lifetime `'a`.
///
/// `Background::scoped` creates a scope, passes it to a caller-supplied
/// closure, and then spawns and awaits every future that was registered.
pub struct Scope<'a> {
    /// Registered futures, stored with their lifetime erased to `'static`.
    futures: Vec<Pin<Box<dyn Future<Output = ()> + Send + 'static>>>,
    /// Ties the scope to the lifetime the registered futures may borrow from.
    _phantom: PhantomData<&'a ()>,
}

impl<'a> Scope<'a> {
    /// Registers `f` with this scope; nothing is polled here.
    pub fn spawn<F>(&mut self, f: F)
    where
        F: Future<Output = ()> + Send + 'a,
    {
        let scoped: Pin<Box<dyn Future<Output = ()> + Send + 'a>> = Box::pin(f);
        // SAFETY: the two types differ only in the trait object's lifetime
        // bound, so the transmute does not change layout.
        // NOTE(review): soundness depends on every stored future being
        // finished or dropped before `'a` ends — confirm that callers such as
        // `Background::scoped` guarantee this.
        let erased: Pin<Box<dyn Future<Output = ()> + Send + 'static>> =
            unsafe { mem::transmute(scoped) };
        self.futures.push(erased);
    }
}
610
611impl<T> Task<T> {
612 pub fn ready(value: T) -> Self {
613 Self::Ready(Some(value))
614 }
615
616 fn local(any_task: AnyLocalTask) -> Self {
617 Self::Local {
618 any_task,
619 result_type: PhantomData,
620 }
621 }
622
623 pub fn detach(self) {
624 match self {
625 Task::Ready(_) => {}
626 Task::Local { any_task, .. } => any_task.detach(),
627 Task::Send { any_task, .. } => any_task.detach(),
628 }
629 }
630}
631
632impl<T: 'static, E: 'static + Display> Task<Result<T, E>> {
633 pub fn detach_and_log_err(self, cx: &mut MutableAppContext) {
634 cx.spawn(|_| async move {
635 if let Err(err) = self.await {
636 log::error!("{}", err);
637 }
638 })
639 .detach();
640 }
641}
642
643impl<T: Send> Task<T> {
644 fn send(any_task: AnyTask) -> Self {
645 Self::Send {
646 any_task,
647 result_type: PhantomData,
648 }
649 }
650}
651
652impl<T: fmt::Debug> fmt::Debug for Task<T> {
653 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
654 match self {
655 Task::Ready(value) => value.fmt(f),
656 Task::Local { any_task, .. } => any_task.fmt(f),
657 Task::Send { any_task, .. } => any_task.fmt(f),
658 }
659 }
660}
661
662impl<T: 'static> Future for Task<T> {
663 type Output = T;
664
665 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
666 match unsafe { self.get_unchecked_mut() } {
667 Task::Ready(value) => Poll::Ready(value.take().unwrap()),
668 Task::Local { any_task, .. } => {
669 any_task.poll(cx).map(|value| *value.downcast().unwrap())
670 }
671 Task::Send { any_task, .. } => {
672 any_task.poll(cx).map(|value| *value.downcast().unwrap())
673 }
674 }
675 }
676}
677
678fn any_future<T, F>(future: F) -> AnyFuture
679where
680 T: 'static + Send,
681 F: Future<Output = T> + Send + 'static,
682{
683 async { Box::new(future.await) as Box<dyn Any + Send> }.boxed()
684}
685
686fn any_local_future<T, F>(future: F) -> AnyLocalFuture
687where
688 T: 'static,
689 F: Future<Output = T> + 'static,
690{
691 async { Box::new(future.await) as Box<dyn Any> }.boxed_local()
692}