1#![allow(non_upper_case_globals)]
2#![allow(non_camel_case_types)]
3#![allow(non_snake_case)]
4
5use crate::{PlatformDispatcher, TaskLabel};
6use async_task::Runnable;
7use parking::{Parker, Unparker};
8use parking_lot::Mutex;
9use std::{
10 panic, thread,
11 time::{Duration, Instant},
12};
13
14pub(crate) struct LinuxDispatcher {
15 parker: Mutex<Parker>,
16 timed_tasks: Mutex<Vec<(Instant, Runnable)>>,
17 main_sender: flume::Sender<Runnable>,
18 main_receiver: flume::Receiver<Runnable>,
19 background_sender: flume::Sender<Runnable>,
20 background_thread: thread::JoinHandle<()>,
21 main_thread_id: thread::ThreadId,
22}
23
24impl Default for LinuxDispatcher {
25 fn default() -> Self {
26 Self::new()
27 }
28}
29
30impl LinuxDispatcher {
31 pub fn new() -> Self {
32 let (main_sender, main_receiver) = flume::unbounded::<Runnable>();
33 let (background_sender, background_receiver) = flume::unbounded::<Runnable>();
34 let background_thread = thread::spawn(move || {
35 for runnable in background_receiver {
36 let _ignore_panic = panic::catch_unwind(|| runnable.run());
37 }
38 });
39 LinuxDispatcher {
40 parker: Mutex::new(Parker::new()),
41 timed_tasks: Mutex::new(Vec::new()),
42 main_sender,
43 main_receiver,
44 background_sender,
45 background_thread,
46 main_thread_id: thread::current().id(),
47 }
48 }
49}
50
51impl PlatformDispatcher for LinuxDispatcher {
52 fn is_main_thread(&self) -> bool {
53 thread::current().id() == self.main_thread_id
54 }
55
56 fn dispatch(&self, runnable: Runnable, _: Option<TaskLabel>) {
57 self.background_sender.send(runnable).unwrap();
58 }
59
60 fn dispatch_on_main_thread(&self, runnable: Runnable) {
61 self.main_sender.send(runnable).unwrap();
62 }
63
64 fn dispatch_after(&self, duration: Duration, runnable: Runnable) {
65 let moment = Instant::now() + duration;
66 let mut timed_tasks = self.timed_tasks.lock();
67 timed_tasks.push((moment, runnable));
68 timed_tasks.sort_unstable_by(|&(ref a, _), &(ref b, _)| b.cmp(a));
69 }
70
71 fn tick(&self, background_only: bool) -> bool {
72 let mut ran = false;
73 if self.is_main_thread() && !background_only {
74 for runnable in self.main_receiver.try_iter() {
75 runnable.run();
76 ran = true;
77 }
78 }
79 let mut timed_tasks = self.timed_tasks.lock();
80 while let Some(&(moment, _)) = timed_tasks.last() {
81 if moment <= Instant::now() {
82 let (_, runnable) = timed_tasks.pop().unwrap();
83 runnable.run();
84 ran = true;
85 }
86 }
87 ran
88 }
89
90 fn park(&self) {
91 self.parker.lock().park()
92 }
93
94 fn unparker(&self) -> Unparker {
95 self.parker.lock().unparker()
96 }
97}