dispatcher.rs

  1use calloop::{
  2    EventLoop, PostAction,
  3    channel::{self, Sender},
  4    timer::TimeoutAction,
  5};
  6use util::ResultExt;
  7
  8use std::{
  9    mem::MaybeUninit,
 10    thread,
 11    time::{Duration, Instant},
 12};
 13
 14use gpui::{
 15    GLOBAL_THREAD_TIMINGS, PlatformDispatcher, Priority, PriorityQueueReceiver,
 16    PriorityQueueSender, RunnableVariant, THREAD_TIMINGS, TaskTiming, ThreadTaskTimings, profiler,
 17};
 18
 19struct TimerAfter {
 20    duration: Duration,
 21    runnable: RunnableVariant,
 22}
 23
 24pub(crate) struct LinuxDispatcher {
 25    main_sender: PriorityQueueCalloopSender<RunnableVariant>,
 26    timer_sender: Sender<TimerAfter>,
 27    background_sender: PriorityQueueSender<RunnableVariant>,
 28    _background_threads: Vec<thread::JoinHandle<()>>,
 29    main_thread_id: thread::ThreadId,
 30}
 31
 32const MIN_THREADS: usize = 2;
 33
 34impl LinuxDispatcher {
 35    pub fn new(main_sender: PriorityQueueCalloopSender<RunnableVariant>) -> Self {
 36        let (background_sender, background_receiver) = PriorityQueueReceiver::new();
 37        let thread_count =
 38            std::thread::available_parallelism().map_or(MIN_THREADS, |i| i.get().max(MIN_THREADS));
 39
 40        let mut background_threads = (0..thread_count)
 41            .map(|i| {
 42                let receiver: PriorityQueueReceiver<RunnableVariant> = background_receiver.clone();
 43                std::thread::Builder::new()
 44                    .name(format!("Worker-{i}"))
 45                    .spawn(move || {
 46                        for runnable in receiver.iter() {
 47                            // Check if the executor that spawned this task was closed
 48                            if runnable.metadata().is_closed() {
 49                                continue;
 50                            }
 51
 52                            let start = Instant::now();
 53
 54                            let location = runnable.metadata().location;
 55                            let mut timing = TaskTiming {
 56                                location,
 57                                start,
 58                                end: None,
 59                            };
 60                            profiler::add_task_timing(timing);
 61
 62                            runnable.run();
 63
 64                            let end = Instant::now();
 65                            timing.end = Some(end);
 66                            profiler::add_task_timing(timing);
 67
 68                            log::trace!(
 69                                "background thread {}: ran runnable. took: {:?}",
 70                                i,
 71                                start.elapsed()
 72                            );
 73                        }
 74                    })
 75                    .unwrap()
 76            })
 77            .collect::<Vec<_>>();
 78
 79        let (timer_sender, timer_channel) = calloop::channel::channel::<TimerAfter>();
 80        let timer_thread = std::thread::Builder::new()
 81            .name("Timer".to_owned())
 82            .spawn(move || {
 83                let mut event_loop: EventLoop<()> =
 84                    EventLoop::try_new().expect("Failed to initialize timer loop!");
 85
 86                let handle = event_loop.handle();
 87                let timer_handle = event_loop.handle();
 88                handle
 89                    .insert_source(timer_channel, move |e, _, _| {
 90                        if let channel::Event::Msg(timer) = e {
 91                            let mut runnable = Some(timer.runnable);
 92                            timer_handle
 93                                .insert_source(
 94                                    calloop::timer::Timer::from_duration(timer.duration),
 95                                    move |_, _, _| {
 96                                        if let Some(runnable) = runnable.take() {
 97                                            // Check if the executor that spawned this task was closed
 98                                            if runnable.metadata().is_closed() {
 99                                                return TimeoutAction::Drop;
100                                            }
101
102                                            let start = Instant::now();
103                                            let location = runnable.metadata().location;
104                                            let mut timing = TaskTiming {
105                                                location,
106                                                start,
107                                                end: None,
108                                            };
109                                            profiler::add_task_timing(timing);
110
111                                            runnable.run();
112                                            let end = Instant::now();
113
114                                            timing.end = Some(end);
115                                            profiler::add_task_timing(timing);
116                                        }
117                                        TimeoutAction::Drop
118                                    },
119                                )
120                                .expect("Failed to start timer");
121                        }
122                    })
123                    .expect("Failed to start timer thread");
124
125                event_loop.run(None, &mut (), |_| {}).log_err();
126            })
127            .unwrap();
128
129        background_threads.push(timer_thread);
130
131        Self {
132            main_sender,
133            timer_sender,
134            background_sender,
135            _background_threads: background_threads,
136            main_thread_id: thread::current().id(),
137        }
138    }
139}
140
141impl PlatformDispatcher for LinuxDispatcher {
142    fn get_all_timings(&self) -> Vec<gpui::ThreadTaskTimings> {
143        let global_timings = GLOBAL_THREAD_TIMINGS.lock();
144        ThreadTaskTimings::convert(&global_timings)
145    }
146
147    fn get_current_thread_timings(&self) -> gpui::ThreadTaskTimings {
148        THREAD_TIMINGS.with(|timings| {
149            let timings = timings.lock();
150            let thread_name = timings.thread_name.clone();
151            let total_pushed = timings.total_pushed;
152            let timings = &timings.timings;
153
154            let mut vec = Vec::with_capacity(timings.len());
155
156            let (s1, s2) = timings.as_slices();
157            vec.extend_from_slice(s1);
158            vec.extend_from_slice(s2);
159
160            gpui::ThreadTaskTimings {
161                thread_name,
162                thread_id: std::thread::current().id(),
163                timings: vec,
164                total_pushed,
165            }
166        })
167    }
168
169    fn is_main_thread(&self) -> bool {
170        thread::current().id() == self.main_thread_id
171    }
172
173    fn dispatch(&self, runnable: RunnableVariant, priority: Priority) {
174        self.background_sender
175            .send(priority, runnable)
176            .unwrap_or_else(|_| panic!("blocking sender returned without value"));
177    }
178
179    fn dispatch_on_main_thread(&self, runnable: RunnableVariant, priority: Priority) {
180        self.main_sender
181            .send(priority, runnable)
182            .unwrap_or_else(|runnable| {
183                // NOTE: Runnable may wrap a Future that is !Send.
184                //
185                // This is usually safe because we only poll it on the main thread.
186                // However if the send fails, we know that:
187                // 1. main_receiver has been dropped (which implies the app is shutting down)
188                // 2. we are on a background thread.
189                // It is not safe to drop something !Send on the wrong thread, and
190                // the app will exit soon anyway, so we must forget the runnable.
191                std::mem::forget(runnable);
192            });
193    }
194
195    fn dispatch_after(&self, duration: Duration, runnable: RunnableVariant) {
196        self.timer_sender
197            .send(TimerAfter { duration, runnable })
198            .ok();
199    }
200
201    fn spawn_realtime(&self, f: Box<dyn FnOnce() + Send>) {
202        std::thread::spawn(move || {
203            // SAFETY: always safe to call
204            let thread_id = unsafe { libc::pthread_self() };
205
206            let policy = libc::SCHED_FIFO;
207            let sched_priority = 65;
208
209            // SAFETY: all sched_param members are valid when initialized to zero.
210            let mut sched_param =
211                unsafe { MaybeUninit::<libc::sched_param>::zeroed().assume_init() };
212            sched_param.sched_priority = sched_priority;
213            // SAFETY: sched_param is a valid initialized structure
214            let result = unsafe { libc::pthread_setschedparam(thread_id, policy, &sched_param) };
215            if result != 0 {
216                log::warn!("failed to set realtime thread priority");
217            }
218
219            f();
220        });
221    }
222}
223
224pub struct PriorityQueueCalloopSender<T> {
225    sender: PriorityQueueSender<T>,
226    ping: calloop::ping::Ping,
227}
228
229impl<T> PriorityQueueCalloopSender<T> {
230    fn new(tx: PriorityQueueSender<T>, ping: calloop::ping::Ping) -> Self {
231        Self { sender: tx, ping }
232    }
233
234    fn send(&self, priority: Priority, item: T) -> Result<(), gpui::queue::SendError<T>> {
235        let res = self.sender.send(priority, item);
236        if res.is_ok() {
237            self.ping.ping();
238        }
239        res
240    }
241}
242
243impl<T> Drop for PriorityQueueCalloopSender<T> {
244    fn drop(&mut self) {
245        self.ping.ping();
246    }
247}
248
249pub struct PriorityQueueCalloopReceiver<T> {
250    receiver: PriorityQueueReceiver<T>,
251    source: calloop::ping::PingSource,
252    ping: calloop::ping::Ping,
253}
254
255impl<T> PriorityQueueCalloopReceiver<T> {
256    pub fn new() -> (PriorityQueueCalloopSender<T>, Self) {
257        let (ping, source) = calloop::ping::make_ping().expect("Failed to create a Ping.");
258
259        let (tx, rx) = PriorityQueueReceiver::new();
260
261        (
262            PriorityQueueCalloopSender::new(tx, ping.clone()),
263            Self {
264                receiver: rx,
265                source,
266                ping,
267            },
268        )
269    }
270}
271
272use calloop::channel::Event;
273
274#[derive(Debug)]
275pub struct ChannelError(calloop::ping::PingError);
276
277impl std::fmt::Display for ChannelError {
278    #[cfg_attr(feature = "nightly_coverage", coverage(off))]
279    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
280        std::fmt::Display::fmt(&self.0, f)
281    }
282}
283
284impl std::error::Error for ChannelError {
285    #[cfg_attr(feature = "nightly_coverage", coverage(off))]
286    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
287        Some(&self.0)
288    }
289}
290
291impl<T> calloop::EventSource for PriorityQueueCalloopReceiver<T> {
292    type Event = Event<T>;
293    type Metadata = ();
294    type Ret = ();
295    type Error = ChannelError;
296
297    fn process_events<F>(
298        &mut self,
299        readiness: calloop::Readiness,
300        token: calloop::Token,
301        mut callback: F,
302    ) -> Result<calloop::PostAction, Self::Error>
303    where
304        F: FnMut(Self::Event, &mut Self::Metadata) -> Self::Ret,
305    {
306        let mut clear_readiness = false;
307        let mut disconnected = false;
308
309        let action = self
310            .source
311            .process_events(readiness, token, |(), &mut ()| {
312                let mut is_empty = true;
313
314                let receiver = self.receiver.clone();
315                for runnable in receiver.try_iter() {
316                    match runnable {
317                        Ok(r) => {
318                            callback(Event::Msg(r), &mut ());
319                            is_empty = false;
320                        }
321                        Err(_) => {
322                            disconnected = true;
323                        }
324                    }
325                }
326
327                if disconnected {
328                    callback(Event::Closed, &mut ());
329                }
330
331                if is_empty {
332                    clear_readiness = true;
333                }
334            })
335            .map_err(ChannelError)?;
336
337        if disconnected {
338            Ok(PostAction::Remove)
339        } else if clear_readiness {
340            Ok(action)
341        } else {
342            // Re-notify the ping source so we can try again.
343            self.ping.ping();
344            Ok(PostAction::Continue)
345        }
346    }
347
348    fn register(
349        &mut self,
350        poll: &mut calloop::Poll,
351        token_factory: &mut calloop::TokenFactory,
352    ) -> calloop::Result<()> {
353        self.source.register(poll, token_factory)
354    }
355
356    fn reregister(
357        &mut self,
358        poll: &mut calloop::Poll,
359        token_factory: &mut calloop::TokenFactory,
360    ) -> calloop::Result<()> {
361        self.source.reregister(poll, token_factory)
362    }
363
364    fn unregister(&mut self, poll: &mut calloop::Poll) -> calloop::Result<()> {
365        self.source.unregister(poll)
366    }
367}
368
#[cfg(test)]
mod tests {
    use super::*;

