1#![allow(non_upper_case_globals)]
2#![allow(non_camel_case_types)]
3#![allow(non_snake_case)]
4// todo(linux): remove
5#![allow(unused_variables)]
6
7use crate::{PlatformDispatcher, TaskLabel};
8use async_task::Runnable;
9use calloop::{
10 channel::{self, Sender},
11 timer::TimeoutAction,
12 EventLoop,
13};
14use parking::{Parker, Unparker};
15use parking_lot::Mutex;
16use std::{thread, time::Duration};
17use util::ResultExt;
18
19struct TimerAfter {
20 duration: Duration,
21 runnable: Runnable,
22}
23
24pub(crate) struct LinuxDispatcher {
25 parker: Mutex<Parker>,
26 main_sender: Sender<Runnable>,
27 timer_sender: Sender<TimerAfter>,
28 background_sender: flume::Sender<Runnable>,
29 _background_threads: Vec<thread::JoinHandle<()>>,
30 main_thread_id: thread::ThreadId,
31}
32
33impl LinuxDispatcher {
34 pub fn new(main_sender: Sender<Runnable>) -> Self {
35 let (background_sender, background_receiver) = flume::unbounded::<Runnable>();
36 let thread_count = std::thread::available_parallelism()
37 .map(|i| i.get())
38 .unwrap_or(1);
39
40 let mut background_threads = (0..thread_count)
41 .map(|_| {
42 let receiver = background_receiver.clone();
43 std::thread::spawn(move || {
44 for runnable in receiver {
45 runnable.run();
46 }
47 })
48 })
49 .collect::<Vec<_>>();
50
51 let (timer_sender, timer_channel) = calloop::channel::channel::<TimerAfter>();
52 let timer_thread = std::thread::spawn(|| {
53 let mut event_loop: EventLoop<()> =
54 EventLoop::try_new().expect("Failed to initialize timer loop!");
55
56 let handle = event_loop.handle();
57 let timer_handle = event_loop.handle();
58 handle
59 .insert_source(timer_channel, move |e, _, _| {
60 if let channel::Event::Msg(timer) = e {
61 // This has to be in an option to satisfy the borrow checker. The callback below should only be scheduled once.
62 let mut runnable = Some(timer.runnable);
63 timer_handle
64 .insert_source(
65 calloop::timer::Timer::from_duration(timer.duration),
66 move |e, _, _| {
67 if let Some(runnable) = runnable.take() {
68 runnable.run();
69 }
70 TimeoutAction::Drop
71 },
72 )
73 .expect("Failed to start timer");
74 }
75 })
76 .expect("Failed to start timer thread");
77
78 event_loop.run(None, &mut (), |_| {}).log_err();
79 });
80
81 background_threads.push(timer_thread);
82
83 Self {
84 parker: Mutex::new(Parker::new()),
85 main_sender,
86 timer_sender,
87 background_sender,
88 _background_threads: background_threads,
89 main_thread_id: thread::current().id(),
90 }
91 }
92}
93
94impl PlatformDispatcher for LinuxDispatcher {
95 fn is_main_thread(&self) -> bool {
96 thread::current().id() == self.main_thread_id
97 }
98
99 fn dispatch(&self, runnable: Runnable, _: Option<TaskLabel>) {
100 self.background_sender.send(runnable).unwrap();
101 }
102
103 fn dispatch_on_main_thread(&self, runnable: Runnable) {
104 self.main_sender.send(runnable).ok();
105 }
106
107 fn dispatch_after(&self, duration: Duration, runnable: Runnable) {
108 self.timer_sender
109 .send(TimerAfter { duration, runnable })
110 .ok();
111 }
112
113 fn park(&self, timeout: Option<Duration>) -> bool {
114 if let Some(timeout) = timeout {
115 self.parker.lock().park_timeout(timeout)
116 } else {
117 self.parker.lock().park();
118 true
119 }
120 }
121
122 fn unparker(&self) -> Unparker {
123 self.parker.lock().unparker()
124 }
125}