1use calloop::{
2 EventLoop, PostAction,
3 channel::{self, Sender},
4 timer::TimeoutAction,
5};
6use util::ResultExt;
7
8use std::{
9 mem::MaybeUninit,
10 thread,
11 time::{Duration, Instant},
12};
13
14use crate::{
15 GLOBAL_THREAD_TIMINGS, PlatformDispatcher, Priority, PriorityQueueReceiver,
16 PriorityQueueSender, RealtimePriority, RunnableVariant, THREAD_TIMINGS, TaskLabel, TaskTiming,
17 ThreadTaskTimings, profiler,
18};
19
20struct TimerAfter {
21 duration: Duration,
22 runnable: RunnableVariant,
23}
24
25pub(crate) struct LinuxDispatcher {
26 main_sender: PriorityQueueCalloopSender<RunnableVariant>,
27 timer_sender: Sender<TimerAfter>,
28 background_sender: PriorityQueueSender<RunnableVariant>,
29 _background_threads: Vec<thread::JoinHandle<()>>,
30 main_thread_id: thread::ThreadId,
31}
32
/// Lower bound on the number of background worker threads, also used as the
/// fallback when the available parallelism cannot be queried.
const MIN_THREADS: usize = 2;
34
35impl LinuxDispatcher {
36 pub fn new(main_sender: PriorityQueueCalloopSender<RunnableVariant>) -> Self {
37 let (background_sender, background_receiver) = PriorityQueueReceiver::new();
38 let thread_count =
39 std::thread::available_parallelism().map_or(MIN_THREADS, |i| i.get().max(MIN_THREADS));
40
41 // These thread should really be lower prio then the foreground
42 // executor
43 let mut background_threads = (0..thread_count)
44 .map(|i| {
45 let mut receiver = background_receiver.clone();
46 std::thread::Builder::new()
47 .name(format!("Worker-{i}"))
48 .spawn(move || {
49 for runnable in receiver.iter() {
50 let start = Instant::now();
51
52 let mut location = match runnable {
53 RunnableVariant::Meta(runnable) => {
54 let location = runnable.metadata().location;
55 let timing = TaskTiming {
56 location,
57 start,
58 end: None,
59 };
60 profiler::add_task_timing(timing);
61
62 runnable.run();
63 timing
64 }
65 RunnableVariant::Compat(runnable) => {
66 let location = core::panic::Location::caller();
67 let timing = TaskTiming {
68 location,
69 start,
70 end: None,
71 };
72 profiler::add_task_timing(timing);
73
74 runnable.run();
75 timing
76 }
77 };
78
79 let end = Instant::now();
80 location.end = Some(end);
81 profiler::add_task_timing(location);
82
83 log::trace!(
84 "background thread {}: ran runnable. took: {:?}",
85 i,
86 start.elapsed()
87 );
88 }
89 })
90 .unwrap()
91 })
92 .collect::<Vec<_>>();
93
94 let (timer_sender, timer_channel) = calloop::channel::channel::<TimerAfter>();
95 let timer_thread = std::thread::Builder::new()
96 .name("Timer".to_owned())
97 .spawn(|| {
98 let mut event_loop: EventLoop<()> =
99 EventLoop::try_new().expect("Failed to initialize timer loop!");
100
101 let handle = event_loop.handle();
102 let timer_handle = event_loop.handle();
103 handle
104 .insert_source(timer_channel, move |e, _, _| {
105 if let channel::Event::Msg(timer) = e {
106 // This has to be in an option to satisfy the borrow checker. The callback below should only be scheduled once.
107 let mut runnable = Some(timer.runnable);
108 timer_handle
109 .insert_source(
110 calloop::timer::Timer::from_duration(timer.duration),
111 move |_, _, _| {
112 if let Some(runnable) = runnable.take() {
113 let start = Instant::now();
114 let mut timing = match runnable {
115 RunnableVariant::Meta(runnable) => {
116 let location = runnable.metadata().location;
117 let timing = TaskTiming {
118 location,
119 start,
120 end: None,
121 };
122 profiler::add_task_timing(timing);
123
124 runnable.run();
125 timing
126 }
127 RunnableVariant::Compat(runnable) => {
128 let timing = TaskTiming {
129 location: core::panic::Location::caller(),
130 start,
131 end: None,
132 };
133 profiler::add_task_timing(timing);
134
135 runnable.run();
136 timing
137 }
138 };
139 let end = Instant::now();
140
141 timing.end = Some(end);
142 profiler::add_task_timing(timing);
143 }
144 TimeoutAction::Drop
145 },
146 )
147 .expect("Failed to start timer");
148 }
149 })
150 .expect("Failed to start timer thread");
151
152 event_loop.run(None, &mut (), |_| {}).log_err();
153 })
154 .unwrap();
155
156 background_threads.push(timer_thread);
157
158 Self {
159 main_sender,
160 timer_sender,
161 background_sender,
162 _background_threads: background_threads,
163 main_thread_id: thread::current().id(),
164 }
165 }
166}
167
168impl PlatformDispatcher for LinuxDispatcher {
169 fn get_all_timings(&self) -> Vec<crate::ThreadTaskTimings> {
170 let global_timings = GLOBAL_THREAD_TIMINGS.lock();
171 ThreadTaskTimings::convert(&global_timings)
172 }
173
174 fn get_current_thread_timings(&self) -> Vec<crate::TaskTiming> {
175 THREAD_TIMINGS.with(|timings| {
176 let timings = timings.lock();
177 let timings = &timings.timings;
178
179 let mut vec = Vec::with_capacity(timings.len());
180
181 let (s1, s2) = timings.as_slices();
182 vec.extend_from_slice(s1);
183 vec.extend_from_slice(s2);
184 vec
185 })
186 }
187
188 fn is_main_thread(&self) -> bool {
189 thread::current().id() == self.main_thread_id
190 }
191
192 fn dispatch(&self, runnable: RunnableVariant, _: Option<TaskLabel>, priority: Priority) {
193 self.background_sender
194 .send(priority, runnable)
195 .unwrap_or_else(|_| panic!("blocking sender returned without value"));
196 }
197
198 fn dispatch_on_main_thread(&self, runnable: RunnableVariant, priority: Priority) {
199 self.main_sender
200 .send(priority, runnable)
201 .unwrap_or_else(|runnable| {
202 // NOTE: Runnable may wrap a Future that is !Send.
203 //
204 // This is usually safe because we only poll it on the main thread.
205 // However if the send fails, we know that:
206 // 1. main_receiver has been dropped (which implies the app is shutting down)
207 // 2. we are on a background thread.
208 // It is not safe to drop something !Send on the wrong thread, and
209 // the app will exit soon anyway, so we must forget the runnable.
210 std::mem::forget(runnable);
211 });
212 }
213
214 fn dispatch_after(&self, duration: Duration, runnable: RunnableVariant) {
215 self.timer_sender
216 .send(TimerAfter { duration, runnable })
217 .ok();
218 }
219
220 fn spawn_realtime(&self, priority: RealtimePriority, f: Box<dyn FnOnce() + Send>) {
221 std::thread::spawn(move || {
222 // SAFETY: always safe to call
223 let thread_id = unsafe { libc::pthread_self() };
224
225 let policy = match priority {
226 RealtimePriority::Audio => libc::SCHED_FIFO,
227 RealtimePriority::Other => libc::SCHED_RR,
228 };
229 let sched_priority = match priority {
230 RealtimePriority::Audio => 65,
231 RealtimePriority::Other => 45,
232 };
233
234 // SAFETY: all sched_param members are valid when initialized to zero.
235 let mut sched_param =
236 unsafe { MaybeUninit::<libc::sched_param>::zeroed().assume_init() };
237 sched_param.sched_priority = sched_priority;
238 // SAFETY: sched_param is a valid initialized structure
239 let result = unsafe { libc::pthread_setschedparam(thread_id, policy, &sched_param) };
240 if result != 0 {
241 log::warn!("failed to set realtime thread priority to {:?}", priority);
242 }
243
244 f();
245 });
246 }
247}
248
249pub struct PriorityQueueCalloopSender<T> {
250 sender: PriorityQueueSender<T>,
251 ping: calloop::ping::Ping,
252}
253
254impl<T> PriorityQueueCalloopSender<T> {
255 fn new(tx: PriorityQueueSender<T>, ping: calloop::ping::Ping) -> Self {
256 Self { sender: tx, ping }
257 }
258
259 fn send(&self, priority: Priority, item: T) -> Result<(), crate::queue::SendError<T>> {
260 let res = self.sender.send(priority, item);
261 if res.is_ok() {
262 self.ping.ping();
263 }
264 res
265 }
266}
267
268impl<T> Drop for PriorityQueueCalloopSender<T> {
269 fn drop(&mut self) {
270 self.ping.ping();
271 }
272}
273
274pub struct PriorityQueueCalloopReceiver<T> {
275 receiver: PriorityQueueReceiver<T>,
276 source: calloop::ping::PingSource,
277 ping: calloop::ping::Ping,
278}
279
280impl<T> PriorityQueueCalloopReceiver<T> {
281 pub fn new() -> (PriorityQueueCalloopSender<T>, Self) {
282 let (ping, source) = calloop::ping::make_ping().expect("Failed to create a Ping.");
283
284 let (tx, rx) = PriorityQueueReceiver::new();
285
286 (
287 PriorityQueueCalloopSender::new(tx, ping.clone()),
288 Self {
289 receiver: rx,
290 source,
291 ping,
292 },
293 )
294 }
295}
296
297use calloop::channel::Event;
298
299#[derive(Debug)]
300pub struct ChannelError(calloop::ping::PingError);
301
302impl std::fmt::Display for ChannelError {
303 #[cfg_attr(feature = "nightly_coverage", coverage(off))]
304 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
305 std::fmt::Display::fmt(&self.0, f)
306 }
307}
308
309impl std::error::Error for ChannelError {
310 #[cfg_attr(feature = "nightly_coverage", coverage(off))]
311 fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
312 Some(&self.0)
313 }
314}
315
316impl<T> calloop::EventSource for PriorityQueueCalloopReceiver<T> {
317 type Event = Event<T>;
318 type Metadata = ();
319 type Ret = ();
320 type Error = ChannelError;
321
322 fn process_events<F>(
323 &mut self,
324 readiness: calloop::Readiness,
325 token: calloop::Token,
326 mut callback: F,
327 ) -> Result<calloop::PostAction, Self::Error>
328 where
329 F: FnMut(Self::Event, &mut Self::Metadata) -> Self::Ret,
330 {
331 let mut clear_readiness = false;
332 let mut disconnected = false;
333
334 let action = self
335 .source
336 .process_events(readiness, token, |(), &mut ()| {
337 let mut is_empty = true;
338
339 let mut receiver = self.receiver.clone();
340 for runnable in receiver.try_iter() {
341 match runnable {
342 Ok(r) => {
343 callback(Event::Msg(r), &mut ());
344 is_empty = false;
345 }
346 Err(_) => {
347 disconnected = true;
348 }
349 }
350 }
351
352 if disconnected {
353 callback(Event::Closed, &mut ());
354 }
355
356 if is_empty {
357 clear_readiness = true;
358 }
359 })
360 .map_err(ChannelError)?;
361
362 if disconnected {
363 Ok(PostAction::Remove)
364 } else if clear_readiness {
365 Ok(action)
366 } else {
367 // Re-notify the ping source so we can try again.
368 self.ping.ping();
369 Ok(PostAction::Continue)
370 }
371 }
372
373 fn register(
374 &mut self,
375 poll: &mut calloop::Poll,
376 token_factory: &mut calloop::TokenFactory,
377 ) -> calloop::Result<()> {
378 self.source.register(poll, token_factory)
379 }
380
381 fn reregister(
382 &mut self,
383 poll: &mut calloop::Poll,
384 token_factory: &mut calloop::TokenFactory,
385 ) -> calloop::Result<()> {
386 self.source.reregister(poll, token_factory)
387 }
388
389 fn unregister(&mut self, poll: &mut calloop::Poll) -> calloop::Result<()> {
390 self.source.unregister(poll)
391 }
392}
393
#[cfg(test)]
mod tests {
    use super::*;

