1use calloop::{
2 EventLoop, PostAction,
3 channel::{self, Sender},
4 timer::TimeoutAction,
5};
6use util::ResultExt;
7
8use std::{mem::MaybeUninit, thread, time::Duration};
9
10use crate::{
11 GLOBAL_THREAD_TIMINGS, GpuiRunnable, PlatformDispatcher, Priority, PriorityQueueReceiver,
12 PriorityQueueSender, RealtimePriority, THREAD_TIMINGS, TaskLabel, ThreadTaskTimings,
13};
14
15struct TimerAfter {
16 duration: Duration,
17 runnable: GpuiRunnable,
18}
19
20pub(crate) struct LinuxDispatcher {
21 main_sender: PriorityQueueCalloopSender<GpuiRunnable>,
22 timer_sender: Sender<TimerAfter>,
23 background_sender: PriorityQueueSender<GpuiRunnable>,
24 _background_threads: Vec<thread::JoinHandle<()>>,
25 main_thread_id: thread::ThreadId,
26}
27
/// Minimum number of background worker threads; also the fallback count when
/// the available parallelism cannot be queried.
const MIN_THREADS: usize = 2;
29
30impl LinuxDispatcher {
31 pub fn new(main_sender: PriorityQueueCalloopSender<GpuiRunnable>) -> Self {
32 let (background_sender, background_receiver) = PriorityQueueReceiver::new();
33 let thread_count =
34 std::thread::available_parallelism().map_or(MIN_THREADS, |i| i.get().max(MIN_THREADS));
35
36 // These thread should really be lower prio then the foreground
37 // executor
38 let mut background_threads = (0..thread_count)
39 .map(|i| {
40 let mut receiver: PriorityQueueReceiver<GpuiRunnable> = background_receiver.clone();
41 std::thread::Builder::new()
42 .name(format!("Worker-{i}"))
43 .spawn(move || {
44 for runnable in receiver.iter() {
45 let started = runnable.run_and_profile();
46 log::trace!(
47 "background thread {}: ran runnable. took: {:?}",
48 i,
49 started.elapsed()
50 );
51 }
52 })
53 .unwrap()
54 })
55 .collect::<Vec<_>>();
56
57 let (timer_sender, timer_channel) = calloop::channel::channel::<TimerAfter>();
58 let timer_thread = std::thread::Builder::new()
59 .name("Timer".to_owned())
60 .spawn(|| {
61 let mut event_loop: EventLoop<()> =
62 EventLoop::try_new().expect("Failed to initialize timer loop!");
63
64 let handle = event_loop.handle();
65 let timer_handle = event_loop.handle();
66 handle
67 .insert_source(timer_channel, move |e, _, _| {
68 if let channel::Event::Msg(timer) = e {
69 // This has to be in an option to satisfy the borrow checker. The callback below should only be scheduled once.
70 let mut runnable = Some(timer.runnable);
71 timer_handle
72 .insert_source(
73 calloop::timer::Timer::from_duration(timer.duration),
74 move |_, _, _| {
75 if let Some(runnable) = runnable.take() {
76 runnable.run_and_profile();
77 }
78 TimeoutAction::Drop
79 },
80 )
81 .expect("Failed to start timer");
82 }
83 })
84 .expect("Failed to start timer thread");
85
86 event_loop.run(None, &mut (), |_| {}).log_err();
87 })
88 .unwrap();
89
90 background_threads.push(timer_thread);
91
92 Self {
93 main_sender,
94 timer_sender,
95 background_sender,
96 _background_threads: background_threads,
97 main_thread_id: thread::current().id(),
98 }
99 }
100}
101
102impl PlatformDispatcher for LinuxDispatcher {
103 fn get_all_timings(&self) -> Vec<crate::ThreadTaskTimings> {
104 let global_timings = GLOBAL_THREAD_TIMINGS.lock();
105 ThreadTaskTimings::convert(&global_timings)
106 }
107
108 fn get_current_thread_timings(&self) -> Vec<crate::TaskTiming> {
109 THREAD_TIMINGS.with(|timings| {
110 let timings = timings.lock();
111 let timings = &timings.timings;
112
113 let mut vec = Vec::with_capacity(timings.len());
114
115 let (s1, s2) = timings.as_slices();
116 vec.extend_from_slice(s1);
117 vec.extend_from_slice(s2);
118 vec
119 })
120 }
121
122 fn is_main_thread(&self) -> bool {
123 thread::current().id() == self.main_thread_id
124 }
125
126 fn dispatch(&self, runnable: GpuiRunnable, _: Option<TaskLabel>) {
127 self.background_sender
128 .send(runnable.priority(), runnable)
129 .unwrap_or_else(|_| panic!("blocking sender returned without value"));
130 }
131
132 fn dispatch_on_main_thread(&self, runnable: GpuiRunnable) {
133 self.main_sender
134 .send(runnable.priority(), runnable)
135 .unwrap_or_else(|runnable| {
136 // NOTE: Runnable may wrap a Future that is !Send.
137 //
138 // This is usually safe because we only poll it on the main thread.
139 // However if the send fails, we know that:
140 // 1. main_receiver has been dropped (which implies the app is shutting down)
141 // 2. we are on a background thread.
142 // It is not safe to drop something !Send on the wrong thread, and
143 // the app will exit soon anyway, so we must forget the runnable.
144 std::mem::forget(runnable);
145 });
146 }
147
148 fn dispatch_after(&self, duration: Duration, runnable: GpuiRunnable) {
149 self.timer_sender
150 .send(TimerAfter { duration, runnable })
151 .ok();
152 }
153
154 fn spawn_realtime(&self, priority: RealtimePriority, f: Box<dyn FnOnce() + Send>) {
155 std::thread::spawn(move || {
156 // SAFETY: always safe to call
157 let thread_id = unsafe { libc::pthread_self() };
158
159 let policy = match priority {
160 RealtimePriority::Audio => libc::SCHED_FIFO,
161 RealtimePriority::Other => libc::SCHED_RR,
162 };
163 let sched_priority = match priority {
164 RealtimePriority::Audio => 65,
165 RealtimePriority::Other => 45,
166 };
167
168 // SAFETY: all sched_param members are valid when initialized to zero.
169 let mut sched_param =
170 unsafe { MaybeUninit::<libc::sched_param>::zeroed().assume_init() };
171 sched_param.sched_priority = sched_priority;
172 // SAFETY: sched_param is a valid initialized structure
173 let result = unsafe { libc::pthread_setschedparam(thread_id, policy, &sched_param) };
174 if result != 0 {
175 log::warn!("failed to set realtime thread priority to {:?}", priority);
176 }
177
178 f();
179 });
180 }
181}
182
183pub struct PriorityQueueCalloopSender<T> {
184 sender: PriorityQueueSender<T>,
185 ping: calloop::ping::Ping,
186}
187
188impl<T> PriorityQueueCalloopSender<T> {
189 fn new(tx: PriorityQueueSender<T>, ping: calloop::ping::Ping) -> Self {
190 Self { sender: tx, ping }
191 }
192
193 fn send(&self, priority: Priority, item: T) -> Result<(), crate::queue::SendError<T>> {
194 let res = self.sender.send(priority, item);
195 if res.is_ok() {
196 self.ping.ping();
197 }
198 res
199 }
200}
201
202impl<T> Drop for PriorityQueueCalloopSender<T> {
203 fn drop(&mut self) {
204 self.ping.ping();
205 }
206}
207
208pub struct PriorityQueueCalloopReceiver<T> {
209 receiver: PriorityQueueReceiver<T>,
210 source: calloop::ping::PingSource,
211 ping: calloop::ping::Ping,
212}
213
214impl<T> PriorityQueueCalloopReceiver<T> {
215 pub fn new() -> (PriorityQueueCalloopSender<T>, Self) {
216 let (ping, source) = calloop::ping::make_ping().expect("Failed to create a Ping.");
217
218 let (tx, rx) = PriorityQueueReceiver::new();
219
220 (
221 PriorityQueueCalloopSender::new(tx, ping.clone()),
222 Self {
223 receiver: rx,
224 source,
225 ping,
226 },
227 )
228 }
229}
230
231use calloop::channel::Event;
232
233#[derive(Debug)]
234pub struct ChannelError(calloop::ping::PingError);
235
236impl std::fmt::Display for ChannelError {
237 #[cfg_attr(feature = "nightly_coverage", coverage(off))]
238 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
239 std::fmt::Display::fmt(&self.0, f)
240 }
241}
242
243impl std::error::Error for ChannelError {
244 #[cfg_attr(feature = "nightly_coverage", coverage(off))]
245 fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
246 Some(&self.0)
247 }
248}
249
250impl<T> calloop::EventSource for PriorityQueueCalloopReceiver<T> {
251 type Event = Event<T>;
252 type Metadata = ();
253 type Ret = ();
254 type Error = ChannelError;
255
256 fn process_events<F>(
257 &mut self,
258 readiness: calloop::Readiness,
259 token: calloop::Token,
260 mut callback: F,
261 ) -> Result<calloop::PostAction, Self::Error>
262 where
263 F: FnMut(Self::Event, &mut Self::Metadata) -> Self::Ret,
264 {
265 let mut clear_readiness = false;
266 let mut disconnected = false;
267
268 let action = self
269 .source
270 .process_events(readiness, token, |(), &mut ()| {
271 let mut is_empty = true;
272
273 let mut receiver = self.receiver.clone();
274 for runnable in receiver.try_iter() {
275 match runnable {
276 Ok(r) => {
277 callback(Event::Msg(r), &mut ());
278 is_empty = false;
279 }
280 Err(_) => {
281 disconnected = true;
282 }
283 }
284 }
285
286 if disconnected {
287 callback(Event::Closed, &mut ());
288 }
289
290 if is_empty {
291 clear_readiness = true;
292 }
293 })
294 .map_err(ChannelError)?;
295
296 if disconnected {
297 Ok(PostAction::Remove)
298 } else if clear_readiness {
299 Ok(action)
300 } else {
301 // Re-notify the ping source so we can try again.
302 self.ping.ping();
303 Ok(PostAction::Continue)
304 }
305 }
306
307 fn register(
308 &mut self,
309 poll: &mut calloop::Poll,
310 token_factory: &mut calloop::TokenFactory,
311 ) -> calloop::Result<()> {
312 self.source.register(poll, token_factory)
313 }
314
315 fn reregister(
316 &mut self,
317 poll: &mut calloop::Poll,
318 token_factory: &mut calloop::TokenFactory,
319 ) -> calloop::Result<()> {
320 self.source.reregister(poll, token_factory)
321 }
322
323 fn unregister(&mut self, poll: &mut calloop::Poll) -> calloop::Result<()> {
324 self.source.unregister(poll)
325 }
326}
327
#[cfg(test)]
mod tests {
    use super::*;

    /// Records which kinds of channel events the event-loop callback has observed.
    struct Observed {
        msg: bool,
        closed: bool,
    }

    /// Drives the sender/receiver pair on a real calloop event loop through
    /// three steps: an idle dispatch, a dispatch after one send, and a dispatch
    /// after the sender has been dropped.
    #[test]
    fn calloop_works() {
        let mut event_loop = calloop::EventLoop::try_new().unwrap();
        let (tx, rx) = PriorityQueueCalloopReceiver::new();

        let _channel_token = event_loop
            .handle()
            .insert_source(rx, |evt, &mut (), seen: &mut Observed| match evt {
                Event::Msg(()) => seen.msg = true,
                Event::Closed => seen.closed = true,
            })
            .unwrap();

        let mut seen = Observed {
            msg: false,
            closed: false,
        };
        let no_wait = Some(::std::time::Duration::ZERO);

        // Nothing is sent, so nothing is received.
        event_loop.dispatch(no_wait, &mut seen).unwrap();
        assert!(!seen.msg);
        assert!(!seen.closed);

        // A message is sent.
        tx.send(Priority::Medium, ()).unwrap();
        event_loop.dispatch(no_wait, &mut seen).unwrap();
        assert!(seen.msg);
        assert!(!seen.closed);

        // The sender is dropped.
        drop(tx);
        event_loop.dispatch(no_wait, &mut seen).unwrap();
        assert!(seen.msg);
        assert!(seen.closed);
    }
}
388
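// NOTE(review): the lines below are captured output from a failing test run,
// kept as a comment. They name a test (`tests::tomato`) and source lines
// (480, 515) that do not appear in the code above; confirm whether this log
// is still relevant or can be removed.
// running 1 test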
390// test platform::linux::dispatcher::tests::tomato ... FAILED
391
392// failures:
393
394// ---- platform::linux::dispatcher::tests::tomato stdout ----
395// [crates/gpui/src/platform/linux/dispatcher.rs:262:9]
396// returning 1 tasks to process
397// [crates/gpui/src/platform/linux/dispatcher.rs:480:75] evt = Msg(
398// (),
399// )
400// returning 0 tasks to process
401
402// thread 'platform::linux::dispatcher::tests::tomato' (478301) panicked at crates/gpui/src/platform/linux/dispatcher.rs:515:9:
403// assertion failed: data.got_closed
404// note: run with `RUST_BACKTRACE=1` environment variable to display a backtrace