dispatcher.rs

 1#![allow(non_upper_case_globals)]
 2#![allow(non_camel_case_types)]
 3#![allow(non_snake_case)]
 4
 5use crate::{PlatformDispatcher, TaskLabel};
 6use async_task::Runnable;
 7use parking::{Parker, Unparker};
 8use parking_lot::Mutex;
 9use std::{
10    panic, thread,
11    time::{Duration, Instant},
12};
13
14pub(crate) struct LinuxDispatcher {
15    parker: Mutex<Parker>,
16    timed_tasks: Mutex<Vec<(Instant, Runnable)>>,
17    main_sender: flume::Sender<Runnable>,
18    main_receiver: flume::Receiver<Runnable>,
19    background_sender: flume::Sender<Runnable>,
20    background_thread: thread::JoinHandle<()>,
21    main_thread_id: thread::ThreadId,
22}
23
24impl Default for LinuxDispatcher {
25    fn default() -> Self {
26        Self::new()
27    }
28}
29
30impl LinuxDispatcher {
31    pub fn new() -> Self {
32        let (main_sender, main_receiver) = flume::unbounded::<Runnable>();
33        let (background_sender, background_receiver) = flume::unbounded::<Runnable>();
34        let background_thread = thread::spawn(move || {
35            for runnable in background_receiver {
36                let _ignore_panic = panic::catch_unwind(|| runnable.run());
37            }
38        });
39        LinuxDispatcher {
40            parker: Mutex::new(Parker::new()),
41            timed_tasks: Mutex::new(Vec::new()),
42            main_sender,
43            main_receiver,
44            background_sender,
45            background_thread,
46            main_thread_id: thread::current().id(),
47        }
48    }
49}
50
51impl PlatformDispatcher for LinuxDispatcher {
52    fn is_main_thread(&self) -> bool {
53        thread::current().id() == self.main_thread_id
54    }
55
56    fn dispatch(&self, runnable: Runnable, _: Option<TaskLabel>) {
57        self.background_sender.send(runnable).unwrap();
58    }
59
60    fn dispatch_on_main_thread(&self, runnable: Runnable) {
61        self.main_sender.send(runnable).unwrap();
62    }
63
64    fn dispatch_after(&self, duration: Duration, runnable: Runnable) {
65        let moment = Instant::now() + duration;
66        let mut timed_tasks = self.timed_tasks.lock();
67        timed_tasks.push((moment, runnable));
68        timed_tasks.sort_unstable_by(|&(ref a, _), &(ref b, _)| b.cmp(a));
69    }
70
71    fn tick(&self, background_only: bool) -> bool {
72        let mut ran = false;
73        if self.is_main_thread() && !background_only {
74            for runnable in self.main_receiver.try_iter() {
75                runnable.run();
76                ran = true;
77            }
78        }
79        let mut timed_tasks = self.timed_tasks.lock();
80        while let Some(&(moment, _)) = timed_tasks.last() {
81            if moment <= Instant::now() {
82                let (_, runnable) = timed_tasks.pop().unwrap();
83                runnable.run();
84                ran = true;
85            }
86        }
87        ran
88    }
89
90    fn park(&self) {
91        self.parker.lock().park()
92    }
93
94    fn unparker(&self) -> Unparker {
95        self.parker.lock().unparker()
96    }
97}