dispatcher.rs

  1use calloop::{
  2    EventLoop, PostAction,
  3    channel::{self, Sender},
  4    timer::TimeoutAction,
  5};
  6use util::ResultExt;
  7
  8use std::{
  9    mem::MaybeUninit,
 10    thread,
 11    time::{Duration, Instant},
 12};
 13
 14use crate::{
 15    GLOBAL_THREAD_TIMINGS, PlatformDispatcher, Priority, PriorityQueueReceiver,
 16    PriorityQueueSender, RealtimePriority, RunnableVariant, THREAD_TIMINGS, TaskLabel, TaskTiming,
 17    ThreadTaskTimings, profiler,
 18};
 19
 20struct TimerAfter {
 21    duration: Duration,
 22    runnable: RunnableVariant,
 23}
 24
 25pub(crate) struct LinuxDispatcher {
 26    main_sender: PriorityQueueCalloopSender<RunnableVariant>,
 27    timer_sender: Sender<TimerAfter>,
 28    background_sender: PriorityQueueSender<RunnableVariant>,
 29    _background_threads: Vec<thread::JoinHandle<()>>,
 30    main_thread_id: thread::ThreadId,
 31}
 32
 33const MIN_THREADS: usize = 2;
 34
 35impl LinuxDispatcher {
 36    pub fn new(main_sender: PriorityQueueCalloopSender<RunnableVariant>) -> Self {
 37        let (background_sender, background_receiver) = PriorityQueueReceiver::new();
 38        let thread_count =
 39            std::thread::available_parallelism().map_or(MIN_THREADS, |i| i.get().max(MIN_THREADS));
 40
 41        // These thread should really be lower prio then the foreground
 42        // executor
 43        let mut background_threads = (0..thread_count)
 44            .map(|i| {
 45                let mut receiver = background_receiver.clone();
 46                std::thread::Builder::new()
 47                    .name(format!("Worker-{i}"))
 48                    .spawn(move || {
 49                        for runnable in receiver.iter() {
 50                            let start = Instant::now();
 51
 52                            let mut location = match runnable {
 53                                RunnableVariant::Meta(runnable) => {
 54                                    let location = runnable.metadata().location;
 55                                    let timing = TaskTiming {
 56                                        location,
 57                                        start,
 58                                        end: None,
 59                                    };
 60                                    profiler::add_task_timing(timing);
 61
 62                                    runnable.run();
 63                                    timing
 64                                }
 65                                RunnableVariant::Compat(runnable) => {
 66                                    let location = core::panic::Location::caller();
 67                                    let timing = TaskTiming {
 68                                        location,
 69                                        start,
 70                                        end: None,
 71                                    };
 72                                    profiler::add_task_timing(timing);
 73
 74                                    runnable.run();
 75                                    timing
 76                                }
 77                            };
 78
 79                            let end = Instant::now();
 80                            location.end = Some(end);
 81                            profiler::add_task_timing(location);
 82
 83                            log::trace!(
 84                                "background thread {}: ran runnable. took: {:?}",
 85                                i,
 86                                start.elapsed()
 87                            );
 88                        }
 89                    })
 90                    .unwrap()
 91            })
 92            .collect::<Vec<_>>();
 93
 94        let (timer_sender, timer_channel) = calloop::channel::channel::<TimerAfter>();
 95        let timer_thread = std::thread::Builder::new()
 96            .name("Timer".to_owned())
 97            .spawn(|| {
 98                let mut event_loop: EventLoop<()> =
 99                    EventLoop::try_new().expect("Failed to initialize timer loop!");
100
101                let handle = event_loop.handle();
102                let timer_handle = event_loop.handle();
103                handle
104                    .insert_source(timer_channel, move |e, _, _| {
105                        if let channel::Event::Msg(timer) = e {
106                            // This has to be in an option to satisfy the borrow checker. The callback below should only be scheduled once.
107                            let mut runnable = Some(timer.runnable);
108                            timer_handle
109                                .insert_source(
110                                    calloop::timer::Timer::from_duration(timer.duration),
111                                    move |_, _, _| {
112                                        if let Some(runnable) = runnable.take() {
113                                            let start = Instant::now();
114                                            let mut timing = match runnable {
115                                                RunnableVariant::Meta(runnable) => {
116                                                    let location = runnable.metadata().location;
117                                                    let timing = TaskTiming {
118                                                        location,
119                                                        start,
120                                                        end: None,
121                                                    };
122                                                    profiler::add_task_timing(timing);
123
124                                                    runnable.run();
125                                                    timing
126                                                }
127                                                RunnableVariant::Compat(runnable) => {
128                                                    let timing = TaskTiming {
129                                                        location: core::panic::Location::caller(),
130                                                        start,
131                                                        end: None,
132                                                    };
133                                                    profiler::add_task_timing(timing);
134
135                                                    runnable.run();
136                                                    timing
137                                                }
138                                            };
139                                            let end = Instant::now();
140
141                                            timing.end = Some(end);
142                                            profiler::add_task_timing(timing);
143                                        }
144                                        TimeoutAction::Drop
145                                    },
146                                )
147                                .expect("Failed to start timer");
148                        }
149                    })
150                    .expect("Failed to start timer thread");
151
152                event_loop.run(None, &mut (), |_| {}).log_err();
153            })
154            .unwrap();
155
156        background_threads.push(timer_thread);
157
158        Self {
159            main_sender,
160            timer_sender,
161            background_sender,
162            _background_threads: background_threads,
163            main_thread_id: thread::current().id(),
164        }
165    }
166}
167
168impl PlatformDispatcher for LinuxDispatcher {
169    fn get_all_timings(&self) -> Vec<crate::ThreadTaskTimings> {
170        let global_timings = GLOBAL_THREAD_TIMINGS.lock();
171        ThreadTaskTimings::convert(&global_timings)
172    }
173
174    fn get_current_thread_timings(&self) -> Vec<crate::TaskTiming> {
175        THREAD_TIMINGS.with(|timings| {
176            let timings = timings.lock();
177            let timings = &timings.timings;
178
179            let mut vec = Vec::with_capacity(timings.len());
180
181            let (s1, s2) = timings.as_slices();
182            vec.extend_from_slice(s1);
183            vec.extend_from_slice(s2);
184            vec
185        })
186    }
187
188    fn is_main_thread(&self) -> bool {
189        thread::current().id() == self.main_thread_id
190    }
191
192    fn dispatch(&self, runnable: RunnableVariant, _: Option<TaskLabel>, priority: Priority) {
193        self.background_sender
194            .send(priority, runnable)
195            .unwrap_or_else(|_| panic!("blocking sender returned without value"));
196    }
197
198    fn dispatch_on_main_thread(&self, runnable: RunnableVariant, priority: Priority) {
199        self.main_sender
200            .send(priority, runnable)
201            .unwrap_or_else(|runnable| {
202                // NOTE: Runnable may wrap a Future that is !Send.
203                //
204                // This is usually safe because we only poll it on the main thread.
205                // However if the send fails, we know that:
206                // 1. main_receiver has been dropped (which implies the app is shutting down)
207                // 2. we are on a background thread.
208                // It is not safe to drop something !Send on the wrong thread, and
209                // the app will exit soon anyway, so we must forget the runnable.
210                std::mem::forget(runnable);
211            });
212    }
213
214    fn dispatch_after(&self, duration: Duration, runnable: RunnableVariant) {
215        self.timer_sender
216            .send(TimerAfter { duration, runnable })
217            .ok();
218    }
219
220    fn spawn_realtime(&self, priority: RealtimePriority, f: Box<dyn FnOnce() + Send>) {
221        std::thread::spawn(move || {
222            // SAFETY: always safe to call
223            let thread_id = unsafe { libc::pthread_self() };
224
225            let policy = match priority {
226                RealtimePriority::Audio => libc::SCHED_FIFO,
227                RealtimePriority::Other => libc::SCHED_RR,
228            };
229            let sched_priority = match priority {
230                RealtimePriority::Audio => 65,
231                RealtimePriority::Other => 45,
232            };
233
234            // SAFETY: all sched_param members are valid when initialized to zero.
235            let mut sched_param =
236                unsafe { MaybeUninit::<libc::sched_param>::zeroed().assume_init() };
237            sched_param.sched_priority = sched_priority;
238            // SAFETY: sched_param is a valid initialized structure
239            let result = unsafe { libc::pthread_setschedparam(thread_id, policy, &sched_param) };
240            if result != 0 {
241                log::warn!("failed to set realtime thread priority to {:?}", priority);
242            }
243
244            f();
245        });
246    }
247}
248
249pub struct PriorityQueueCalloopSender<T> {
250    sender: PriorityQueueSender<T>,
251    ping: calloop::ping::Ping,
252}
253
254impl<T> PriorityQueueCalloopSender<T> {
255    fn new(tx: PriorityQueueSender<T>, ping: calloop::ping::Ping) -> Self {
256        Self { sender: tx, ping }
257    }
258
259    fn send(&self, priority: Priority, item: T) -> Result<(), crate::queue::SendError<T>> {
260        let res = self.sender.send(priority, item);
261        if res.is_ok() {
262            self.ping.ping();
263        }
264        res
265    }
266}
267
268impl<T> Drop for PriorityQueueCalloopSender<T> {
269    fn drop(&mut self) {
270        self.ping.ping();
271    }
272}
273
274pub struct PriorityQueueCalloopReceiver<T> {
275    receiver: PriorityQueueReceiver<T>,
276    source: calloop::ping::PingSource,
277    ping: calloop::ping::Ping,
278}
279
280impl<T> PriorityQueueCalloopReceiver<T> {
281    pub fn new() -> (PriorityQueueCalloopSender<T>, Self) {
282        let (ping, source) = calloop::ping::make_ping().expect("Failed to create a Ping.");
283
284        let (tx, rx) = PriorityQueueReceiver::new();
285
286        (
287            PriorityQueueCalloopSender::new(tx, ping.clone()),
288            Self {
289                receiver: rx,
290                source,
291                ping,
292            },
293        )
294    }
295}
296
297use calloop::channel::Event;
298
299#[derive(Debug)]
300pub struct ChannelError(calloop::ping::PingError);
301
302impl std::fmt::Display for ChannelError {
303    #[cfg_attr(feature = "nightly_coverage", coverage(off))]
304    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
305        std::fmt::Display::fmt(&self.0, f)
306    }
307}
308
309impl std::error::Error for ChannelError {
310    #[cfg_attr(feature = "nightly_coverage", coverage(off))]
311    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
312        Some(&self.0)
313    }
314}
315
316impl<T> calloop::EventSource for PriorityQueueCalloopReceiver<T> {
317    type Event = Event<T>;
318    type Metadata = ();
319    type Ret = ();
320    type Error = ChannelError;
321
322    fn process_events<F>(
323        &mut self,
324        readiness: calloop::Readiness,
325        token: calloop::Token,
326        mut callback: F,
327    ) -> Result<calloop::PostAction, Self::Error>
328    where
329        F: FnMut(Self::Event, &mut Self::Metadata) -> Self::Ret,
330    {
331        let mut clear_readiness = false;
332        let mut disconnected = false;
333
334        let action = self
335            .source
336            .process_events(readiness, token, |(), &mut ()| {
337                let mut is_empty = true;
338
339                let mut receiver = self.receiver.clone();
340                for runnable in receiver.try_iter() {
341                    match runnable {
342                        Ok(r) => {
343                            callback(Event::Msg(r), &mut ());
344                            is_empty = false;
345                        }
346                        Err(_) => {
347                            disconnected = true;
348                        }
349                    }
350                }
351
352                if disconnected {
353                    callback(Event::Closed, &mut ());
354                }
355
356                if is_empty {
357                    clear_readiness = true;
358                }
359            })
360            .map_err(ChannelError)?;
361
362        if disconnected {
363            Ok(PostAction::Remove)
364        } else if clear_readiness {
365            Ok(action)
366        } else {
367            // Re-notify the ping source so we can try again.
368            self.ping.ping();
369            Ok(PostAction::Continue)
370        }
371    }
372
373    fn register(
374        &mut self,
375        poll: &mut calloop::Poll,
376        token_factory: &mut calloop::TokenFactory,
377    ) -> calloop::Result<()> {
378        self.source.register(poll, token_factory)
379    }
380
381    fn reregister(
382        &mut self,
383        poll: &mut calloop::Poll,
384        token_factory: &mut calloop::TokenFactory,
385    ) -> calloop::Result<()> {
386        self.source.reregister(poll, token_factory)
387    }
388
389    fn unregister(&mut self, poll: &mut calloop::Poll) -> calloop::Result<()> {
390        self.source.unregister(poll)
391    }
392}
393
#[cfg(test)]
mod tests {
    use super::*;

