1use crate::{PlatformDispatcher, TaskLabel};
2use async_task::Runnable;
3use calloop::{
4 EventLoop,
5 channel::{self, Sender},
6 timer::TimeoutAction,
7};
8use parking::{Parker, Unparker};
9use parking_lot::Mutex;
10use std::{
11 thread,
12 time::{Duration, Instant},
13};
14use util::ResultExt;
15
16struct TimerAfter {
17 duration: Duration,
18 runnable: Runnable,
19}
20
21pub(crate) struct LinuxDispatcher {
22 parker: Mutex<Parker>,
23 main_sender: Sender<Runnable>,
24 timer_sender: Sender<TimerAfter>,
25 background_sender: flume::Sender<Runnable>,
26 _background_threads: Vec<thread::JoinHandle<()>>,
27 main_thread_id: thread::ThreadId,
28}
29
30impl LinuxDispatcher {
31 pub fn new(main_sender: Sender<Runnable>) -> Self {
32 let (background_sender, background_receiver) = flume::unbounded::<Runnable>();
33 let thread_count = std::thread::available_parallelism()
34 .map(|i| i.get())
35 .unwrap_or(1);
36
37 let mut background_threads = (0..thread_count)
38 .map(|i| {
39 let receiver = background_receiver.clone();
40 std::thread::Builder::new()
41 .name(format!("Worker-{i}"))
42 .spawn(move || {
43 for runnable in receiver {
44 let start = Instant::now();
45
46 runnable.run();
47
48 log::trace!(
49 "background thread {}: ran runnable. took: {:?}",
50 i,
51 start.elapsed()
52 );
53 }
54 })
55 .unwrap()
56 })
57 .collect::<Vec<_>>();
58
59 let (timer_sender, timer_channel) = calloop::channel::channel::<TimerAfter>();
60 let timer_thread = std::thread::Builder::new()
61 .name("Timer".to_owned())
62 .spawn(|| {
63 let mut event_loop: EventLoop<()> =
64 EventLoop::try_new().expect("Failed to initialize timer loop!");
65
66 let handle = event_loop.handle();
67 let timer_handle = event_loop.handle();
68 handle
69 .insert_source(timer_channel, move |e, _, _| {
70 if let channel::Event::Msg(timer) = e {
71 // This has to be in an option to satisfy the borrow checker. The callback below should only be scheduled once.
72 let mut runnable = Some(timer.runnable);
73 timer_handle
74 .insert_source(
75 calloop::timer::Timer::from_duration(timer.duration),
76 move |_, _, _| {
77 if let Some(runnable) = runnable.take() {
78 runnable.run();
79 }
80 TimeoutAction::Drop
81 },
82 )
83 .expect("Failed to start timer");
84 }
85 })
86 .expect("Failed to start timer thread");
87
88 event_loop.run(None, &mut (), |_| {}).log_err();
89 })
90 .unwrap();
91
92 background_threads.push(timer_thread);
93
94 Self {
95 parker: Mutex::new(Parker::new()),
96 main_sender,
97 timer_sender,
98 background_sender,
99 _background_threads: background_threads,
100 main_thread_id: thread::current().id(),
101 }
102 }
103}
104
105impl PlatformDispatcher for LinuxDispatcher {
106 fn is_main_thread(&self) -> bool {
107 thread::current().id() == self.main_thread_id
108 }
109
110 fn dispatch(&self, runnable: Runnable, _: Option<TaskLabel>) {
111 self.background_sender.send(runnable).unwrap();
112 }
113
114 fn dispatch_on_main_thread(&self, runnable: Runnable) {
115 self.main_sender.send(runnable).unwrap_or_else(|runnable| {
116 // NOTE: Runnable may wrap a Future that is !Send.
117 //
118 // This is usually safe because we only poll it on the main thread.
119 // However if the send fails, we know that:
120 // 1. main_receiver has been dropped (which implies the app is shutting down)
121 // 2. we are on a background thread.
122 // It is not safe to drop something !Send on the wrong thread, and
123 // the app will exit soon anyway, so we must forget the runnable.
124 std::mem::forget(runnable);
125 });
126 }
127
128 fn dispatch_after(&self, duration: Duration, runnable: Runnable) {
129 self.timer_sender
130 .send(TimerAfter { duration, runnable })
131 .ok();
132 }
133
134 fn park(&self, timeout: Option<Duration>) -> bool {
135 if let Some(timeout) = timeout {
136 self.parker.lock().park_timeout(timeout)
137 } else {
138 self.parker.lock().park();
139 true
140 }
141 }
142
143 fn unparker(&self) -> Unparker {
144 self.parker.lock().unparker()
145 }
146}