1use anyhow::{anyhow, Result};
2use async_task::Runnable;
3pub use async_task::Task;
4use parking_lot::Mutex;
5use rand::prelude::*;
6use smol::prelude::*;
7use smol::{channel, Executor};
8use std::rc::Rc;
9use std::sync::mpsc::SyncSender;
10use std::sync::Arc;
11use std::{marker::PhantomData, thread};
12
13use crate::platform;
14
15pub enum Foreground {
16 Platform {
17 dispatcher: Arc<dyn platform::Dispatcher>,
18 _not_send_or_sync: PhantomData<Rc<()>>,
19 },
20 Test(smol::LocalExecutor<'static>),
21 Deterministic(Arc<Deterministic>),
22}
23
24pub enum Background {
25 Deterministic(Arc<Deterministic>),
26 Production {
27 executor: Arc<smol::Executor<'static>>,
28 _stop: channel::Sender<()>,
29 },
30}
31
32pub struct Deterministic {
33 seed: u64,
34 runnables: Arc<Mutex<(Vec<Runnable>, Option<SyncSender<()>>)>>,
35}
36
37impl Deterministic {
38 fn new(seed: u64) -> Self {
39 Self {
40 seed,
41 runnables: Default::default(),
42 }
43 }
44
45 pub fn spawn_local<F, T>(&self, future: F) -> Task<T>
46 where
47 T: 'static,
48 F: Future<Output = T> + 'static,
49 {
50 let runnables = self.runnables.clone();
51 let (runnable, task) = async_task::spawn_local(future, move |runnable| {
52 let mut runnables = runnables.lock();
53 runnables.0.push(runnable);
54 if let Some(wake_tx) = runnables.1.as_ref() {
55 wake_tx.send(()).ok();
56 }
57 });
58 runnable.schedule();
59 task
60 }
61
62 pub fn spawn<F, T>(&self, future: F) -> Task<T>
63 where
64 T: 'static + Send,
65 F: 'static + Send + Future<Output = T>,
66 {
67 let runnables = self.runnables.clone();
68 let (runnable, task) = async_task::spawn(future, move |runnable| {
69 let mut runnables = runnables.lock();
70 runnables.0.push(runnable);
71 if let Some(wake_tx) = runnables.1.as_ref() {
72 wake_tx.send(()).ok();
73 }
74 });
75 runnable.schedule();
76 task
77 }
78
79 pub fn run<F, T>(&self, future: F) -> T
80 where
81 T: 'static,
82 F: Future<Output = T> + 'static,
83 {
84 let (wake_tx, wake_rx) = std::sync::mpsc::sync_channel(32);
85 let runnables = self.runnables.clone();
86 runnables.lock().1 = Some(wake_tx);
87
88 let (output_tx, output_rx) = std::sync::mpsc::channel();
89 self.spawn_local(async move {
90 let output = future.await;
91 output_tx.send(output).unwrap();
92 })
93 .detach();
94
95 let mut rng = StdRng::seed_from_u64(self.seed);
96 loop {
97 if let Ok(value) = output_rx.try_recv() {
98 runnables.lock().1 = None;
99 return value;
100 }
101
102 wake_rx.recv().unwrap();
103 let runnable = {
104 let mut runnables = runnables.lock();
105 let runnables = &mut runnables.0;
106 let index = rng.gen_range(0..runnables.len());
107 runnables.remove(index)
108 };
109
110 runnable.run();
111 }
112 }
113}
114
115impl Foreground {
116 pub fn platform(dispatcher: Arc<dyn platform::Dispatcher>) -> Result<Self> {
117 if dispatcher.is_main_thread() {
118 Ok(Self::Platform {
119 dispatcher,
120 _not_send_or_sync: PhantomData,
121 })
122 } else {
123 Err(anyhow!("must be constructed on main thread"))
124 }
125 }
126
127 pub fn test() -> Self {
128 Self::Test(smol::LocalExecutor::new())
129 }
130
131 pub fn spawn<T: 'static>(&self, future: impl Future<Output = T> + 'static) -> Task<T> {
132 match self {
133 Self::Platform { dispatcher, .. } => {
134 let dispatcher = dispatcher.clone();
135 let schedule = move |runnable: Runnable| dispatcher.run_on_main_thread(runnable);
136 let (runnable, task) = async_task::spawn_local(future, schedule);
137 runnable.schedule();
138 task
139 }
140 Self::Test(executor) => executor.spawn(future),
141 Self::Deterministic(executor) => executor.spawn_local(future),
142 }
143 }
144
145 pub fn run<T: 'static>(&self, future: impl 'static + Future<Output = T>) -> T {
146 match self {
147 Self::Platform { .. } => panic!("you can't call run on a platform foreground executor"),
148 Self::Test(executor) => smol::block_on(executor.run(future)),
149 Self::Deterministic(executor) => executor.run(future),
150 }
151 }
152}
153
154impl Background {
155 pub fn new() -> Self {
156 let executor = Arc::new(Executor::new());
157 let stop = channel::unbounded::<()>();
158
159 for i in 0..num_cpus::get() {
160 let executor = executor.clone();
161 let stop = stop.1.clone();
162 thread::Builder::new()
163 .name(format!("background-executor-{}", i))
164 .spawn(move || smol::block_on(executor.run(stop.recv())))
165 .unwrap();
166 }
167
168 Self::Production {
169 executor,
170 _stop: stop.0,
171 }
172 }
173
174 pub fn spawn<T, F>(&self, future: F) -> Task<T>
175 where
176 T: 'static + Send,
177 F: Send + Future<Output = T> + 'static,
178 {
179 match self {
180 Self::Production { executor, .. } => executor.spawn(future),
181 Self::Deterministic(executor) => executor.spawn(future),
182 }
183 }
184}
185
186pub fn deterministic(seed: u64) -> (Rc<Foreground>, Arc<Background>) {
187 let executor = Arc::new(Deterministic::new(seed));
188 (
189 Rc::new(Foreground::Deterministic(executor.clone())),
190 Arc::new(Background::Deterministic(executor)),
191 )
192}