1use crate::{
2 GLOBAL_THREAD_TIMINGS, PlatformDispatcher, RunnableVariant, THREAD_TIMINGS, TaskLabel,
3 TaskTiming, ThreadTaskTimings,
4};
5use calloop::{
6 EventLoop,
7 channel::{self, Sender},
8 timer::TimeoutAction,
9};
10use std::{
11 thread,
12 time::{Duration, Instant},
13};
14use util::ResultExt;
15
16struct TimerAfter {
17 duration: Duration,
18 runnable: RunnableVariant,
19}
20
21pub(crate) struct LinuxDispatcher {
22 main_sender: Sender<RunnableVariant>,
23 timer_sender: Sender<TimerAfter>,
24 background_sender: flume::Sender<RunnableVariant>,
25 _background_threads: Vec<thread::JoinHandle<()>>,
26 main_thread_id: thread::ThreadId,
27}
28
/// Lower bound on the number of background worker threads. Also used as the
/// worker count when the available parallelism cannot be queried.
const MIN_THREADS: usize = 2;
30
31impl LinuxDispatcher {
32 pub fn new(main_sender: Sender<RunnableVariant>) -> Self {
33 let (background_sender, background_receiver) = flume::unbounded::<RunnableVariant>();
34 let thread_count =
35 std::thread::available_parallelism().map_or(MIN_THREADS, |i| i.get().max(MIN_THREADS));
36
37 let mut background_threads = (0..thread_count)
38 .map(|i| {
39 let receiver = background_receiver.clone();
40 std::thread::Builder::new()
41 .name(format!("Worker-{i}"))
42 .spawn(move || {
43 for runnable in receiver {
44 let start = Instant::now();
45
46 let mut location = match runnable {
47 RunnableVariant::Meta(runnable) => {
48 let location = runnable.metadata().location;
49 let timing = TaskTiming {
50 location,
51 start,
52 end: None,
53 };
54 Self::add_task_timing(timing);
55
56 runnable.run();
57 timing
58 }
59 RunnableVariant::Compat(runnable) => {
60 let location = core::panic::Location::caller();
61 let timing = TaskTiming {
62 location,
63 start,
64 end: None,
65 };
66 Self::add_task_timing(timing);
67
68 runnable.run();
69 timing
70 }
71 };
72
73 let end = Instant::now();
74 location.end = Some(end);
75 Self::add_task_timing(location);
76
77 log::trace!(
78 "background thread {}: ran runnable. took: {:?}",
79 i,
80 start.elapsed()
81 );
82 }
83 })
84 .unwrap()
85 })
86 .collect::<Vec<_>>();
87
88 let (timer_sender, timer_channel) = calloop::channel::channel::<TimerAfter>();
89 let timer_thread = std::thread::Builder::new()
90 .name("Timer".to_owned())
91 .spawn(|| {
92 let mut event_loop: EventLoop<()> =
93 EventLoop::try_new().expect("Failed to initialize timer loop!");
94
95 let handle = event_loop.handle();
96 let timer_handle = event_loop.handle();
97 handle
98 .insert_source(timer_channel, move |e, _, _| {
99 if let channel::Event::Msg(timer) = e {
100 // This has to be in an option to satisfy the borrow checker. The callback below should only be scheduled once.
101 let mut runnable = Some(timer.runnable);
102 timer_handle
103 .insert_source(
104 calloop::timer::Timer::from_duration(timer.duration),
105 move |_, _, _| {
106 if let Some(runnable) = runnable.take() {
107 let start = Instant::now();
108 let mut timing = match runnable {
109 RunnableVariant::Meta(runnable) => {
110 let location = runnable.metadata().location;
111 let timing = TaskTiming {
112 location,
113 start,
114 end: None,
115 };
116 Self::add_task_timing(timing);
117
118 runnable.run();
119 timing
120 }
121 RunnableVariant::Compat(runnable) => {
122 let timing = TaskTiming {
123 location: core::panic::Location::caller(),
124 start,
125 end: None,
126 };
127 Self::add_task_timing(timing);
128
129 runnable.run();
130 timing
131 }
132 };
133 let end = Instant::now();
134
135 timing.end = Some(end);
136 Self::add_task_timing(timing);
137 }
138 TimeoutAction::Drop
139 },
140 )
141 .expect("Failed to start timer");
142 }
143 })
144 .expect("Failed to start timer thread");
145
146 event_loop.run(None, &mut (), |_| {}).log_err();
147 })
148 .unwrap();
149
150 background_threads.push(timer_thread);
151
152 Self {
153 main_sender,
154 timer_sender,
155 background_sender,
156 _background_threads: background_threads,
157 main_thread_id: thread::current().id(),
158 }
159 }
160
161 pub(crate) fn add_task_timing(timing: TaskTiming) {
162 THREAD_TIMINGS.with(|timings| {
163 let mut timings = timings.lock();
164 let timings = &mut timings.timings;
165
166 if let Some(last_timing) = timings.iter_mut().rev().next() {
167 if last_timing.location == timing.location {
168 last_timing.end = timing.end;
169 return;
170 }
171 }
172
173 timings.push_back(timing);
174 });
175 }
176}
177
178impl PlatformDispatcher for LinuxDispatcher {
179 fn get_all_timings(&self) -> Vec<crate::ThreadTaskTimings> {
180 let global_timings = GLOBAL_THREAD_TIMINGS.lock();
181 ThreadTaskTimings::convert(&global_timings)
182 }
183
184 fn get_current_thread_timings(&self) -> Vec<crate::TaskTiming> {
185 THREAD_TIMINGS.with(|timings| {
186 let timings = timings.lock();
187 let timings = &timings.timings;
188
189 let mut vec = Vec::with_capacity(timings.len());
190
191 let (s1, s2) = timings.as_slices();
192 vec.extend_from_slice(s1);
193 vec.extend_from_slice(s2);
194 vec
195 })
196 }
197
198 fn is_main_thread(&self) -> bool {
199 thread::current().id() == self.main_thread_id
200 }
201
202 fn dispatch(&self, runnable: RunnableVariant, _: Option<TaskLabel>) {
203 self.background_sender.send(runnable).unwrap();
204 }
205
206 fn dispatch_on_main_thread(&self, runnable: RunnableVariant) {
207 self.main_sender.send(runnable).unwrap_or_else(|runnable| {
208 // NOTE: Runnable may wrap a Future that is !Send.
209 //
210 // This is usually safe because we only poll it on the main thread.
211 // However if the send fails, we know that:
212 // 1. main_receiver has been dropped (which implies the app is shutting down)
213 // 2. we are on a background thread.
214 // It is not safe to drop something !Send on the wrong thread, and
215 // the app will exit soon anyway, so we must forget the runnable.
216 std::mem::forget(runnable);
217 });
218 }
219
220 fn dispatch_after(&self, duration: Duration, runnable: RunnableVariant) {
221 self.timer_sender
222 .send(TimerAfter { duration, runnable })
223 .ok();
224 }
225}