    /// End-to-end check of the calloop-backed priority queue: an idle source
    /// yields nothing, a sent item arrives as `Event::Msg`, and dropping the
    /// sender produces `Event::Closed`.
    #[test]
    fn calloop_works() {
        struct Data {
            got_msg: bool,
            got_closed: bool,
        }

        let mut event_loop = calloop::EventLoop::try_new().unwrap();
        let handle = event_loop.handle();

        let (tx, rx) = PriorityQueueCalloopReceiver::new();

        let mut data = Data {
            got_msg: false,
            got_closed: false,
        };

        let _channel_token = handle
            .insert_source(rx, move |evt, &mut (), data: &mut Data| {
                match evt {
                    Event::Msg(()) => data.got_msg = true,
                    Event::Closed => data.got_closed = true,
                }
            })
            .unwrap();

        // Runs one non-blocking iteration of the event loop.
        let mut dispatch_once = |data: &mut Data| {
            event_loop
                .dispatch(Some(::std::time::Duration::ZERO), data)
                .unwrap();
        };

        // nothing is sent, nothing is received
        dispatch_once(&mut data);
        assert_eq!((data.got_msg, data.got_closed), (false, false));

        // a message is sent
        tx.send(Priority::Medium, ()).unwrap();
        dispatch_once(&mut data);
        assert_eq!((data.got_msg, data.got_closed), (true, false));

        // the sender is dropped
        drop(tx);
        dispatch_once(&mut data);
        assert_eq!((data.got_msg, data.got_closed), (true, true));
    }
}
429
430// running 1 test
431// test linux::dispatcher::tests::tomato ... FAILED
432
433// failures:
434
435// ---- linux::dispatcher::tests::tomato stdout ----
436// [crates/gpui/src/platform/linux/dispatcher.rs:262:9]
437// returning 1 tasks to process
438// [crates/gpui/src/platform/linux/dispatcher.rs:480:75] evt = Msg(
439//     (),
440// )
441// returning 0 tasks to process
442
443// thread 'linux::dispatcher::tests::tomato' (478301) panicked at crates/gpui/src/platform/linux/dispatcher.rs:515:9:
444// assertion failed: data.got_closed
445// note: run with `RUST_BACKTRACE=1` environment variable to display a backtrace