dispatcher.rs

  1use calloop::{
  2    EventLoop, PostAction,
  3    channel::{self, Sender},
  4    timer::TimeoutAction,
  5};
  6use util::ResultExt;
  7
  8use std::{
  9    mem::MaybeUninit,
 10    thread,
 11    time::{Duration, Instant},
 12};
 13
 14use gpui::{
 15    GLOBAL_THREAD_TIMINGS, PlatformDispatcher, Priority, PriorityQueueReceiver,
 16    PriorityQueueSender, RunnableVariant, THREAD_TIMINGS, TaskTiming, ThreadTaskTimings, profiler,
 17};
 18
 19struct TimerAfter {
 20    duration: Duration,
 21    runnable: RunnableVariant,
 22}
 23
 24pub(crate) struct LinuxDispatcher {
 25    main_sender: PriorityQueueCalloopSender<RunnableVariant>,
 26    timer_sender: Sender<TimerAfter>,
 27    background_sender: PriorityQueueSender<RunnableVariant>,
 28    _background_threads: Vec<thread::JoinHandle<()>>,
 29    main_thread_id: thread::ThreadId,
 30}
 31
 32const MIN_THREADS: usize = 2;
 33
 34impl LinuxDispatcher {
 35    pub fn new(main_sender: PriorityQueueCalloopSender<RunnableVariant>) -> Self {
 36        let (background_sender, background_receiver) = PriorityQueueReceiver::new();
 37        let thread_count =
 38            std::thread::available_parallelism().map_or(MIN_THREADS, |i| i.get().max(MIN_THREADS));
 39
 40        let mut background_threads = (0..thread_count)
 41            .map(|i| {
 42                let receiver: PriorityQueueReceiver<RunnableVariant> = background_receiver.clone();
 43                std::thread::Builder::new()
 44                    .name(format!("Worker-{i}"))
 45                    .spawn(move || {
 46                        for runnable in receiver.iter() {
 47                            let start = Instant::now();
 48
 49                            let location = runnable.metadata().location;
 50                            let mut timing = TaskTiming {
 51                                location,
 52                                start,
 53                                end: None,
 54                            };
 55                            profiler::add_task_timing(timing);
 56
 57                            runnable.run();
 58
 59                            let end = Instant::now();
 60                            timing.end = Some(end);
 61                            profiler::add_task_timing(timing);
 62
 63                            log::trace!(
 64                                "background thread {}: ran runnable. took: {:?}",
 65                                i,
 66                                start.elapsed()
 67                            );
 68                        }
 69                    })
 70                    .unwrap()
 71            })
 72            .collect::<Vec<_>>();
 73
 74        let (timer_sender, timer_channel) = calloop::channel::channel::<TimerAfter>();
 75        let timer_thread = std::thread::Builder::new()
 76            .name("Timer".to_owned())
 77            .spawn(move || {
 78                let mut event_loop: EventLoop<()> =
 79                    EventLoop::try_new().expect("Failed to initialize timer loop!");
 80
 81                let handle = event_loop.handle();
 82                let timer_handle = event_loop.handle();
 83                handle
 84                    .insert_source(timer_channel, move |e, _, _| {
 85                        if let channel::Event::Msg(timer) = e {
 86                            let mut runnable = Some(timer.runnable);
 87                            timer_handle
 88                                .insert_source(
 89                                    calloop::timer::Timer::from_duration(timer.duration),
 90                                    move |_, _, _| {
 91                                        if let Some(runnable) = runnable.take() {
 92                                            let start = Instant::now();
 93                                            let location = runnable.metadata().location;
 94                                            let mut timing = TaskTiming {
 95                                                location,
 96                                                start,
 97                                                end: None,
 98                                            };
 99                                            profiler::add_task_timing(timing);
100
101                                            runnable.run();
102                                            let end = Instant::now();
103
104                                            timing.end = Some(end);
105                                            profiler::add_task_timing(timing);
106                                        }
107                                        TimeoutAction::Drop
108                                    },
109                                )
110                                .expect("Failed to start timer");
111                        }
112                    })
113                    .expect("Failed to start timer thread");
114
115                event_loop.run(None, &mut (), |_| {}).log_err();
116            })
117            .unwrap();
118
119        background_threads.push(timer_thread);
120
121        Self {
122            main_sender,
123            timer_sender,
124            background_sender,
125            _background_threads: background_threads,
126            main_thread_id: thread::current().id(),
127        }
128    }
129}
130
131impl PlatformDispatcher for LinuxDispatcher {
132    fn get_all_timings(&self) -> Vec<gpui::ThreadTaskTimings> {
133        let global_timings = GLOBAL_THREAD_TIMINGS.lock();
134        ThreadTaskTimings::convert(&global_timings)
135    }
136
137    fn get_current_thread_timings(&self) -> gpui::ThreadTaskTimings {
138        THREAD_TIMINGS.with(|timings| {
139            let timings = timings.lock();
140            let thread_name = timings.thread_name.clone();
141            let total_pushed = timings.total_pushed;
142            let timings = &timings.timings;
143
144            let mut vec = Vec::with_capacity(timings.len());
145
146            let (s1, s2) = timings.as_slices();
147            vec.extend_from_slice(s1);
148            vec.extend_from_slice(s2);
149
150            gpui::ThreadTaskTimings {
151                thread_name,
152                thread_id: std::thread::current().id(),
153                timings: vec,
154                total_pushed,
155            }
156        })
157    }
158
159    fn is_main_thread(&self) -> bool {
160        thread::current().id() == self.main_thread_id
161    }
162
163    fn dispatch(&self, runnable: RunnableVariant, priority: Priority) {
164        self.background_sender
165            .send(priority, runnable)
166            .unwrap_or_else(|_| panic!("blocking sender returned without value"));
167    }
168
169    fn dispatch_on_main_thread(&self, runnable: RunnableVariant, priority: Priority) {
170        self.main_sender
171            .send(priority, runnable)
172            .unwrap_or_else(|runnable| {
173                // NOTE: Runnable may wrap a Future that is !Send.
174                //
175                // This is usually safe because we only poll it on the main thread.
176                // However if the send fails, we know that:
177                // 1. main_receiver has been dropped (which implies the app is shutting down)
178                // 2. we are on a background thread.
179                // It is not safe to drop something !Send on the wrong thread, and
180                // the app will exit soon anyway, so we must forget the runnable.
181                std::mem::forget(runnable);
182            });
183    }
184
185    fn dispatch_after(&self, duration: Duration, runnable: RunnableVariant) {
186        self.timer_sender
187            .send(TimerAfter { duration, runnable })
188            .ok();
189    }
190
191    fn spawn_realtime(&self, f: Box<dyn FnOnce() + Send>) {
192        std::thread::spawn(move || {
193            // SAFETY: always safe to call
194            let thread_id = unsafe { libc::pthread_self() };
195
196            let policy = libc::SCHED_FIFO;
197            let sched_priority = 65;
198
199            // SAFETY: all sched_param members are valid when initialized to zero.
200            let mut sched_param =
201                unsafe { MaybeUninit::<libc::sched_param>::zeroed().assume_init() };
202            sched_param.sched_priority = sched_priority;
203            // SAFETY: sched_param is a valid initialized structure
204            let result = unsafe { libc::pthread_setschedparam(thread_id, policy, &sched_param) };
205            if result != 0 {
206                log::warn!("failed to set realtime thread priority");
207            }
208
209            f();
210        });
211    }
212}
213
214pub struct PriorityQueueCalloopSender<T> {
215    sender: PriorityQueueSender<T>,
216    ping: calloop::ping::Ping,
217}
218
219impl<T> PriorityQueueCalloopSender<T> {
220    fn new(tx: PriorityQueueSender<T>, ping: calloop::ping::Ping) -> Self {
221        Self { sender: tx, ping }
222    }
223
224    fn send(&self, priority: Priority, item: T) -> Result<(), gpui::queue::SendError<T>> {
225        let res = self.sender.send(priority, item);
226        if res.is_ok() {
227            self.ping.ping();
228        }
229        res
230    }
231}
232
233impl<T> Drop for PriorityQueueCalloopSender<T> {
234    fn drop(&mut self) {
235        self.ping.ping();
236    }
237}
238
239pub struct PriorityQueueCalloopReceiver<T> {
240    receiver: PriorityQueueReceiver<T>,
241    source: calloop::ping::PingSource,
242    ping: calloop::ping::Ping,
243}
244
245impl<T> PriorityQueueCalloopReceiver<T> {
246    pub fn new() -> (PriorityQueueCalloopSender<T>, Self) {
247        let (ping, source) = calloop::ping::make_ping().expect("Failed to create a Ping.");
248
249        let (tx, rx) = PriorityQueueReceiver::new();
250
251        (
252            PriorityQueueCalloopSender::new(tx, ping.clone()),
253            Self {
254                receiver: rx,
255                source,
256                ping,
257            },
258        )
259    }
260}
261
262use calloop::channel::Event;
263
264#[derive(Debug)]
265pub struct ChannelError(calloop::ping::PingError);
266
267impl std::fmt::Display for ChannelError {
268    #[cfg_attr(feature = "nightly_coverage", coverage(off))]
269    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
270        std::fmt::Display::fmt(&self.0, f)
271    }
272}
273
274impl std::error::Error for ChannelError {
275    #[cfg_attr(feature = "nightly_coverage", coverage(off))]
276    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
277        Some(&self.0)
278    }
279}
280
281impl<T> calloop::EventSource for PriorityQueueCalloopReceiver<T> {
282    type Event = Event<T>;
283    type Metadata = ();
284    type Ret = ();
285    type Error = ChannelError;
286
287    fn process_events<F>(
288        &mut self,
289        readiness: calloop::Readiness,
290        token: calloop::Token,
291        mut callback: F,
292    ) -> Result<calloop::PostAction, Self::Error>
293    where
294        F: FnMut(Self::Event, &mut Self::Metadata) -> Self::Ret,
295    {
296        let mut clear_readiness = false;
297        let mut disconnected = false;
298
299        let action = self
300            .source
301            .process_events(readiness, token, |(), &mut ()| {
302                let mut is_empty = true;
303
304                let receiver = self.receiver.clone();
305                for runnable in receiver.try_iter() {
306                    match runnable {
307                        Ok(r) => {
308                            callback(Event::Msg(r), &mut ());
309                            is_empty = false;
310                        }
311                        Err(_) => {
312                            disconnected = true;
313                        }
314                    }
315                }
316
317                if disconnected {
318                    callback(Event::Closed, &mut ());
319                }
320
321                if is_empty {
322                    clear_readiness = true;
323                }
324            })
325            .map_err(ChannelError)?;
326
327        if disconnected {
328            Ok(PostAction::Remove)
329        } else if clear_readiness {
330            Ok(action)
331        } else {
332            // Re-notify the ping source so we can try again.
333            self.ping.ping();
334            Ok(PostAction::Continue)
335        }
336    }
337
338    fn register(
339        &mut self,
340        poll: &mut calloop::Poll,
341        token_factory: &mut calloop::TokenFactory,
342    ) -> calloop::Result<()> {
343        self.source.register(poll, token_factory)
344    }
345
346    fn reregister(
347        &mut self,
348        poll: &mut calloop::Poll,
349        token_factory: &mut calloop::TokenFactory,
350    ) -> calloop::Result<()> {
351        self.source.reregister(poll, token_factory)
352    }
353
354    fn unregister(&mut self, poll: &mut calloop::Poll) -> calloop::Result<()> {
355        self.source.unregister(poll)
356    }
357}
358
359#[cfg(test)]
360mod tests {
361    use super::*;
362
363    #[test]
364    fn calloop_works() {
365        let mut event_loop = calloop::EventLoop::try_new().unwrap();
366        let handle = event_loop.handle();
367
368        let (tx, rx) = PriorityQueueCalloopReceiver::new();
369
370        struct Data {
371            got_msg: bool,
372            got_closed: bool,
373        }
374
375        let mut data = Data {
376            got_msg: false,
377            got_closed: false,
378        };
379
380        let _channel_token = handle
381            .insert_source(rx, move |evt, &mut (), data: &mut Data| match evt {
382                Event::Msg(()) => {
383                    data.got_msg = true;
384                }
385
386                Event::Closed => {
387                    data.got_closed = true;
388                }
389            })
390            .unwrap();
391
392        // nothing is sent, nothing is received
393        event_loop
394            .dispatch(Some(::std::time::Duration::ZERO), &mut data)
395            .unwrap();
396
397        assert!(!data.got_msg);
398        assert!(!data.got_closed);
399        // a message is send
400
401        tx.send(Priority::Medium, ()).unwrap();
402        event_loop
403            .dispatch(Some(::std::time::Duration::ZERO), &mut data)
404            .unwrap();
405
406        assert!(data.got_msg);
407        assert!(!data.got_closed);
408
409        // the sender is dropped
410        drop(tx);
411        event_loop
412            .dispatch(Some(::std::time::Duration::ZERO), &mut data)
413            .unwrap();
414
415        assert!(data.got_msg);
416        assert!(data.got_closed);
417    }
418}
419
420// running 1 test
421// test linux::dispatcher::tests::tomato ... FAILED
422
423// failures:
424
425// ---- linux::dispatcher::tests::tomato stdout ----
426// [crates/gpui/src/platform/linux/dispatcher.rs:262:9]
427// returning 1 tasks to process
428// [crates/gpui/src/platform/linux/dispatcher.rs:480:75] evt = Msg(
429//     (),
430// )
431// returning 0 tasks to process
432
433// thread 'linux::dispatcher::tests::tomato' (478301) panicked at crates/gpui/src/platform/linux/dispatcher.rs:515:9:
434// assertion failed: data.got_closed
435// note: run with `RUST_BACKTRACE=1` environment variable to display a backtrace