dispatcher.rs

  1use calloop::{
  2    EventLoop, PostAction,
  3    channel::{self, Sender},
  4    timer::TimeoutAction,
  5};
  6use util::ResultExt;
  7
  8use std::{mem::MaybeUninit, thread, time::Duration};
  9
 10use crate::{
 11    GLOBAL_THREAD_TIMINGS, GpuiRunnable, PlatformDispatcher, Priority, PriorityQueueReceiver,
 12    PriorityQueueSender, RealtimePriority, THREAD_TIMINGS, TaskLabel, ThreadTaskTimings,
 13};
 14
 15struct TimerAfter {
 16    duration: Duration,
 17    runnable: GpuiRunnable,
 18}
 19
 20pub(crate) struct LinuxDispatcher {
 21    main_sender: PriorityQueueCalloopSender<GpuiRunnable>,
 22    timer_sender: Sender<TimerAfter>,
 23    background_sender: PriorityQueueSender<GpuiRunnable>,
 24    _background_threads: Vec<thread::JoinHandle<()>>,
 25    main_thread_id: thread::ThreadId,
 26}
 27
 28const MIN_THREADS: usize = 2;
 29
 30impl LinuxDispatcher {
 31    pub fn new(main_sender: PriorityQueueCalloopSender<GpuiRunnable>) -> Self {
 32        let (background_sender, background_receiver) = PriorityQueueReceiver::new();
 33        let thread_count =
 34            std::thread::available_parallelism().map_or(MIN_THREADS, |i| i.get().max(MIN_THREADS));
 35
 36        // These thread should really be lower prio then the foreground
 37        // executor
 38        let mut background_threads = (0..thread_count)
 39            .map(|i| {
 40                let mut receiver: PriorityQueueReceiver<GpuiRunnable> = background_receiver.clone();
 41                std::thread::Builder::new()
 42                    .name(format!("Worker-{i}"))
 43                    .spawn(move || {
 44                        for runnable in receiver.iter() {
 45                            let started = runnable.run_and_profile();
 46                            log::trace!(
 47                                "background thread {}: ran runnable. took: {:?}",
 48                                i,
 49                                started.elapsed()
 50                            );
 51                        }
 52                    })
 53                    .unwrap()
 54            })
 55            .collect::<Vec<_>>();
 56
 57        let (timer_sender, timer_channel) = calloop::channel::channel::<TimerAfter>();
 58        let timer_thread = std::thread::Builder::new()
 59            .name("Timer".to_owned())
 60            .spawn(|| {
 61                let mut event_loop: EventLoop<()> =
 62                    EventLoop::try_new().expect("Failed to initialize timer loop!");
 63
 64                let handle = event_loop.handle();
 65                let timer_handle = event_loop.handle();
 66                handle
 67                    .insert_source(timer_channel, move |e, _, _| {
 68                        if let channel::Event::Msg(timer) = e {
 69                            // This has to be in an option to satisfy the borrow checker. The callback below should only be scheduled once.
 70                            let mut runnable = Some(timer.runnable);
 71                            timer_handle
 72                                .insert_source(
 73                                    calloop::timer::Timer::from_duration(timer.duration),
 74                                    move |_, _, _| {
 75                                        if let Some(runnable) = runnable.take() {
 76                                            runnable.run_and_profile();
 77                                        }
 78                                        TimeoutAction::Drop
 79                                    },
 80                                )
 81                                .expect("Failed to start timer");
 82                        }
 83                    })
 84                    .expect("Failed to start timer thread");
 85
 86                event_loop.run(None, &mut (), |_| {}).log_err();
 87            })
 88            .unwrap();
 89
 90        background_threads.push(timer_thread);
 91
 92        Self {
 93            main_sender,
 94            timer_sender,
 95            background_sender,
 96            _background_threads: background_threads,
 97            main_thread_id: thread::current().id(),
 98        }
 99    }
100}
101
102impl PlatformDispatcher for LinuxDispatcher {
103    fn get_all_timings(&self) -> Vec<crate::ThreadTaskTimings> {
104        let global_timings = GLOBAL_THREAD_TIMINGS.lock();
105        ThreadTaskTimings::convert(&global_timings)
106    }
107
108    fn get_current_thread_timings(&self) -> Vec<crate::TaskTiming> {
109        THREAD_TIMINGS.with(|timings| {
110            let timings = timings.lock();
111            let timings = &timings.timings;
112
113            let mut vec = Vec::with_capacity(timings.len());
114
115            let (s1, s2) = timings.as_slices();
116            vec.extend_from_slice(s1);
117            vec.extend_from_slice(s2);
118            vec
119        })
120    }
121
122    fn is_main_thread(&self) -> bool {
123        thread::current().id() == self.main_thread_id
124    }
125
126    fn dispatch(&self, runnable: GpuiRunnable, _: Option<TaskLabel>) {
127        self.background_sender
128            .send(runnable.priority(), runnable)
129            .unwrap_or_else(|_| panic!("blocking sender returned without value"));
130    }
131
132    fn dispatch_on_main_thread(&self, runnable: GpuiRunnable) {
133        self.main_sender
134            .send(runnable.priority(), runnable)
135            .unwrap_or_else(|runnable| {
136                // NOTE: Runnable may wrap a Future that is !Send.
137                //
138                // This is usually safe because we only poll it on the main thread.
139                // However if the send fails, we know that:
140                // 1. main_receiver has been dropped (which implies the app is shutting down)
141                // 2. we are on a background thread.
142                // It is not safe to drop something !Send on the wrong thread, and
143                // the app will exit soon anyway, so we must forget the runnable.
144                std::mem::forget(runnable);
145            });
146    }
147
148    fn dispatch_after(&self, duration: Duration, runnable: GpuiRunnable) {
149        self.timer_sender
150            .send(TimerAfter { duration, runnable })
151            .ok();
152    }
153
154    fn spawn_realtime(&self, priority: RealtimePriority, f: Box<dyn FnOnce() + Send>) {
155        std::thread::spawn(move || {
156            // SAFETY: always safe to call
157            let thread_id = unsafe { libc::pthread_self() };
158
159            let policy = match priority {
160                RealtimePriority::Audio => libc::SCHED_FIFO,
161                RealtimePriority::Other => libc::SCHED_RR,
162            };
163            let sched_priority = match priority {
164                RealtimePriority::Audio => 65,
165                RealtimePriority::Other => 45,
166            };
167
168            // SAFETY: all sched_param members are valid when initialized to zero.
169            let mut sched_param =
170                unsafe { MaybeUninit::<libc::sched_param>::zeroed().assume_init() };
171            sched_param.sched_priority = sched_priority;
172            // SAFETY: sched_param is a valid initialized structure
173            let result = unsafe { libc::pthread_setschedparam(thread_id, policy, &sched_param) };
174            if result != 0 {
175                log::warn!("failed to set realtime thread priority to {:?}", priority);
176            }
177
178            f();
179        });
180    }
181}
182
183pub struct PriorityQueueCalloopSender<T> {
184    sender: PriorityQueueSender<T>,
185    ping: calloop::ping::Ping,
186}
187
188impl<T> PriorityQueueCalloopSender<T> {
189    fn new(tx: PriorityQueueSender<T>, ping: calloop::ping::Ping) -> Self {
190        Self { sender: tx, ping }
191    }
192
193    fn send(&self, priority: Priority, item: T) -> Result<(), crate::queue::SendError<T>> {
194        let res = self.sender.send(priority, item);
195        if res.is_ok() {
196            self.ping.ping();
197        }
198        res
199    }
200}
201
202impl<T> Drop for PriorityQueueCalloopSender<T> {
203    fn drop(&mut self) {
204        self.ping.ping();
205    }
206}
207
208pub struct PriorityQueueCalloopReceiver<T> {
209    receiver: PriorityQueueReceiver<T>,
210    source: calloop::ping::PingSource,
211    ping: calloop::ping::Ping,
212}
213
214impl<T> PriorityQueueCalloopReceiver<T> {
215    pub fn new() -> (PriorityQueueCalloopSender<T>, Self) {
216        let (ping, source) = calloop::ping::make_ping().expect("Failed to create a Ping.");
217
218        let (tx, rx) = PriorityQueueReceiver::new();
219
220        (
221            PriorityQueueCalloopSender::new(tx, ping.clone()),
222            Self {
223                receiver: rx,
224                source,
225                ping,
226            },
227        )
228    }
229}
230
231use calloop::channel::Event;
232
233#[derive(Debug)]
234pub struct ChannelError(calloop::ping::PingError);
235
236impl std::fmt::Display for ChannelError {
237    #[cfg_attr(feature = "nightly_coverage", coverage(off))]
238    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
239        std::fmt::Display::fmt(&self.0, f)
240    }
241}
242
243impl std::error::Error for ChannelError {
244    #[cfg_attr(feature = "nightly_coverage", coverage(off))]
245    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
246        Some(&self.0)
247    }
248}
249
250impl<T> calloop::EventSource for PriorityQueueCalloopReceiver<T> {
251    type Event = Event<T>;
252    type Metadata = ();
253    type Ret = ();
254    type Error = ChannelError;
255
256    fn process_events<F>(
257        &mut self,
258        readiness: calloop::Readiness,
259        token: calloop::Token,
260        mut callback: F,
261    ) -> Result<calloop::PostAction, Self::Error>
262    where
263        F: FnMut(Self::Event, &mut Self::Metadata) -> Self::Ret,
264    {
265        let mut clear_readiness = false;
266        let mut disconnected = false;
267
268        let action = self
269            .source
270            .process_events(readiness, token, |(), &mut ()| {
271                let mut is_empty = true;
272
273                let mut receiver = self.receiver.clone();
274                for runnable in receiver.try_iter() {
275                    match runnable {
276                        Ok(r) => {
277                            callback(Event::Msg(r), &mut ());
278                            is_empty = false;
279                        }
280                        Err(_) => {
281                            disconnected = true;
282                        }
283                    }
284                }
285
286                if disconnected {
287                    callback(Event::Closed, &mut ());
288                }
289
290                if is_empty {
291                    clear_readiness = true;
292                }
293            })
294            .map_err(ChannelError)?;
295
296        if disconnected {
297            Ok(PostAction::Remove)
298        } else if clear_readiness {
299            Ok(action)
300        } else {
301            // Re-notify the ping source so we can try again.
302            self.ping.ping();
303            Ok(PostAction::Continue)
304        }
305    }
306
307    fn register(
308        &mut self,
309        poll: &mut calloop::Poll,
310        token_factory: &mut calloop::TokenFactory,
311    ) -> calloop::Result<()> {
312        self.source.register(poll, token_factory)
313    }
314
315    fn reregister(
316        &mut self,
317        poll: &mut calloop::Poll,
318        token_factory: &mut calloop::TokenFactory,
319    ) -> calloop::Result<()> {
320        self.source.reregister(poll, token_factory)
321    }
322
323    fn unregister(&mut self, poll: &mut calloop::Poll) -> calloop::Result<()> {
324        self.source.unregister(poll)
325    }
326}
327
#[cfg(test)]
mod tests {
    use super::*;

