dispatcher.rs

  1use calloop::{
  2    EventLoop, PostAction,
  3    channel::{self, Sender},
  4    timer::TimeoutAction,
  5};
  6use util::ResultExt;
  7
  8use std::{
  9    mem::MaybeUninit,
 10    thread,
 11    time::{Duration, Instant},
 12};
 13
 14use gpui::{
 15    GLOBAL_THREAD_TIMINGS, PlatformDispatcher, Priority, PriorityQueueReceiver,
 16    PriorityQueueSender, RunnableVariant, TaskTiming, ThreadTaskTimings, profiler,
 17};
 18
 19struct TimerAfter {
 20    duration: Duration,
 21    runnable: RunnableVariant,
 22}
 23
 24pub(crate) struct LinuxDispatcher {
 25    main_sender: PriorityQueueCalloopSender<RunnableVariant>,
 26    timer_sender: Sender<TimerAfter>,
 27    background_sender: PriorityQueueSender<RunnableVariant>,
 28    _background_threads: Vec<thread::JoinHandle<()>>,
 29    main_thread_id: thread::ThreadId,
 30}
 31
 32const MIN_THREADS: usize = 2;
 33
 34impl LinuxDispatcher {
 35    pub fn new(main_sender: PriorityQueueCalloopSender<RunnableVariant>) -> Self {
 36        let (background_sender, background_receiver) = PriorityQueueReceiver::new();
 37        let thread_count =
 38            std::thread::available_parallelism().map_or(MIN_THREADS, |i| i.get().max(MIN_THREADS));
 39
 40        let mut background_threads = (0..thread_count)
 41            .map(|i| {
 42                let receiver: PriorityQueueReceiver<RunnableVariant> = background_receiver.clone();
 43                std::thread::Builder::new()
 44                    .name(format!("Worker-{i}"))
 45                    .spawn(move || {
 46                        for runnable in receiver.iter() {
 47                            let start = Instant::now();
 48
 49                            let location = runnable.metadata().location;
 50                            let mut timing = TaskTiming {
 51                                location,
 52                                start,
 53                                end: None,
 54                            };
 55                            profiler::add_task_timing(timing);
 56
 57                            runnable.run();
 58
 59                            let end = Instant::now();
 60                            timing.end = Some(end);
 61                            profiler::add_task_timing(timing);
 62
 63                            log::trace!(
 64                                "background thread {}: ran runnable. took: {:?}",
 65                                i,
 66                                start.elapsed()
 67                            );
 68                        }
 69                    })
 70                    .unwrap()
 71            })
 72            .collect::<Vec<_>>();
 73
 74        let (timer_sender, timer_channel) = calloop::channel::channel::<TimerAfter>();
 75        let timer_thread = std::thread::Builder::new()
 76            .name("Timer".to_owned())
 77            .spawn(move || {
 78                let mut event_loop: EventLoop<()> =
 79                    EventLoop::try_new().expect("Failed to initialize timer loop!");
 80
 81                let handle = event_loop.handle();
 82                let timer_handle = event_loop.handle();
 83                handle
 84                    .insert_source(timer_channel, move |e, _, _| {
 85                        if let channel::Event::Msg(timer) = e {
 86                            let mut runnable = Some(timer.runnable);
 87                            timer_handle
 88                                .insert_source(
 89                                    calloop::timer::Timer::from_duration(timer.duration),
 90                                    move |_, _, _| {
 91                                        if let Some(runnable) = runnable.take() {
 92                                            let start = Instant::now();
 93                                            let location = runnable.metadata().location;
 94                                            let mut timing = TaskTiming {
 95                                                location,
 96                                                start,
 97                                                end: None,
 98                                            };
 99                                            profiler::add_task_timing(timing);
100
101                                            runnable.run();
102                                            let end = Instant::now();
103
104                                            timing.end = Some(end);
105                                            profiler::add_task_timing(timing);
106                                        }
107                                        TimeoutAction::Drop
108                                    },
109                                )
110                                .expect("Failed to start timer");
111                        }
112                    })
113                    .expect("Failed to start timer thread");
114
115                event_loop.run(None, &mut (), |_| {}).log_err();
116            })
117            .unwrap();
118
119        background_threads.push(timer_thread);
120
121        Self {
122            main_sender,
123            timer_sender,
124            background_sender,
125            _background_threads: background_threads,
126            main_thread_id: thread::current().id(),
127        }
128    }
129}
130
131impl PlatformDispatcher for LinuxDispatcher {
132    fn get_all_timings(&self) -> Vec<gpui::ThreadTaskTimings> {
133        let global_timings = GLOBAL_THREAD_TIMINGS.lock();
134        ThreadTaskTimings::convert(&global_timings)
135    }
136
137    fn get_current_thread_timings(&self) -> gpui::ThreadTaskTimings {
138        gpui::profiler::get_current_thread_task_timings()
139    }
140
141    fn is_main_thread(&self) -> bool {
142        thread::current().id() == self.main_thread_id
143    }
144
145    fn dispatch(&self, runnable: RunnableVariant, priority: Priority) {
146        self.background_sender
147            .send(priority, runnable)
148            .unwrap_or_else(|_| panic!("blocking sender returned without value"));
149    }
150
151    fn dispatch_on_main_thread(&self, runnable: RunnableVariant, priority: Priority) {
152        self.main_sender
153            .send(priority, runnable)
154            .unwrap_or_else(|runnable| {
155                // NOTE: Runnable may wrap a Future that is !Send.
156                //
157                // This is usually safe because we only poll it on the main thread.
158                // However if the send fails, we know that:
159                // 1. main_receiver has been dropped (which implies the app is shutting down)
160                // 2. we are on a background thread.
161                // It is not safe to drop something !Send on the wrong thread, and
162                // the app will exit soon anyway, so we must forget the runnable.
163                std::mem::forget(runnable);
164            });
165    }
166
167    fn dispatch_after(&self, duration: Duration, runnable: RunnableVariant) {
168        self.timer_sender
169            .send(TimerAfter { duration, runnable })
170            .ok();
171    }
172
173    fn spawn_realtime(&self, f: Box<dyn FnOnce() + Send>) {
174        std::thread::spawn(move || {
175            // SAFETY: always safe to call
176            let thread_id = unsafe { libc::pthread_self() };
177
178            let policy = libc::SCHED_FIFO;
179            let sched_priority = 65;
180
181            // SAFETY: all sched_param members are valid when initialized to zero.
182            let mut sched_param =
183                unsafe { MaybeUninit::<libc::sched_param>::zeroed().assume_init() };
184            sched_param.sched_priority = sched_priority;
185            // SAFETY: sched_param is a valid initialized structure
186            let result = unsafe { libc::pthread_setschedparam(thread_id, policy, &sched_param) };
187            if result != 0 {
188                log::warn!("failed to set realtime thread priority");
189            }
190
191            f();
192        });
193    }
194}
195
196pub struct PriorityQueueCalloopSender<T> {
197    sender: PriorityQueueSender<T>,
198    ping: calloop::ping::Ping,
199}
200
201impl<T> PriorityQueueCalloopSender<T> {
202    fn new(tx: PriorityQueueSender<T>, ping: calloop::ping::Ping) -> Self {
203        Self { sender: tx, ping }
204    }
205
206    fn send(&self, priority: Priority, item: T) -> Result<(), gpui::queue::SendError<T>> {
207        let res = self.sender.send(priority, item);
208        if res.is_ok() {
209            self.ping.ping();
210        }
211        res
212    }
213}
214
215impl<T> Drop for PriorityQueueCalloopSender<T> {
216    fn drop(&mut self) {
217        self.ping.ping();
218    }
219}
220
221pub struct PriorityQueueCalloopReceiver<T> {
222    receiver: PriorityQueueReceiver<T>,
223    source: calloop::ping::PingSource,
224    ping: calloop::ping::Ping,
225}
226
227impl<T> PriorityQueueCalloopReceiver<T> {
228    pub fn new() -> (PriorityQueueCalloopSender<T>, Self) {
229        let (ping, source) = calloop::ping::make_ping().expect("Failed to create a Ping.");
230
231        let (tx, rx) = PriorityQueueReceiver::new();
232
233        (
234            PriorityQueueCalloopSender::new(tx, ping.clone()),
235            Self {
236                receiver: rx,
237                source,
238                ping,
239            },
240        )
241    }
242}
243
244use calloop::channel::Event;
245
246#[derive(Debug)]
247pub struct ChannelError(calloop::ping::PingError);
248
249impl std::fmt::Display for ChannelError {
250    #[cfg_attr(feature = "nightly_coverage", coverage(off))]
251    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
252        std::fmt::Display::fmt(&self.0, f)
253    }
254}
255
256impl std::error::Error for ChannelError {
257    #[cfg_attr(feature = "nightly_coverage", coverage(off))]
258    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
259        Some(&self.0)
260    }
261}
262
263impl<T> calloop::EventSource for PriorityQueueCalloopReceiver<T> {
264    type Event = Event<T>;
265    type Metadata = ();
266    type Ret = ();
267    type Error = ChannelError;
268
269    fn process_events<F>(
270        &mut self,
271        readiness: calloop::Readiness,
272        token: calloop::Token,
273        mut callback: F,
274    ) -> Result<calloop::PostAction, Self::Error>
275    where
276        F: FnMut(Self::Event, &mut Self::Metadata) -> Self::Ret,
277    {
278        let mut clear_readiness = false;
279        let mut disconnected = false;
280
281        let action = self
282            .source
283            .process_events(readiness, token, |(), &mut ()| {
284                let mut is_empty = true;
285
286                let receiver = self.receiver.clone();
287                for runnable in receiver.try_iter() {
288                    match runnable {
289                        Ok(r) => {
290                            callback(Event::Msg(r), &mut ());
291                            is_empty = false;
292                        }
293                        Err(_) => {
294                            disconnected = true;
295                        }
296                    }
297                }
298
299                if disconnected {
300                    callback(Event::Closed, &mut ());
301                }
302
303                if is_empty {
304                    clear_readiness = true;
305                }
306            })
307            .map_err(ChannelError)?;
308
309        if disconnected {
310            Ok(PostAction::Remove)
311        } else if clear_readiness {
312            Ok(action)
313        } else {
314            // Re-notify the ping source so we can try again.
315            self.ping.ping();
316            Ok(PostAction::Continue)
317        }
318    }
319
320    fn register(
321        &mut self,
322        poll: &mut calloop::Poll,
323        token_factory: &mut calloop::TokenFactory,
324    ) -> calloop::Result<()> {
325        self.source.register(poll, token_factory)
326    }
327
328    fn reregister(
329        &mut self,
330        poll: &mut calloop::Poll,
331        token_factory: &mut calloop::TokenFactory,
332    ) -> calloop::Result<()> {
333        self.source.reregister(poll, token_factory)
334    }
335
336    fn unregister(&mut self, poll: &mut calloop::Poll) -> calloop::Result<()> {
337        self.source.unregister(poll)
338    }
339}
340
341#[cfg(test)]
342mod tests {
343    use super::*;
344
345    #[test]
346    fn calloop_works() {
347        let mut event_loop = calloop::EventLoop::try_new().unwrap();
348        let handle = event_loop.handle();
349
350        let (tx, rx) = PriorityQueueCalloopReceiver::new();
351
352        struct Data {
353            got_msg: bool,
354            got_closed: bool,
355        }
356
357        let mut data = Data {
358            got_msg: false,
359            got_closed: false,
360        };
361
362        let _channel_token = handle
363            .insert_source(rx, move |evt, &mut (), data: &mut Data| match evt {
364                Event::Msg(()) => {
365                    data.got_msg = true;
366                }
367
368                Event::Closed => {
369                    data.got_closed = true;
370                }
371            })
372            .unwrap();
373
374        // nothing is sent, nothing is received
375        event_loop
376            .dispatch(Some(::std::time::Duration::ZERO), &mut data)
377            .unwrap();
378
379        assert!(!data.got_msg);
380        assert!(!data.got_closed);
381        // a message is send
382
383        tx.send(Priority::Medium, ()).unwrap();
384        event_loop
385            .dispatch(Some(::std::time::Duration::ZERO), &mut data)
386            .unwrap();
387
388        assert!(data.got_msg);
389        assert!(!data.got_closed);
390
391        // the sender is dropped
392        drop(tx);
393        event_loop
394            .dispatch(Some(::std::time::Duration::ZERO), &mut data)
395            .unwrap();
396
397        assert!(data.got_msg);
398        assert!(data.got_closed);
399    }
400}
401
402// running 1 test
403// test linux::dispatcher::tests::tomato ... FAILED
404
405// failures:
406
407// ---- linux::dispatcher::tests::tomato stdout ----
408// [crates/gpui/src/platform/linux/dispatcher.rs:262:9]
409// returning 1 tasks to process
410// [crates/gpui/src/platform/linux/dispatcher.rs:480:75] evt = Msg(
411//     (),
412// )
413// returning 0 tasks to process
414
415// thread 'linux::dispatcher::tests::tomato' (478301) panicked at crates/gpui/src/platform/linux/dispatcher.rs:515:9:
416// assertion failed: data.got_closed
417// note: run with `RUST_BACKTRACE=1` environment variable to display a backtrace