1use crate::{
2 GLOBAL_THREAD_TIMINGS, PlatformDispatcher, Priority, PriorityQueueReceiver,
3 PriorityQueueSender, RealtimePriority, RunnableVariant, THREAD_TIMINGS, TaskLabel, TaskTiming,
4 ThreadTaskTimings, profiler,
5};
6use calloop::{
7 EventLoop, PostAction,
8 channel::{self, Sender},
9 timer::TimeoutAction,
10};
11use std::{
12 thread,
13 time::{Duration, Instant},
14};
15use util::ResultExt;
16
17struct TimerAfter {
18 duration: Duration,
19 runnable: RunnableVariant,
20}
21
22pub(crate) struct LinuxDispatcher {
23 main_sender: PriorityQueueCalloopSender<RunnableVariant>,
24 timer_sender: Sender<TimerAfter>,
25 background_sender: PriorityQueueSender<RunnableVariant>,
26 _background_threads: Vec<thread::JoinHandle<()>>,
27 main_thread_id: thread::ThreadId,
28}
29
/// Lower bound on the number of background worker threads; also the count
/// used when the available parallelism cannot be queried.
const MIN_THREADS: usize = 2;
31
32impl LinuxDispatcher {
33 pub fn new(main_sender: PriorityQueueCalloopSender<RunnableVariant>) -> Self {
34 let (background_sender, background_receiver) = PriorityQueueReceiver::new();
35 let thread_count =
36 std::thread::available_parallelism().map_or(MIN_THREADS, |i| i.get().max(MIN_THREADS));
37
38 // These thread should really be lower prio then the foreground
39 // executor
40 let mut background_threads = (0..thread_count)
41 .map(|i| {
42 let mut receiver = background_receiver.clone();
43 std::thread::Builder::new()
44 .name(format!("Worker-{i}"))
45 .spawn(move || {
46 for runnable in receiver.iter() {
47 let start = Instant::now();
48
49 let mut location = match runnable {
50 RunnableVariant::Meta(runnable) => {
51 let location = runnable.metadata().location;
52 let timing = TaskTiming {
53 location,
54 start,
55 end: None,
56 };
57 profiler::add_task_timing(timing);
58
59 runnable.run();
60 timing
61 }
62 RunnableVariant::Compat(runnable) => {
63 let location = core::panic::Location::caller();
64 let timing = TaskTiming {
65 location,
66 start,
67 end: None,
68 };
69 profiler::add_task_timing(timing);
70
71 runnable.run();
72 timing
73 }
74 };
75
76 let end = Instant::now();
77 location.end = Some(end);
78 profiler::add_task_timing(location);
79
80 log::trace!(
81 "background thread {}: ran runnable. took: {:?}",
82 i,
83 start.elapsed()
84 );
85 }
86 })
87 .unwrap()
88 })
89 .collect::<Vec<_>>();
90
91 let (timer_sender, timer_channel) = calloop::channel::channel::<TimerAfter>();
92 let timer_thread = std::thread::Builder::new()
93 .name("Timer".to_owned())
94 .spawn(|| {
95 let mut event_loop: EventLoop<()> =
96 EventLoop::try_new().expect("Failed to initialize timer loop!");
97
98 let handle = event_loop.handle();
99 let timer_handle = event_loop.handle();
100 handle
101 .insert_source(timer_channel, move |e, _, _| {
102 if let channel::Event::Msg(timer) = e {
103 // This has to be in an option to satisfy the borrow checker. The callback below should only be scheduled once.
104 let mut runnable = Some(timer.runnable);
105 timer_handle
106 .insert_source(
107 calloop::timer::Timer::from_duration(timer.duration),
108 move |_, _, _| {
109 if let Some(runnable) = runnable.take() {
110 let start = Instant::now();
111 let mut timing = match runnable {
112 RunnableVariant::Meta(runnable) => {
113 let location = runnable.metadata().location;
114 let timing = TaskTiming {
115 location,
116 start,
117 end: None,
118 };
119 profiler::add_task_timing(timing);
120
121 runnable.run();
122 timing
123 }
124 RunnableVariant::Compat(runnable) => {
125 let timing = TaskTiming {
126 location: core::panic::Location::caller(),
127 start,
128 end: None,
129 };
130 profiler::add_task_timing(timing);
131
132 runnable.run();
133 timing
134 }
135 };
136 let end = Instant::now();
137
138 timing.end = Some(end);
139 profiler::add_task_timing(timing);
140 }
141 TimeoutAction::Drop
142 },
143 )
144 .expect("Failed to start timer");
145 }
146 })
147 .expect("Failed to start timer thread");
148
149 event_loop.run(None, &mut (), |_| {}).log_err();
150 })
151 .unwrap();
152
153 background_threads.push(timer_thread);
154
155 Self {
156 main_sender,
157 timer_sender,
158 background_sender,
159 _background_threads: background_threads,
160 main_thread_id: thread::current().id(),
161 }
162 }
163}
164
165impl PlatformDispatcher for LinuxDispatcher {
166 fn get_all_timings(&self) -> Vec<crate::ThreadTaskTimings> {
167 let global_timings = GLOBAL_THREAD_TIMINGS.lock();
168 ThreadTaskTimings::convert(&global_timings)
169 }
170
171 fn get_current_thread_timings(&self) -> Vec<crate::TaskTiming> {
172 THREAD_TIMINGS.with(|timings| {
173 let timings = timings.lock();
174 let timings = &timings.timings;
175
176 let mut vec = Vec::with_capacity(timings.len());
177
178 let (s1, s2) = timings.as_slices();
179 vec.extend_from_slice(s1);
180 vec.extend_from_slice(s2);
181 vec
182 })
183 }
184
185 fn is_main_thread(&self) -> bool {
186 thread::current().id() == self.main_thread_id
187 }
188
189 fn dispatch(&self, runnable: RunnableVariant, _: Option<TaskLabel>, priority: Priority) {
190 self.background_sender
191 .send(priority, runnable)
192 .unwrap_or_else(|_| panic!("blocking sender returned without value"));
193 }
194
195 fn dispatch_on_main_thread(&self, runnable: RunnableVariant, priority: Priority) {
196 self.main_sender
197 .send(priority, runnable)
198 .unwrap_or_else(|runnable| {
199 // NOTE: Runnable may wrap a Future that is !Send.
200 //
201 // This is usually safe because we only poll it on the main thread.
202 // However if the send fails, we know that:
203 // 1. main_receiver has been dropped (which implies the app is shutting down)
204 // 2. we are on a background thread.
205 // It is not safe to drop something !Send on the wrong thread, and
206 // the app will exit soon anyway, so we must forget the runnable.
207 std::mem::forget(runnable);
208 });
209 }
210
211 fn dispatch_after(&self, duration: Duration, runnable: RunnableVariant) {
212 self.timer_sender
213 .send(TimerAfter { duration, runnable })
214 .ok();
215 }
216
217 fn spawn_realtime(&self, priority: RealtimePriority, f: Box<dyn FnOnce() + Send>) {
218 std::thread::spawn(move || {
219 // SAFETY: always safe to call
220 let thread_id = unsafe { libc::pthread_self() };
221
222 let policy = match priority {
223 RealtimePriority::Audio => libc::SCHED_FIFO,
224 RealtimePriority::Other => libc::SCHED_RR,
225 };
226 let sched_priority = match priority {
227 RealtimePriority::Audio => 65,
228 RealtimePriority::Other => 45,
229 };
230
231 let sched_param = libc::sched_param { sched_priority };
232 // SAFETY: sched_param is a valid initialized structure
233 let result = unsafe { libc::pthread_setschedparam(thread_id, policy, &sched_param) };
234 if result != 0 {
235 log::warn!("failed to set realtime thread priority to {:?}", priority);
236 }
237
238 f();
239 });
240 }
241}
242
243pub struct PriorityQueueCalloopSender<T> {
244 sender: PriorityQueueSender<T>,
245 ping: calloop::ping::Ping,
246}
247
248impl<T> PriorityQueueCalloopSender<T> {
249 fn new(tx: PriorityQueueSender<T>, ping: calloop::ping::Ping) -> Self {
250 Self { sender: tx, ping }
251 }
252
253 fn send(&self, priority: Priority, item: T) -> Result<(), crate::queue::SendError<T>> {
254 let res = self.sender.send(priority, item);
255 if res.is_ok() {
256 self.ping.ping();
257 }
258 res
259 }
260}
261
262impl<T> Drop for PriorityQueueCalloopSender<T> {
263 fn drop(&mut self) {
264 self.ping.ping();
265 }
266}
267
268pub struct PriorityQueueCalloopReceiver<T> {
269 receiver: PriorityQueueReceiver<T>,
270 source: calloop::ping::PingSource,
271 ping: calloop::ping::Ping,
272}
273
274impl<T> PriorityQueueCalloopReceiver<T> {
275 pub fn new() -> (PriorityQueueCalloopSender<T>, Self) {
276 let (ping, source) = calloop::ping::make_ping().expect("Failed to create a Ping.");
277
278 let (tx, rx) = PriorityQueueReceiver::new();
279
280 (
281 PriorityQueueCalloopSender::new(tx, ping.clone()),
282 Self {
283 receiver: rx,
284 source,
285 ping,
286 },
287 )
288 }
289}
290
291use calloop::channel::Event;
292
293#[derive(Debug)]
294pub struct ChannelError(calloop::ping::PingError);
295
296impl std::fmt::Display for ChannelError {
297 #[cfg_attr(feature = "nightly_coverage", coverage(off))]
298 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
299 std::fmt::Display::fmt(&self.0, f)
300 }
301}
302
303impl std::error::Error for ChannelError {
304 #[cfg_attr(feature = "nightly_coverage", coverage(off))]
305 fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
306 Some(&self.0)
307 }
308}
309
310impl<T> calloop::EventSource for PriorityQueueCalloopReceiver<T> {
311 type Event = Event<T>;
312 type Metadata = ();
313 type Ret = ();
314 type Error = ChannelError;
315
316 fn process_events<F>(
317 &mut self,
318 readiness: calloop::Readiness,
319 token: calloop::Token,
320 mut callback: F,
321 ) -> Result<calloop::PostAction, Self::Error>
322 where
323 F: FnMut(Self::Event, &mut Self::Metadata) -> Self::Ret,
324 {
325 let mut clear_readiness = false;
326 let mut disconnected = false;
327
328 let action = self
329 .source
330 .process_events(readiness, token, |(), &mut ()| {
331 let mut is_empty = true;
332
333 let mut receiver = self.receiver.clone();
334 for runnable in receiver.try_iter() {
335 match runnable {
336 Ok(r) => {
337 callback(Event::Msg(r), &mut ());
338 is_empty = false;
339 }
340 Err(_) => {
341 disconnected = true;
342 }
343 }
344 }
345
346 if disconnected {
347 callback(Event::Closed, &mut ());
348 }
349
350 if is_empty {
351 clear_readiness = true;
352 }
353 })
354 .map_err(ChannelError)?;
355
356 if disconnected {
357 Ok(PostAction::Remove)
358 } else if clear_readiness {
359 Ok(action)
360 } else {
361 // Re-notify the ping source so we can try again.
362 self.ping.ping();
363 Ok(PostAction::Continue)
364 }
365 }
366
367 fn register(
368 &mut self,
369 poll: &mut calloop::Poll,
370 token_factory: &mut calloop::TokenFactory,
371 ) -> calloop::Result<()> {
372 self.source.register(poll, token_factory)
373 }
374
375 fn reregister(
376 &mut self,
377 poll: &mut calloop::Poll,
378 token_factory: &mut calloop::TokenFactory,
379 ) -> calloop::Result<()> {
380 self.source.reregister(poll, token_factory)
381 }
382
383 fn unregister(&mut self, poll: &mut calloop::Poll) -> calloop::Result<()> {
384 self.source.unregister(poll)
385 }
386}
387
#[cfg(test)]
mod tests {
    use super::*;