    /// Records which channel events the event-loop callback has seen.
    struct Observed {
        got_msg: bool,
        got_closed: bool,
    }

    /// Runs a single non-blocking dispatch pass of the event loop.
    fn poll_once(event_loop: &mut calloop::EventLoop<'_, Observed>, observed: &mut Observed) {
        event_loop
            .dispatch(Some(::std::time::Duration::ZERO), observed)
            .unwrap();
    }

    /// A message sent through the queue is delivered by the event loop, and
    /// dropping the sender is reported as a closed event.
    #[test]
    fn calloop_works() {
        let mut event_loop = calloop::EventLoop::try_new().unwrap();
        let (tx, rx) = PriorityQueueCalloopReceiver::new();

        let mut observed = Observed {
            got_msg: false,
            got_closed: false,
        };

        let _channel_token = event_loop
            .handle()
            .insert_source(rx, |event, &mut (), observed: &mut Observed| match event {
                Event::Msg(()) => observed.got_msg = true,
                Event::Closed => observed.got_closed = true,
            })
            .unwrap();

        // Nothing has been sent yet, so nothing is received.
        poll_once(&mut event_loop, &mut observed);
        assert!(!observed.got_msg);
        assert!(!observed.got_closed);

        // A message is sent and must be delivered.
        tx.send(Priority::Medium, ()).unwrap();
        poll_once(&mut event_loop, &mut observed);
        assert!(observed.got_msg);
        assert!(!observed.got_closed);

        // The sender is dropped, which must be reported as closed.
        drop(tx);
        poll_once(&mut event_loop, &mut observed);
        assert!(observed.got_msg);
        assert!(observed.got_closed);
    }
}
388
// NOTE(review): The block below is captured output from a failing test run.
// The test name (`tomato`) and the source line numbers it mentions do not
// match the test module in this file, so the log appears to be stale — confirm
// and remove it if it is no longer needed.
//
// running 1 test
// test platform::linux::dispatcher::tests::tomato ... FAILED
//
// failures:
//
// ---- platform::linux::dispatcher::tests::tomato stdout ----
// [crates/gpui/src/platform/linux/dispatcher.rs:262:9]
// returning 1 tasks to process
// [crates/gpui/src/platform/linux/dispatcher.rs:480:75] evt = Msg(
//     (),
// )
// returning 0 tasks to process
//
// thread 'platform::linux::dispatcher::tests::tomato' (478301) panicked at crates/gpui/src/platform/linux/dispatcher.rs:515:9:
// assertion failed: data.got_closed
// note: run with `RUST_BACKTRACE=1` environment variable to display a backtrace