1#![allow(non_upper_case_globals)]
2#![allow(non_camel_case_types)]
3#![allow(non_snake_case)]
4//todo!(linux): remove
5#![allow(unused_variables)]
6
7use crate::platform::linux::client_dispatcher::ClientDispatcher;
8use crate::{PlatformDispatcher, TaskLabel};
9use async_task::Runnable;
10use parking::{Parker, Unparker};
11use parking_lot::Mutex;
12use std::{
13 panic,
14 sync::Arc,
15 thread,
16 time::{Duration, Instant},
17};
18
19pub(crate) struct LinuxDispatcher {
20 client_dispatcher: Arc<dyn ClientDispatcher + Send + Sync>,
21 parker: Mutex<Parker>,
22 timed_tasks: Mutex<Vec<(Instant, Runnable)>>,
23 main_sender: flume::Sender<Runnable>,
24 background_sender: flume::Sender<Runnable>,
25 _background_thread: thread::JoinHandle<()>,
26 main_thread_id: thread::ThreadId,
27}
28
29impl LinuxDispatcher {
30 pub fn new(
31 main_sender: flume::Sender<Runnable>,
32 client_dispatcher: &Arc<dyn ClientDispatcher + Send + Sync>,
33 ) -> Self {
34 let (background_sender, background_receiver) = flume::unbounded::<Runnable>();
35 let background_thread = thread::spawn(move || {
36 for runnable in background_receiver {
37 let _ignore_panic = panic::catch_unwind(|| runnable.run());
38 }
39 });
40 Self {
41 client_dispatcher: Arc::clone(client_dispatcher),
42 parker: Mutex::new(Parker::new()),
43 timed_tasks: Mutex::new(Vec::new()),
44 main_sender,
45 background_sender,
46 _background_thread: background_thread,
47 main_thread_id: thread::current().id(),
48 }
49 }
50}
51
52impl PlatformDispatcher for LinuxDispatcher {
53 fn is_main_thread(&self) -> bool {
54 thread::current().id() == self.main_thread_id
55 }
56
57 fn dispatch(&self, runnable: Runnable, _: Option<TaskLabel>) {
58 self.background_sender.send(runnable).unwrap();
59 }
60
61 fn dispatch_on_main_thread(&self, runnable: Runnable) {
62 self.main_sender.send(runnable).unwrap();
63 self.client_dispatcher.dispatch_on_main_thread();
64 }
65
66 fn dispatch_after(&self, duration: Duration, runnable: Runnable) {
67 let moment = Instant::now() + duration;
68 let mut timed_tasks = self.timed_tasks.lock();
69 timed_tasks.push((moment, runnable));
70 timed_tasks.sort_unstable_by(|(a, _), (b, _)| b.cmp(a));
71 }
72
73 fn tick(&self, background_only: bool) -> bool {
74 let mut timed_tasks = self.timed_tasks.lock();
75 let old_count = timed_tasks.len();
76 while let Some(&(moment, _)) = timed_tasks.last() {
77 if moment <= Instant::now() {
78 let (_, runnable) = timed_tasks.pop().unwrap();
79 runnable.run();
80 } else {
81 break;
82 }
83 }
84 timed_tasks.len() != old_count
85 }
86
87 fn park(&self) {
88 self.parker.lock().park()
89 }
90
91 fn unparker(&self) -> Unparker {
92 self.parker.lock().unparker()
93 }
94}