    /// Checks the receiver as a calloop source: it stays silent on an empty
    /// queue, delivers a sent message, and reports `Closed` after the sender is
    /// dropped.
    #[test]
    fn calloop_works() {
        struct Data {
            got_msg: bool,
            got_closed: bool,
        }

        let mut event_loop = calloop::EventLoop::try_new().unwrap();
        let handle = event_loop.handle();

        let (tx, rx) = PriorityQueueCalloopReceiver::new();

        let _channel_token = handle
            .insert_source(rx, move |evt, &mut (), data: &mut Data| {
                match evt {
                    Event::Msg(()) => data.got_msg = true,
                    Event::Closed => data.got_closed = true,
                }
            })
            .unwrap();

        let mut data = Data {
            got_msg: false,
            got_closed: false,
        };
        // Every dispatch below polls once without blocking.
        let no_wait = Some(::std::time::Duration::ZERO);

        // Nothing was sent, so nothing is received.
        event_loop.dispatch(no_wait, &mut data).unwrap();

        assert!(!data.got_msg);
        assert!(!data.got_closed);

        // A message is sent.
        tx.send(Priority::Medium, ()).unwrap();
        event_loop.dispatch(no_wait, &mut data).unwrap();

        assert!(data.got_msg);
        assert!(!data.got_closed);

        // The sender is dropped.
        drop(tx);
        event_loop.dispatch(no_wait, &mut data).unwrap();

        assert!(data.got_msg);
        assert!(data.got_closed);
    }
}
454
455// running 1 test
456// test platform::linux::dispatcher::tests::tomato ... FAILED
457
458// failures:
459
460// ---- platform::linux::dispatcher::tests::tomato stdout ----
461// [crates/gpui/src/platform/linux/dispatcher.rs:262:9]
462// returning 1 tasks to process
463// [crates/gpui/src/platform/linux/dispatcher.rs:480:75] evt = Msg(
464// (),
465// )
466// returning 0 tasks to process
467
468// thread 'platform::linux::dispatcher::tests::tomato' (478301) panicked at crates/gpui/src/platform/linux/dispatcher.rs:515:9:
469// assertion failed: data.got_closed
470// note: run with `RUST_BACKTRACE=1` environment variable to display a backtrace