    /// Drives a sender/receiver pair through a real calloop event loop and
    /// checks three stages: idle, one message sent, sender dropped.
    #[test]
    fn calloop_works() {
        #[derive(Default)]
        struct Data {
            got_msg: bool,
            got_closed: bool,
        }

        let mut event_loop = calloop::EventLoop::try_new().unwrap();
        let handle = event_loop.handle();

        let (tx, rx) = PriorityQueueCalloopReceiver::new();

        let mut data = Data::default();

        let _channel_token = handle
            .insert_source(rx, move |evt, &mut (), data: &mut Data| match evt {
                Event::Msg(()) => data.got_msg = true,
                Event::Closed => data.got_closed = true,
            })
            .unwrap();

        // Zero timeout: each dispatch handles what is ready and returns at once.
        let no_wait = Some(::std::time::Duration::ZERO);

        // Nothing is sent, so nothing is received.
        event_loop.dispatch(no_wait, &mut data).unwrap();

        assert!(!data.got_msg);
        assert!(!data.got_closed);

        // A message is sent.
        tx.send(Priority::Medium, ()).unwrap();
        event_loop.dispatch(no_wait, &mut data).unwrap();

        assert!(data.got_msg);
        assert!(!data.got_closed);

        // The sender is dropped.
        drop(tx);
        event_loop.dispatch(no_wait, &mut data).unwrap();

        assert!(data.got_msg);
        assert!(data.got_closed);
    }
}
454
// NOTE(review): the lines below are captured `cargo test` output left over from
// a debugging session. They name a test called `tomato` and source line numbers
// that do not exist in this file, so they presumably predate the current
// `calloop_works` test — confirm before relying on them.

// running 1 test
// test platform::linux::dispatcher::tests::tomato ... FAILED

// failures:

// ---- platform::linux::dispatcher::tests::tomato stdout ----
// [crates/gpui/src/platform/linux/dispatcher.rs:262:9]
// returning 1 tasks to process
// [crates/gpui/src/platform/linux/dispatcher.rs:480:75] evt = Msg(
//     (),
// )
// returning 0 tasks to process

// thread 'platform::linux::dispatcher::tests::tomato' (478301) panicked at crates/gpui/src/platform/linux/dispatcher.rs:515:9:
// assertion failed: data.got_closed
// note: run with `RUST_BACKTRACE=1` environment variable to display a backtrace