dispatcher.rs

  1use crate::{PlatformDispatcher, Priority, RunnableVariant, TaskLabel};
  2use backtrace::Backtrace;
  3use collections::{HashMap, HashSet, VecDeque};
  4use parking::Unparker;
  5use parking_lot::Mutex;
  6use rand::prelude::*;
  7use std::{
  8    future::Future,
  9    ops::RangeInclusive,
 10    pin::Pin,
 11    sync::Arc,
 12    task::{Context, Poll},
 13    time::{Duration, Instant},
 14};
 15use util::post_inc;
 16
 17#[derive(Copy, Clone, PartialEq, Eq, Hash)]
 18struct TestDispatcherId(usize);
 19
 20#[doc(hidden)]
 21pub struct TestDispatcher {
 22    id: TestDispatcherId,
 23    state: Arc<Mutex<TestDispatcherState>>,
 24}
 25
 26struct TestDispatcherState {
 27    random: StdRng,
 28    foreground: HashMap<TestDispatcherId, VecDeque<RunnableVariant>>,
 29    background: Vec<RunnableVariant>,
 30    deprioritized_background: Vec<RunnableVariant>,
 31    delayed: Vec<(Duration, RunnableVariant)>,
 32    start_time: Instant,
 33    time: Duration,
 34    is_main_thread: bool,
 35    next_id: TestDispatcherId,
 36    allow_parking: bool,
 37    waiting_hint: Option<String>,
 38    waiting_backtrace: Option<Backtrace>,
 39    deprioritized_task_labels: HashSet<TaskLabel>,
 40    block_on_ticks: RangeInclusive<usize>,
 41    unparkers: Vec<Unparker>,
 42}
 43
 44impl TestDispatcher {
 45    pub fn new(random: StdRng) -> Self {
 46        let state = TestDispatcherState {
 47            random,
 48            foreground: HashMap::default(),
 49            background: Vec::new(),
 50            deprioritized_background: Vec::new(),
 51            delayed: Vec::new(),
 52            time: Duration::ZERO,
 53            start_time: Instant::now(),
 54            is_main_thread: true,
 55            next_id: TestDispatcherId(1),
 56            allow_parking: false,
 57            waiting_hint: None,
 58            waiting_backtrace: None,
 59            deprioritized_task_labels: Default::default(),
 60            block_on_ticks: 0..=1000,
 61            unparkers: Default::default(),
 62        };
 63
 64        TestDispatcher {
 65            id: TestDispatcherId(0),
 66            state: Arc::new(Mutex::new(state)),
 67        }
 68    }
 69
 70    pub fn advance_clock(&self, by: Duration) {
 71        let new_now = self.state.lock().time + by;
 72        loop {
 73            self.run_until_parked();
 74            let state = self.state.lock();
 75            let next_due_time = state.delayed.first().map(|(time, _)| *time);
 76            drop(state);
 77            if let Some(due_time) = next_due_time
 78                && due_time <= new_now
 79            {
 80                self.state.lock().time = due_time;
 81                continue;
 82            }
 83            break;
 84        }
 85        self.state.lock().time = new_now;
 86    }
 87
 88    pub fn advance_clock_to_next_delayed(&self) -> bool {
 89        let next_due_time = self.state.lock().delayed.first().map(|(time, _)| *time);
 90        if let Some(next_due_time) = next_due_time {
 91            self.state.lock().time = next_due_time;
 92            return true;
 93        }
 94        false
 95    }
 96
 97    pub fn simulate_random_delay(&self) -> impl 'static + Send + Future<Output = ()> + use<> {
 98        struct YieldNow {
 99            pub(crate) count: usize,
100        }
101
102        impl Future for YieldNow {
103            type Output = ();
104
105            fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
106                if self.count > 0 {
107                    self.count -= 1;
108                    cx.waker().wake_by_ref();
109                    Poll::Pending
110                } else {
111                    Poll::Ready(())
112                }
113            }
114        }
115
116        YieldNow {
117            count: self.state.lock().random.random_range(0..10),
118        }
119    }
120
121    pub fn tick(&self, background_only: bool) -> bool {
122        let mut state = self.state.lock();
123
124        while let Some((deadline, _)) = state.delayed.first() {
125            if *deadline > state.time {
126                break;
127            }
128            let (_, runnable) = state.delayed.remove(0);
129            state.background.push(runnable);
130        }
131
132        let foreground_len: usize = if background_only {
133            0
134        } else {
135            state
136                .foreground
137                .values()
138                .map(|runnables| runnables.len())
139                .sum()
140        };
141        let background_len = state.background.len();
142
143        let runnable;
144        let main_thread;
145        if foreground_len == 0 && background_len == 0 {
146            let deprioritized_background_len = state.deprioritized_background.len();
147            if deprioritized_background_len == 0 {
148                return false;
149            }
150            let ix = state.random.random_range(0..deprioritized_background_len);
151            main_thread = false;
152            runnable = state.deprioritized_background.swap_remove(ix);
153        } else {
154            main_thread = state.random.random_ratio(
155                foreground_len as u32,
156                (foreground_len + background_len) as u32,
157            );
158            if main_thread {
159                let state = &mut *state;
160                runnable = state
161                    .foreground
162                    .values_mut()
163                    .filter(|runnables| !runnables.is_empty())
164                    .choose(&mut state.random)
165                    .unwrap()
166                    .pop_front()
167                    .unwrap();
168            } else {
169                let ix = state.random.random_range(0..background_len);
170                runnable = state.background.swap_remove(ix);
171            };
172        };
173
174        let was_main_thread = state.is_main_thread;
175        state.is_main_thread = main_thread;
176        drop(state);
177
178        // todo(localcc): add timings to tests
179        match runnable {
180            RunnableVariant::Meta(runnable) => {
181                if let Some(ref app_token) = runnable.metadata().app {
182                    if !app_token.is_alive() {
183                        drop(runnable);
184                        self.state.lock().is_main_thread = was_main_thread;
185                        return true;
186                    }
187                }
188                runnable.run()
189            }
190            RunnableVariant::Compat(runnable) => runnable.run(),
191        };
192
193        self.state.lock().is_main_thread = was_main_thread;
194
195        true
196    }
197
198    pub fn deprioritize(&self, task_label: TaskLabel) {
199        self.state
200            .lock()
201            .deprioritized_task_labels
202            .insert(task_label);
203    }
204
205    pub fn run_until_parked(&self) {
206        while self.tick(false) {}
207    }
208
209    pub fn parking_allowed(&self) -> bool {
210        self.state.lock().allow_parking
211    }
212
213    pub fn allow_parking(&self) {
214        self.state.lock().allow_parking = true
215    }
216
217    pub fn forbid_parking(&self) {
218        self.state.lock().allow_parking = false
219    }
220
221    pub fn set_waiting_hint(&self, msg: Option<String>) {
222        self.state.lock().waiting_hint = msg
223    }
224
225    pub fn waiting_hint(&self) -> Option<String> {
226        self.state.lock().waiting_hint.clone()
227    }
228
229    pub fn start_waiting(&self) {
230        self.state.lock().waiting_backtrace = Some(Backtrace::new_unresolved());
231    }
232
233    pub fn finish_waiting(&self) {
234        self.state.lock().waiting_backtrace.take();
235    }
236
237    pub fn waiting_backtrace(&self) -> Option<Backtrace> {
238        self.state.lock().waiting_backtrace.take().map(|mut b| {
239            b.resolve();
240            b
241        })
242    }
243
244    pub fn rng(&self) -> StdRng {
245        self.state.lock().random.clone()
246    }
247
248    pub fn set_block_on_ticks(&self, range: std::ops::RangeInclusive<usize>) {
249        self.state.lock().block_on_ticks = range;
250    }
251
252    pub fn gen_block_on_ticks(&self) -> usize {
253        let mut lock = self.state.lock();
254        let block_on_ticks = lock.block_on_ticks.clone();
255        lock.random.random_range(block_on_ticks)
256    }
257
258    pub fn unpark_all(&self) {
259        self.state.lock().unparkers.retain(|parker| parker.unpark());
260    }
261
262    pub fn push_unparker(&self, unparker: Unparker) {
263        let mut state = self.state.lock();
264        state.unparkers.push(unparker);
265    }
266}
267
268impl Clone for TestDispatcher {
269    fn clone(&self) -> Self {
270        let id = post_inc(&mut self.state.lock().next_id.0);
271        Self {
272            id: TestDispatcherId(id),
273            state: self.state.clone(),
274        }
275    }
276}
277
278impl PlatformDispatcher for TestDispatcher {
279    fn get_all_timings(&self) -> Vec<crate::ThreadTaskTimings> {
280        Vec::new()
281    }
282
283    fn get_current_thread_timings(&self) -> Vec<crate::TaskTiming> {
284        Vec::new()
285    }
286
287    fn is_main_thread(&self) -> bool {
288        self.state.lock().is_main_thread
289    }
290
291    fn now(&self) -> Instant {
292        let state = self.state.lock();
293        state.start_time + state.time
294    }
295
296    fn dispatch(&self, runnable: RunnableVariant, label: Option<TaskLabel>, _priority: Priority) {
297        {
298            let mut state = self.state.lock();
299            if label.is_some_and(|label| state.deprioritized_task_labels.contains(&label)) {
300                state.deprioritized_background.push(runnable);
301            } else {
302                state.background.push(runnable);
303            }
304        }
305        self.unpark_all();
306    }
307
308    fn dispatch_on_main_thread(&self, runnable: RunnableVariant, _priority: Priority) {
309        self.state
310            .lock()
311            .foreground
312            .entry(self.id)
313            .or_default()
314            .push_back(runnable);
315        self.unpark_all();
316    }
317
318    fn dispatch_after(&self, duration: std::time::Duration, runnable: RunnableVariant) {
319        let mut state = self.state.lock();
320        let next_time = state.time + duration;
321        let ix = match state.delayed.binary_search_by_key(&next_time, |e| e.0) {
322            Ok(ix) | Err(ix) => ix,
323        };
324        state.delayed.insert(ix, (next_time, runnable));
325    }
326
327    fn as_test(&self) -> Option<&TestDispatcher> {
328        Some(self)
329    }
330
331    fn spawn_realtime(&self, _priority: crate::RealtimePriority, f: Box<dyn FnOnce() + Send>) {
332        std::thread::spawn(move || {
333            f();
334        });
335    }
336}