1#![allow(non_upper_case_globals)]
2#![allow(non_camel_case_types)]
3#![allow(non_snake_case)]
4//todo!(linux): remove
5#![allow(unused_variables)]
6
7use crate::platform::linux::client_dispatcher::ClientDispatcher;
8use crate::{PlatformDispatcher, TaskLabel};
9use async_task::Runnable;
10use parking::{Parker, Unparker};
11use parking_lot::Mutex;
12use std::{
13 panic,
14 sync::Arc,
15 thread,
16 time::{Duration, Instant},
17};
18
19pub(crate) struct LinuxDispatcher {
20 client_dispatcher: Arc<dyn ClientDispatcher + Send + Sync>,
21 parker: Mutex<Parker>,
22 timed_tasks: Mutex<Vec<(Instant, Runnable)>>,
23 main_sender: flume::Sender<Runnable>,
24 background_sender: flume::Sender<Runnable>,
25 _background_thread: thread::JoinHandle<()>,
26 main_thread_id: thread::ThreadId,
27}
28
29impl LinuxDispatcher {
30 pub fn new(
31 main_sender: flume::Sender<Runnable>,
32 client_dispatcher: &Arc<dyn ClientDispatcher + Send + Sync>,
33 ) -> Self {
34 let (background_sender, background_receiver) = flume::unbounded::<Runnable>();
35 let background_thread = thread::spawn(move || {
36 profiling::register_thread!("background");
37 for runnable in background_receiver {
38 let _ignore_panic = panic::catch_unwind(|| runnable.run());
39 }
40 });
41 Self {
42 client_dispatcher: Arc::clone(client_dispatcher),
43 parker: Mutex::new(Parker::new()),
44 timed_tasks: Mutex::new(Vec::new()),
45 main_sender,
46 background_sender,
47 _background_thread: background_thread,
48 main_thread_id: thread::current().id(),
49 }
50 }
51}
52
53impl PlatformDispatcher for LinuxDispatcher {
54 fn is_main_thread(&self) -> bool {
55 thread::current().id() == self.main_thread_id
56 }
57
58 fn dispatch(&self, runnable: Runnable, _: Option<TaskLabel>) {
59 self.background_sender.send(runnable).unwrap();
60 }
61
62 fn dispatch_on_main_thread(&self, runnable: Runnable) {
63 self.main_sender.send(runnable).unwrap();
64 self.client_dispatcher.dispatch_on_main_thread();
65 }
66
67 fn dispatch_after(&self, duration: Duration, runnable: Runnable) {
68 let moment = Instant::now() + duration;
69 let mut timed_tasks = self.timed_tasks.lock();
70 timed_tasks.push((moment, runnable));
71 timed_tasks.sort_unstable_by(|(a, _), (b, _)| b.cmp(a));
72 }
73
74 fn tick(&self, background_only: bool) -> bool {
75 let mut timed_tasks = self.timed_tasks.lock();
76 let old_count = timed_tasks.len();
77 while let Some(&(moment, _)) = timed_tasks.last() {
78 if moment <= Instant::now() {
79 let (_, runnable) = timed_tasks.pop().unwrap();
80 runnable.run();
81 } else {
82 break;
83 }
84 }
85 timed_tasks.len() != old_count
86 }
87
88 fn park(&self) {
89 self.parker.lock().park()
90 }
91
92 fn unparker(&self) -> Unparker {
93 self.parker.lock().unparker()
94 }
95}