1use crate::{PlatformDispatcher, TaskLabel};
2use async_task::Runnable;
3use calloop::{
4 channel::{self, Sender},
5 timer::TimeoutAction,
6 EventLoop,
7};
8use mio::Waker;
9use parking::{Parker, Unparker};
10use parking_lot::Mutex;
11use std::{sync::Arc, thread, time::Duration};
12use util::ResultExt;
13
14struct TimerAfter {
15 duration: Duration,
16 runnable: Runnable,
17}
18
19pub(crate) struct LinuxDispatcher {
20 parker: Mutex<Parker>,
21 main_sender: Sender<Runnable>,
22 main_waker: Option<Arc<Waker>>,
23 timer_sender: Sender<TimerAfter>,
24 background_sender: flume::Sender<Runnable>,
25 _background_threads: Vec<thread::JoinHandle<()>>,
26 main_thread_id: thread::ThreadId,
27}
28
29impl LinuxDispatcher {
30 pub fn new(main_sender: Sender<Runnable>, main_waker: Option<Arc<Waker>>) -> Self {
31 let (background_sender, background_receiver) = flume::unbounded::<Runnable>();
32 let thread_count = std::thread::available_parallelism()
33 .map(|i| i.get())
34 .unwrap_or(1);
35
36 let mut background_threads = (0..thread_count)
37 .map(|_| {
38 let receiver = background_receiver.clone();
39 std::thread::spawn(move || {
40 for runnable in receiver {
41 runnable.run();
42 }
43 })
44 })
45 .collect::<Vec<_>>();
46
47 let (timer_sender, timer_channel) = calloop::channel::channel::<TimerAfter>();
48 let timer_thread = std::thread::spawn(|| {
49 let mut event_loop: EventLoop<()> =
50 EventLoop::try_new().expect("Failed to initialize timer loop!");
51
52 let handle = event_loop.handle();
53 let timer_handle = event_loop.handle();
54 handle
55 .insert_source(timer_channel, move |e, _, _| {
56 if let channel::Event::Msg(timer) = e {
57 // This has to be in an option to satisfy the borrow checker. The callback below should only be scheduled once.
58 let mut runnable = Some(timer.runnable);
59 timer_handle
60 .insert_source(
61 calloop::timer::Timer::from_duration(timer.duration),
62 move |_, _, _| {
63 if let Some(runnable) = runnable.take() {
64 runnable.run();
65 }
66 TimeoutAction::Drop
67 },
68 )
69 .expect("Failed to start timer");
70 }
71 })
72 .expect("Failed to start timer thread");
73
74 event_loop.run(None, &mut (), |_| {}).log_err();
75 });
76
77 background_threads.push(timer_thread);
78
79 Self {
80 parker: Mutex::new(Parker::new()),
81 main_sender,
82 main_waker,
83 timer_sender,
84 background_sender,
85 _background_threads: background_threads,
86 main_thread_id: thread::current().id(),
87 }
88 }
89}
90
91impl PlatformDispatcher for LinuxDispatcher {
92 fn is_main_thread(&self) -> bool {
93 thread::current().id() == self.main_thread_id
94 }
95
96 fn dispatch(&self, runnable: Runnable, _: Option<TaskLabel>) {
97 self.background_sender.send(runnable).unwrap();
98 }
99
100 fn dispatch_on_main_thread(&self, runnable: Runnable) {
101 self.main_sender.send(runnable).ok();
102 if let Some(main_waker) = self.main_waker.as_ref() {
103 main_waker.wake().ok();
104 }
105 }
106
107 fn dispatch_after(&self, duration: Duration, runnable: Runnable) {
108 self.timer_sender
109 .send(TimerAfter { duration, runnable })
110 .ok();
111 }
112
113 fn park(&self, timeout: Option<Duration>) -> bool {
114 if let Some(timeout) = timeout {
115 self.parker.lock().park_timeout(timeout)
116 } else {
117 self.parker.lock().park();
118 true
119 }
120 }
121
122 fn unparker(&self) -> Unparker {
123 self.parker.lock().unparker()
124 }
125}