1use crate::{
2 GLOBAL_THREAD_TIMINGS, PlatformDispatcher, RunnableVariant, THREAD_TIMINGS, TaskLabel,
3 TaskTiming, ThreadTaskTimings,
4};
5use calloop::{
6 EventLoop,
7 channel::{self, Sender},
8 timer::TimeoutAction,
9};
10use std::{
11 thread,
12 time::{Duration, Instant},
13};
14use util::ResultExt;
15
16struct TimerAfter {
17 duration: Duration,
18 runnable: RunnableVariant,
19}
20
21pub(crate) struct LinuxDispatcher {
22 main_sender: Sender<RunnableVariant>,
23 timer_sender: Sender<TimerAfter>,
24 background_sender: flume::Sender<RunnableVariant>,
25 _background_threads: Vec<thread::JoinHandle<()>>,
26 main_thread_id: thread::ThreadId,
27}
28
29impl LinuxDispatcher {
30 pub fn new(main_sender: Sender<RunnableVariant>) -> Self {
31 let (background_sender, background_receiver) = flume::unbounded::<RunnableVariant>();
32 let thread_count = std::thread::available_parallelism()
33 .map(|i| i.get())
34 .unwrap_or(1);
35
36 let mut background_threads = (0..thread_count)
37 .map(|i| {
38 let receiver = background_receiver.clone();
39 std::thread::Builder::new()
40 .name(format!("Worker-{i}"))
41 .spawn(move || {
42 for runnable in receiver {
43 let start = Instant::now();
44
45 let mut location = match runnable {
46 RunnableVariant::Meta(runnable) => {
47 let location = runnable.metadata().location;
48 let timing = TaskTiming {
49 location,
50 start,
51 end: None,
52 };
53 Self::add_task_timing(timing);
54
55 runnable.run();
56 timing
57 }
58 RunnableVariant::Compat(runnable) => {
59 let location = core::panic::Location::caller();
60 let timing = TaskTiming {
61 location,
62 start,
63 end: None,
64 };
65 Self::add_task_timing(timing);
66
67 runnable.run();
68 timing
69 }
70 };
71
72 let end = Instant::now();
73 location.end = Some(end);
74 Self::add_task_timing(location);
75
76 log::trace!(
77 "background thread {}: ran runnable. took: {:?}",
78 i,
79 start.elapsed()
80 );
81 }
82 })
83 .unwrap()
84 })
85 .collect::<Vec<_>>();
86
87 let (timer_sender, timer_channel) = calloop::channel::channel::<TimerAfter>();
88 let timer_thread = std::thread::Builder::new()
89 .name("Timer".to_owned())
90 .spawn(|| {
91 let mut event_loop: EventLoop<()> =
92 EventLoop::try_new().expect("Failed to initialize timer loop!");
93
94 let handle = event_loop.handle();
95 let timer_handle = event_loop.handle();
96 handle
97 .insert_source(timer_channel, move |e, _, _| {
98 if let channel::Event::Msg(timer) = e {
99 // This has to be in an option to satisfy the borrow checker. The callback below should only be scheduled once.
100 let mut runnable = Some(timer.runnable);
101 timer_handle
102 .insert_source(
103 calloop::timer::Timer::from_duration(timer.duration),
104 move |_, _, _| {
105 if let Some(runnable) = runnable.take() {
106 let start = Instant::now();
107 let mut timing = match runnable {
108 RunnableVariant::Meta(runnable) => {
109 let location = runnable.metadata().location;
110 let timing = TaskTiming {
111 location,
112 start,
113 end: None,
114 };
115 Self::add_task_timing(timing);
116
117 runnable.run();
118 timing
119 }
120 RunnableVariant::Compat(runnable) => {
121 let timing = TaskTiming {
122 location: core::panic::Location::caller(),
123 start,
124 end: None,
125 };
126 Self::add_task_timing(timing);
127
128 runnable.run();
129 timing
130 }
131 };
132 let end = Instant::now();
133
134 timing.end = Some(end);
135 Self::add_task_timing(timing);
136 }
137 TimeoutAction::Drop
138 },
139 )
140 .expect("Failed to start timer");
141 }
142 })
143 .expect("Failed to start timer thread");
144
145 event_loop.run(None, &mut (), |_| {}).log_err();
146 })
147 .unwrap();
148
149 background_threads.push(timer_thread);
150
151 Self {
152 main_sender,
153 timer_sender,
154 background_sender,
155 _background_threads: background_threads,
156 main_thread_id: thread::current().id(),
157 }
158 }
159
160 pub(crate) fn add_task_timing(timing: TaskTiming) {
161 THREAD_TIMINGS.with(|timings| {
162 let mut timings = timings.lock();
163 let timings = &mut timings.timings;
164
165 if let Some(last_timing) = timings.iter_mut().rev().next() {
166 if last_timing.location == timing.location {
167 last_timing.end = timing.end;
168 return;
169 }
170 }
171
172 timings.push_back(timing);
173 });
174 }
175}
176
177impl PlatformDispatcher for LinuxDispatcher {
178 fn get_all_timings(&self) -> Vec<crate::ThreadTaskTimings> {
179 let global_timings = GLOBAL_THREAD_TIMINGS.lock();
180 ThreadTaskTimings::convert(&global_timings)
181 }
182
183 fn get_current_thread_timings(&self) -> Vec<crate::TaskTiming> {
184 THREAD_TIMINGS.with(|timings| {
185 let timings = timings.lock();
186 let timings = &timings.timings;
187
188 let mut vec = Vec::with_capacity(timings.len());
189
190 let (s1, s2) = timings.as_slices();
191 vec.extend_from_slice(s1);
192 vec.extend_from_slice(s2);
193 vec
194 })
195 }
196
197 fn is_main_thread(&self) -> bool {
198 thread::current().id() == self.main_thread_id
199 }
200
201 fn dispatch(&self, runnable: RunnableVariant, _: Option<TaskLabel>) {
202 self.background_sender.send(runnable).unwrap();
203 }
204
205 fn dispatch_on_main_thread(&self, runnable: RunnableVariant) {
206 self.main_sender.send(runnable).unwrap_or_else(|runnable| {
207 // NOTE: Runnable may wrap a Future that is !Send.
208 //
209 // This is usually safe because we only poll it on the main thread.
210 // However if the send fails, we know that:
211 // 1. main_receiver has been dropped (which implies the app is shutting down)
212 // 2. we are on a background thread.
213 // It is not safe to drop something !Send on the wrong thread, and
214 // the app will exit soon anyway, so we must forget the runnable.
215 std::mem::forget(runnable);
216 });
217 }
218
219 fn dispatch_after(&self, duration: Duration, runnable: RunnableVariant) {
220 self.timer_sender
221 .send(TimerAfter { duration, runnable })
222 .ok();
223 }
224}