1use crate::{PlatformDispatcher, TaskLabel};
2use async_task::Runnable;
3use calloop::{
4 channel::{self, Sender},
5 timer::TimeoutAction,
6 EventLoop,
7};
8use parking::{Parker, Unparker};
9use parking_lot::Mutex;
10use std::{thread, time::Duration};
11use util::ResultExt;
12
13struct TimerAfter {
14 duration: Duration,
15 runnable: Runnable,
16}
17
18pub(crate) struct LinuxDispatcher {
19 parker: Mutex<Parker>,
20 main_sender: Sender<Runnable>,
21 timer_sender: Sender<TimerAfter>,
22 background_sender: flume::Sender<Runnable>,
23 _background_threads: Vec<thread::JoinHandle<()>>,
24 main_thread_id: thread::ThreadId,
25}
26
27impl LinuxDispatcher {
28 pub fn new(main_sender: Sender<Runnable>) -> Self {
29 let (background_sender, background_receiver) = flume::unbounded::<Runnable>();
30 let thread_count = std::thread::available_parallelism()
31 .map(|i| i.get())
32 .unwrap_or(1);
33
34 let mut background_threads = (0..thread_count)
35 .map(|_| {
36 let receiver = background_receiver.clone();
37 std::thread::spawn(move || {
38 for runnable in receiver {
39 runnable.run();
40 }
41 })
42 })
43 .collect::<Vec<_>>();
44
45 let (timer_sender, timer_channel) = calloop::channel::channel::<TimerAfter>();
46 let timer_thread = std::thread::spawn(|| {
47 let mut event_loop: EventLoop<()> =
48 EventLoop::try_new().expect("Failed to initialize timer loop!");
49
50 let handle = event_loop.handle();
51 let timer_handle = event_loop.handle();
52 handle
53 .insert_source(timer_channel, move |e, _, _| {
54 if let channel::Event::Msg(timer) = e {
55 // This has to be in an option to satisfy the borrow checker. The callback below should only be scheduled once.
56 let mut runnable = Some(timer.runnable);
57 timer_handle
58 .insert_source(
59 calloop::timer::Timer::from_duration(timer.duration),
60 move |_, _, _| {
61 if let Some(runnable) = runnable.take() {
62 runnable.run();
63 }
64 TimeoutAction::Drop
65 },
66 )
67 .expect("Failed to start timer");
68 }
69 })
70 .expect("Failed to start timer thread");
71
72 event_loop.run(None, &mut (), |_| {}).log_err();
73 });
74
75 background_threads.push(timer_thread);
76
77 Self {
78 parker: Mutex::new(Parker::new()),
79 main_sender,
80 timer_sender,
81 background_sender,
82 _background_threads: background_threads,
83 main_thread_id: thread::current().id(),
84 }
85 }
86}
87
88impl PlatformDispatcher for LinuxDispatcher {
89 fn is_main_thread(&self) -> bool {
90 thread::current().id() == self.main_thread_id
91 }
92
93 fn dispatch(&self, runnable: Runnable, _: Option<TaskLabel>) {
94 self.background_sender.send(runnable).unwrap();
95 }
96
97 fn dispatch_on_main_thread(&self, runnable: Runnable) {
98 self.main_sender.send(runnable).ok();
99 }
100
101 fn dispatch_after(&self, duration: Duration, runnable: Runnable) {
102 self.timer_sender
103 .send(TimerAfter { duration, runnable })
104 .ok();
105 }
106
107 fn park(&self, timeout: Option<Duration>) -> bool {
108 if let Some(timeout) = timeout {
109 self.parker.lock().park_timeout(timeout)
110 } else {
111 self.parker.lock().park();
112 true
113 }
114 }
115
116 fn unparker(&self) -> Unparker {
117 self.parker.lock().unparker()
118 }
119}