dispatcher.rs

 1#![allow(non_upper_case_globals)]
 2#![allow(non_camel_case_types)]
 3#![allow(non_snake_case)]
 4//todo!(linux): remove
 5#![allow(unused_variables)]
 6
 7use crate::platform::linux::client_dispatcher::ClientDispatcher;
 8use crate::{PlatformDispatcher, TaskLabel};
 9use async_task::Runnable;
10use parking::{Parker, Unparker};
11use parking_lot::Mutex;
12use std::{
13    panic,
14    sync::Arc,
15    thread,
16    time::{Duration, Instant},
17};
18
19pub(crate) struct LinuxDispatcher {
20    client_dispatcher: Arc<dyn ClientDispatcher + Send + Sync>,
21    parker: Mutex<Parker>,
22    timed_tasks: Mutex<Vec<(Instant, Runnable)>>,
23    main_sender: flume::Sender<Runnable>,
24    background_sender: flume::Sender<Runnable>,
25    _background_thread: thread::JoinHandle<()>,
26    main_thread_id: thread::ThreadId,
27}
28
29impl LinuxDispatcher {
30    pub fn new(
31        main_sender: flume::Sender<Runnable>,
32        client_dispatcher: &Arc<dyn ClientDispatcher + Send + Sync>,
33    ) -> Self {
34        let (background_sender, background_receiver) = flume::unbounded::<Runnable>();
35        let background_thread = thread::spawn(move || {
36            profiling::register_thread!("background");
37            for runnable in background_receiver {
38                let _ignore_panic = panic::catch_unwind(|| runnable.run());
39            }
40        });
41        Self {
42            client_dispatcher: Arc::clone(client_dispatcher),
43            parker: Mutex::new(Parker::new()),
44            timed_tasks: Mutex::new(Vec::new()),
45            main_sender,
46            background_sender,
47            _background_thread: background_thread,
48            main_thread_id: thread::current().id(),
49        }
50    }
51}
52
53impl PlatformDispatcher for LinuxDispatcher {
54    fn is_main_thread(&self) -> bool {
55        thread::current().id() == self.main_thread_id
56    }
57
58    fn dispatch(&self, runnable: Runnable, _: Option<TaskLabel>) {
59        self.background_sender.send(runnable).unwrap();
60    }
61
62    fn dispatch_on_main_thread(&self, runnable: Runnable) {
63        self.main_sender.send(runnable).unwrap();
64        self.client_dispatcher.dispatch_on_main_thread();
65    }
66
67    fn dispatch_after(&self, duration: Duration, runnable: Runnable) {
68        let moment = Instant::now() + duration;
69        let mut timed_tasks = self.timed_tasks.lock();
70        timed_tasks.push((moment, runnable));
71        timed_tasks.sort_unstable_by(|(a, _), (b, _)| b.cmp(a));
72    }
73
74    fn tick(&self, background_only: bool) -> bool {
75        let mut timed_tasks = self.timed_tasks.lock();
76        let old_count = timed_tasks.len();
77        while let Some(&(moment, _)) = timed_tasks.last() {
78            if moment <= Instant::now() {
79                let (_, runnable) = timed_tasks.pop().unwrap();
80                runnable.run();
81            } else {
82                break;
83            }
84        }
85        timed_tasks.len() != old_count
86    }
87
88    fn park(&self) {
89        self.parker.lock().park()
90    }
91
92    fn unparker(&self) -> Unparker {
93        self.parker.lock().unparker()
94    }
95}