dispatcher.rs

  1use crate::{
  2    GLOBAL_THREAD_TIMINGS, PlatformDispatcher, Priority, PriorityQueueReceiver,
  3    PriorityQueueSender, RealtimePriority, RunnableVariant, THREAD_TIMINGS, TaskLabel, TaskTiming,
  4    ThreadTaskTimings, profiler,
  5};
  6use calloop::{
  7    EventLoop, PostAction,
  8    channel::{self, Sender},
  9    timer::TimeoutAction,
 10};
 11use std::{
 12    thread,
 13    time::{Duration, Instant},
 14};
 15use util::ResultExt;
 16
 17struct TimerAfter {
 18    duration: Duration,
 19    runnable: RunnableVariant,
 20}
 21
 22pub(crate) struct LinuxDispatcher {
 23    main_sender: PriorityQueueCalloopSender<RunnableVariant>,
 24    timer_sender: Sender<TimerAfter>,
 25    background_sender: PriorityQueueSender<RunnableVariant>,
 26    _background_threads: Vec<thread::JoinHandle<()>>,
 27    main_thread_id: thread::ThreadId,
 28}
 29
 30const MIN_THREADS: usize = 2;
 31
 32impl LinuxDispatcher {
 33    pub fn new(main_sender: PriorityQueueCalloopSender<RunnableVariant>) -> Self {
 34        let (background_sender, background_receiver) = PriorityQueueReceiver::new();
 35        let thread_count =
 36            std::thread::available_parallelism().map_or(MIN_THREADS, |i| i.get().max(MIN_THREADS));
 37
 38        // These thread should really be lower prio then the foreground
 39        // executor
 40        let mut background_threads = (0..thread_count)
 41            .map(|i| {
 42                let mut receiver = background_receiver.clone();
 43                std::thread::Builder::new()
 44                    .name(format!("Worker-{i}"))
 45                    .spawn(move || {
 46                        for runnable in receiver.iter() {
 47                            let start = Instant::now();
 48
 49                            let mut location = match runnable {
 50                                RunnableVariant::Meta(runnable) => {
 51                                    let location = runnable.metadata().location;
 52                                    let timing = TaskTiming {
 53                                        location,
 54                                        start,
 55                                        end: None,
 56                                    };
 57                                    profiler::add_task_timing(timing);
 58
 59                                    runnable.run();
 60                                    timing
 61                                }
 62                                RunnableVariant::Compat(runnable) => {
 63                                    let location = core::panic::Location::caller();
 64                                    let timing = TaskTiming {
 65                                        location,
 66                                        start,
 67                                        end: None,
 68                                    };
 69                                    profiler::add_task_timing(timing);
 70
 71                                    runnable.run();
 72                                    timing
 73                                }
 74                            };
 75
 76                            let end = Instant::now();
 77                            location.end = Some(end);
 78                            profiler::add_task_timing(location);
 79
 80                            log::trace!(
 81                                "background thread {}: ran runnable. took: {:?}",
 82                                i,
 83                                start.elapsed()
 84                            );
 85                        }
 86                    })
 87                    .unwrap()
 88            })
 89            .collect::<Vec<_>>();
 90
 91        let (timer_sender, timer_channel) = calloop::channel::channel::<TimerAfter>();
 92        let timer_thread = std::thread::Builder::new()
 93            .name("Timer".to_owned())
 94            .spawn(|| {
 95                let mut event_loop: EventLoop<()> =
 96                    EventLoop::try_new().expect("Failed to initialize timer loop!");
 97
 98                let handle = event_loop.handle();
 99                let timer_handle = event_loop.handle();
100                handle
101                    .insert_source(timer_channel, move |e, _, _| {
102                        if let channel::Event::Msg(timer) = e {
103                            // This has to be in an option to satisfy the borrow checker. The callback below should only be scheduled once.
104                            let mut runnable = Some(timer.runnable);
105                            timer_handle
106                                .insert_source(
107                                    calloop::timer::Timer::from_duration(timer.duration),
108                                    move |_, _, _| {
109                                        if let Some(runnable) = runnable.take() {
110                                            let start = Instant::now();
111                                            let mut timing = match runnable {
112                                                RunnableVariant::Meta(runnable) => {
113                                                    let location = runnable.metadata().location;
114                                                    let timing = TaskTiming {
115                                                        location,
116                                                        start,
117                                                        end: None,
118                                                    };
119                                                    profiler::add_task_timing(timing);
120
121                                                    runnable.run();
122                                                    timing
123                                                }
124                                                RunnableVariant::Compat(runnable) => {
125                                                    let timing = TaskTiming {
126                                                        location: core::panic::Location::caller(),
127                                                        start,
128                                                        end: None,
129                                                    };
130                                                    profiler::add_task_timing(timing);
131
132                                                    runnable.run();
133                                                    timing
134                                                }
135                                            };
136                                            let end = Instant::now();
137
138                                            timing.end = Some(end);
139                                            profiler::add_task_timing(timing);
140                                        }
141                                        TimeoutAction::Drop
142                                    },
143                                )
144                                .expect("Failed to start timer");
145                        }
146                    })
147                    .expect("Failed to start timer thread");
148
149                event_loop.run(None, &mut (), |_| {}).log_err();
150            })
151            .unwrap();
152
153        background_threads.push(timer_thread);
154
155        Self {
156            main_sender,
157            timer_sender,
158            background_sender,
159            _background_threads: background_threads,
160            main_thread_id: thread::current().id(),
161        }
162    }
163}
164
165impl PlatformDispatcher for LinuxDispatcher {
166    fn get_all_timings(&self) -> Vec<crate::ThreadTaskTimings> {
167        let global_timings = GLOBAL_THREAD_TIMINGS.lock();
168        ThreadTaskTimings::convert(&global_timings)
169    }
170
171    fn get_current_thread_timings(&self) -> Vec<crate::TaskTiming> {
172        THREAD_TIMINGS.with(|timings| {
173            let timings = timings.lock();
174            let timings = &timings.timings;
175
176            let mut vec = Vec::with_capacity(timings.len());
177
178            let (s1, s2) = timings.as_slices();
179            vec.extend_from_slice(s1);
180            vec.extend_from_slice(s2);
181            vec
182        })
183    }
184
185    fn is_main_thread(&self) -> bool {
186        thread::current().id() == self.main_thread_id
187    }
188
189    fn dispatch(&self, runnable: RunnableVariant, _: Option<TaskLabel>, priority: Priority) {
190        self.background_sender
191            .send(priority, runnable)
192            .unwrap_or_else(|_| panic!("blocking sender returned without value"));
193    }
194
195    fn dispatch_on_main_thread(&self, runnable: RunnableVariant, priority: Priority) {
196        self.main_sender
197            .send(priority, runnable)
198            .unwrap_or_else(|runnable| {
199                // NOTE: Runnable may wrap a Future that is !Send.
200                //
201                // This is usually safe because we only poll it on the main thread.
202                // However if the send fails, we know that:
203                // 1. main_receiver has been dropped (which implies the app is shutting down)
204                // 2. we are on a background thread.
205                // It is not safe to drop something !Send on the wrong thread, and
206                // the app will exit soon anyway, so we must forget the runnable.
207                std::mem::forget(runnable);
208            });
209    }
210
211    fn dispatch_after(&self, duration: Duration, runnable: RunnableVariant) {
212        self.timer_sender
213            .send(TimerAfter { duration, runnable })
214            .ok();
215    }
216
217    fn spawn_realtime(&self, priority: RealtimePriority, f: Box<dyn FnOnce() + Send>) {
218        std::thread::spawn(move || {
219            // SAFETY: always safe to call
220            let thread_id = unsafe { libc::pthread_self() };
221
222            let policy = match priority {
223                RealtimePriority::Audio => libc::SCHED_FIFO,
224                RealtimePriority::Other => libc::SCHED_RR,
225            };
226            let sched_priority = match priority {
227                RealtimePriority::Audio => 65,
228                RealtimePriority::Other => 45,
229            };
230
231            let sched_param = libc::sched_param { sched_priority };
232            // SAFETY: sched_param is a valid initialized structure
233            let result = unsafe { libc::pthread_setschedparam(thread_id, policy, &sched_param) };
234            if result != 0 {
235                log::warn!("failed to set realtime thread priority to {:?}", priority);
236            }
237
238            f();
239        });
240    }
241}
242
243pub struct PriorityQueueCalloopSender<T> {
244    sender: PriorityQueueSender<T>,
245    ping: calloop::ping::Ping,
246}
247
248impl<T> PriorityQueueCalloopSender<T> {
249    fn new(tx: PriorityQueueSender<T>, ping: calloop::ping::Ping) -> Self {
250        Self { sender: tx, ping }
251    }
252
253    fn send(&self, priority: Priority, item: T) -> Result<(), crate::queue::SendError<T>> {
254        let res = self.sender.send(priority, item);
255        if res.is_ok() {
256            self.ping.ping();
257        }
258        res
259    }
260}
261
262impl<T> Drop for PriorityQueueCalloopSender<T> {
263    fn drop(&mut self) {
264        self.ping.ping();
265    }
266}
267
268pub struct PriorityQueueCalloopReceiver<T> {
269    receiver: PriorityQueueReceiver<T>,
270    source: calloop::ping::PingSource,
271    ping: calloop::ping::Ping,
272}
273
274impl<T> PriorityQueueCalloopReceiver<T> {
275    pub fn new() -> (PriorityQueueCalloopSender<T>, Self) {
276        let (ping, source) = calloop::ping::make_ping().expect("Failed to create a Ping.");
277
278        let (tx, rx) = PriorityQueueReceiver::new();
279
280        (
281            PriorityQueueCalloopSender::new(tx, ping.clone()),
282            Self {
283                receiver: rx,
284                source,
285                ping,
286            },
287        )
288    }
289}
290
291use calloop::channel::Event;
292
293#[derive(Debug)]
294pub struct ChannelError(calloop::ping::PingError);
295
296impl std::fmt::Display for ChannelError {
297    #[cfg_attr(feature = "nightly_coverage", coverage(off))]
298    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
299        std::fmt::Display::fmt(&self.0, f)
300    }
301}
302
303impl std::error::Error for ChannelError {
304    #[cfg_attr(feature = "nightly_coverage", coverage(off))]
305    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
306        Some(&self.0)
307    }
308}
309
310impl<T> calloop::EventSource for PriorityQueueCalloopReceiver<T> {
311    type Event = Event<T>;
312    type Metadata = ();
313    type Ret = ();
314    type Error = ChannelError;
315
316    fn process_events<F>(
317        &mut self,
318        readiness: calloop::Readiness,
319        token: calloop::Token,
320        mut callback: F,
321    ) -> Result<calloop::PostAction, Self::Error>
322    where
323        F: FnMut(Self::Event, &mut Self::Metadata) -> Self::Ret,
324    {
325        let mut clear_readiness = false;
326        let mut disconnected = false;
327
328        let action = self
329            .source
330            .process_events(readiness, token, |(), &mut ()| {
331                let mut is_empty = true;
332
333                let mut receiver = self.receiver.clone();
334                for runnable in receiver.try_iter() {
335                    match runnable {
336                        Ok(r) => {
337                            callback(Event::Msg(r), &mut ());
338                            is_empty = false;
339                        }
340                        Err(_) => {
341                            disconnected = true;
342                        }
343                    }
344                }
345
346                if disconnected {
347                    callback(Event::Closed, &mut ());
348                }
349
350                if is_empty {
351                    clear_readiness = true;
352                }
353            })
354            .map_err(ChannelError)?;
355
356        if disconnected {
357            Ok(PostAction::Remove)
358        } else if clear_readiness {
359            Ok(action)
360        } else {
361            // Re-notify the ping source so we can try again.
362            self.ping.ping();
363            Ok(PostAction::Continue)
364        }
365    }
366
367    fn register(
368        &mut self,
369        poll: &mut calloop::Poll,
370        token_factory: &mut calloop::TokenFactory,
371    ) -> calloop::Result<()> {
372        self.source.register(poll, token_factory)
373    }
374
375    fn reregister(
376        &mut self,
377        poll: &mut calloop::Poll,
378        token_factory: &mut calloop::TokenFactory,
379    ) -> calloop::Result<()> {
380        self.source.reregister(poll, token_factory)
381    }
382
383    fn unregister(&mut self, poll: &mut calloop::Poll) -> calloop::Result<()> {
384        self.source.unregister(poll)
385    }
386}
387
388#[cfg(test)]
389mod tests {
390    use super::*;
391
392    #[test]
393    fn calloop_works() {
394        let mut event_loop = calloop::EventLoop::try_new().unwrap();
395        let handle = event_loop.handle();
396
397        let (tx, rx) = PriorityQueueCalloopReceiver::new();
398
399        struct Data {
400            got_msg: bool,
401            got_closed: bool,
402        }
403
404        let mut data = Data {
405            got_msg: false,
406            got_closed: false,
407        };
408
409        let _channel_token = handle
410            .insert_source(rx, move |evt, &mut (), data: &mut Data| match evt {
411                Event::Msg(()) => {
412                    data.got_msg = true;
413                }
414
415                Event::Closed => {
416                    data.got_closed = true;
417                }
418            })
419            .unwrap();
420
421        // nothing is sent, nothing is received
422        event_loop
423            .dispatch(Some(::std::time::Duration::ZERO), &mut data)
424            .unwrap();
425
426        assert!(!data.got_msg);
427        assert!(!data.got_closed);
428        // a message is send
429
430        tx.send(Priority::Medium, ()).unwrap();
431        event_loop
432            .dispatch(Some(::std::time::Duration::ZERO), &mut data)
433            .unwrap();
434
435        assert!(data.got_msg);
436        assert!(!data.got_closed);
437
438        // the sender is dropped
439        drop(tx);
440        event_loop
441            .dispatch(Some(::std::time::Duration::ZERO), &mut data)
442            .unwrap();
443
444        assert!(data.got_msg);
445        assert!(data.got_closed);
446    }
447}
448
 // NOTE(review): The output below appears to be from an earlier failing run of
 // the test in this file (the log names it `tomato`). The line numbers it cites
 // do not match the code shown here, so treat it as historical context only.
 //
 // running 1 test
 // test platform::linux::dispatcher::tests::tomato ... FAILED

 // failures:

 // ---- platform::linux::dispatcher::tests::tomato stdout ----
 // [crates/gpui/src/platform/linux/dispatcher.rs:262:9]
 // returning 1 tasks to process
 // [crates/gpui/src/platform/linux/dispatcher.rs:480:75] evt = Msg(
 //     (),
 // )
 // returning 0 tasks to process

 // thread 'platform::linux::dispatcher::tests::tomato' (478301) panicked at crates/gpui/src/platform/linux/dispatcher.rs:515:9:
 // assertion failed: data.got_closed
 // note: run with `RUST_BACKTRACE=1` environment variable to display a backtrace