    /// Drives a receiver through a calloop event loop and checks that it
    /// reports nothing, then a message, then the close notification.
    #[test]
    fn calloop_works() {
        #[derive(Default)]
        struct Data {
            got_msg: bool,
            got_closed: bool,
        }

        // Every dispatch below polls once without blocking.
        let no_wait = Some(::std::time::Duration::ZERO);

        let mut event_loop = calloop::EventLoop::try_new().unwrap();
        let handle = event_loop.handle();

        let (tx, rx) = PriorityQueueCalloopReceiver::new();
        let mut data = Data::default();

        let _channel_token = handle
            .insert_source(rx, |event, _, data: &mut Data| match event {
                Event::Msg(()) => data.got_msg = true,
                Event::Closed => data.got_closed = true,
            })
            .unwrap();

        // Nothing has been sent, so nothing is received.
        event_loop.dispatch(no_wait, &mut data).unwrap();
        assert!(!data.got_msg);
        assert!(!data.got_closed);

        // A message is sent and shows up on the next dispatch.
        tx.send(Priority::Medium, ()).unwrap();
        event_loop.dispatch(no_wait, &mut data).unwrap();
        assert!(data.got_msg);
        assert!(!data.got_closed);

        // Dropping the sender is reported as a close.
        drop(tx);
        event_loop.dispatch(no_wait, &mut data).unwrap();
        assert!(data.got_msg);
        assert!(data.got_closed);
    }
}
448
// NOTE(review): the output below appears to be left over from debugging an
// earlier, failing revision of the test above (then named `tomato`). The line
// numbers it cites do not match this file.
//
// running 1 test
// test platform::linux::dispatcher::tests::tomato ... FAILED

// failures:

// ---- platform::linux::dispatcher::tests::tomato stdout ----
// [crates/gpui/src/platform/linux/dispatcher.rs:262:9]
// returning 1 tasks to process
// [crates/gpui/src/platform/linux/dispatcher.rs:480:75] evt = Msg(
// (),
// )
// returning 0 tasks to process

// thread 'platform::linux::dispatcher::tests::tomato' (478301) panicked at crates/gpui/src/platform/linux/dispatcher.rs:515:9:
// assertion failed: data.got_closed
// note: run with `RUST_BACKTRACE=1` environment variable to display a backtrace