dispatcher.rs

 1#![allow(non_upper_case_globals)]
 2#![allow(non_camel_case_types)]
 3#![allow(non_snake_case)]
 4//todo!(linux): remove
 5#![allow(unused_variables)]
 6
 7use crate::platform::linux::client_dispatcher::ClientDispatcher;
 8use crate::{PlatformDispatcher, TaskLabel};
 9use async_task::Runnable;
10use parking::{Parker, Unparker};
11use parking_lot::Mutex;
12use std::{
13    panic,
14    sync::Arc,
15    thread,
16    time::{Duration, Instant},
17};
18
19pub(crate) struct LinuxDispatcher {
20    client_dispatcher: Arc<dyn ClientDispatcher + Send + Sync>,
21    parker: Mutex<Parker>,
22    timed_tasks: Mutex<Vec<(Instant, Runnable)>>,
23    main_sender: flume::Sender<Runnable>,
24    background_sender: flume::Sender<Runnable>,
25    _background_thread: thread::JoinHandle<()>,
26    main_thread_id: thread::ThreadId,
27}
28
29impl LinuxDispatcher {
30    pub fn new(
31        main_sender: flume::Sender<Runnable>,
32        client_dispatcher: &Arc<dyn ClientDispatcher + Send + Sync>,
33    ) -> Self {
34        let (background_sender, background_receiver) = flume::unbounded::<Runnable>();
35        let background_thread = thread::spawn(move || {
36            for runnable in background_receiver {
37                let _ignore_panic = panic::catch_unwind(|| runnable.run());
38            }
39        });
40        Self {
41            client_dispatcher: Arc::clone(client_dispatcher),
42            parker: Mutex::new(Parker::new()),
43            timed_tasks: Mutex::new(Vec::new()),
44            main_sender,
45            background_sender,
46            _background_thread: background_thread,
47            main_thread_id: thread::current().id(),
48        }
49    }
50}
51
52impl PlatformDispatcher for LinuxDispatcher {
53    fn is_main_thread(&self) -> bool {
54        thread::current().id() == self.main_thread_id
55    }
56
57    fn dispatch(&self, runnable: Runnable, _: Option<TaskLabel>) {
58        self.background_sender.send(runnable).unwrap();
59    }
60
61    fn dispatch_on_main_thread(&self, runnable: Runnable) {
62        self.main_sender.send(runnable).unwrap();
63        self.client_dispatcher.dispatch_on_main_thread();
64    }
65
66    fn dispatch_after(&self, duration: Duration, runnable: Runnable) {
67        let moment = Instant::now() + duration;
68        let mut timed_tasks = self.timed_tasks.lock();
69        timed_tasks.push((moment, runnable));
70        timed_tasks.sort_unstable_by(|(a, _), (b, _)| b.cmp(a));
71    }
72
73    fn tick(&self, background_only: bool) -> bool {
74        let mut timed_tasks = self.timed_tasks.lock();
75        let old_count = timed_tasks.len();
76        while let Some(&(moment, _)) = timed_tasks.last() {
77            if moment <= Instant::now() {
78                let (_, runnable) = timed_tasks.pop().unwrap();
79                runnable.run();
80            } else {
81                break;
82            }
83        }
84        timed_tasks.len() != old_count
85    }
86
87    fn park(&self) {
88        self.parker.lock().park()
89    }
90
91    fn unparker(&self) -> Unparker {
92        self.parker.lock().unparker()
93    }
94}