dispatcher.rs

  1use crate::{
  2    GLOBAL_THREAD_TIMINGS, PlatformDispatcher, RunnableVariant, THREAD_TIMINGS, TaskLabel,
  3    TaskTiming, ThreadTaskTimings,
  4};
  5use calloop::{
  6    EventLoop,
  7    channel::{self, Sender},
  8    timer::TimeoutAction,
  9};
 10use std::{
 11    thread,
 12    time::{Duration, Instant},
 13};
 14use util::ResultExt;
 15
 16struct TimerAfter {
 17    duration: Duration,
 18    runnable: RunnableVariant,
 19}
 20
 21pub(crate) struct LinuxDispatcher {
 22    main_sender: Sender<RunnableVariant>,
 23    timer_sender: Sender<TimerAfter>,
 24    background_sender: flume::Sender<RunnableVariant>,
 25    _background_threads: Vec<thread::JoinHandle<()>>,
 26    main_thread_id: thread::ThreadId,
 27}
 28
 29impl LinuxDispatcher {
 30    pub fn new(main_sender: Sender<RunnableVariant>) -> Self {
 31        let (background_sender, background_receiver) = flume::unbounded::<RunnableVariant>();
 32        let thread_count = std::thread::available_parallelism()
 33            .map(|i| i.get())
 34            .unwrap_or(1);
 35
 36        let mut background_threads = (0..thread_count)
 37            .map(|i| {
 38                let receiver = background_receiver.clone();
 39                std::thread::Builder::new()
 40                    .name(format!("Worker-{i}"))
 41                    .spawn(move || {
 42                        for runnable in receiver {
 43                            let start = Instant::now();
 44
 45                            let mut location = match runnable {
 46                                RunnableVariant::Meta(runnable) => {
 47                                    let location = runnable.metadata().location;
 48                                    let timing = TaskTiming {
 49                                        location,
 50                                        start,
 51                                        end: None,
 52                                    };
 53                                    Self::add_task_timing(timing);
 54
 55                                    runnable.run();
 56                                    timing
 57                                }
 58                                RunnableVariant::Compat(runnable) => {
 59                                    let location = core::panic::Location::caller();
 60                                    let timing = TaskTiming {
 61                                        location,
 62                                        start,
 63                                        end: None,
 64                                    };
 65                                    Self::add_task_timing(timing);
 66
 67                                    runnable.run();
 68                                    timing
 69                                }
 70                            };
 71
 72                            let end = Instant::now();
 73                            location.end = Some(end);
 74                            Self::add_task_timing(location);
 75
 76                            log::trace!(
 77                                "background thread {}: ran runnable. took: {:?}",
 78                                i,
 79                                start.elapsed()
 80                            );
 81                        }
 82                    })
 83                    .unwrap()
 84            })
 85            .collect::<Vec<_>>();
 86
 87        let (timer_sender, timer_channel) = calloop::channel::channel::<TimerAfter>();
 88        let timer_thread = std::thread::Builder::new()
 89            .name("Timer".to_owned())
 90            .spawn(|| {
 91                let mut event_loop: EventLoop<()> =
 92                    EventLoop::try_new().expect("Failed to initialize timer loop!");
 93
 94                let handle = event_loop.handle();
 95                let timer_handle = event_loop.handle();
 96                handle
 97                    .insert_source(timer_channel, move |e, _, _| {
 98                        if let channel::Event::Msg(timer) = e {
 99                            // This has to be in an option to satisfy the borrow checker. The callback below should only be scheduled once.
100                            let mut runnable = Some(timer.runnable);
101                            timer_handle
102                                .insert_source(
103                                    calloop::timer::Timer::from_duration(timer.duration),
104                                    move |_, _, _| {
105                                        if let Some(runnable) = runnable.take() {
106                                            let start = Instant::now();
107                                            let mut timing = match runnable {
108                                                RunnableVariant::Meta(runnable) => {
109                                                    let location = runnable.metadata().location;
110                                                    let timing = TaskTiming {
111                                                        location,
112                                                        start,
113                                                        end: None,
114                                                    };
115                                                    Self::add_task_timing(timing);
116
117                                                    runnable.run();
118                                                    timing
119                                                }
120                                                RunnableVariant::Compat(runnable) => {
121                                                    let timing = TaskTiming {
122                                                        location: core::panic::Location::caller(),
123                                                        start,
124                                                        end: None,
125                                                    };
126                                                    Self::add_task_timing(timing);
127
128                                                    runnable.run();
129                                                    timing
130                                                }
131                                            };
132                                            let end = Instant::now();
133
134                                            timing.end = Some(end);
135                                            Self::add_task_timing(timing);
136                                        }
137                                        TimeoutAction::Drop
138                                    },
139                                )
140                                .expect("Failed to start timer");
141                        }
142                    })
143                    .expect("Failed to start timer thread");
144
145                event_loop.run(None, &mut (), |_| {}).log_err();
146            })
147            .unwrap();
148
149        background_threads.push(timer_thread);
150
151        Self {
152            main_sender,
153            timer_sender,
154            background_sender,
155            _background_threads: background_threads,
156            main_thread_id: thread::current().id(),
157        }
158    }
159
160    pub(crate) fn add_task_timing(timing: TaskTiming) {
161        THREAD_TIMINGS.with(|timings| {
162            let mut timings = timings.lock();
163            let timings = &mut timings.timings;
164
165            if let Some(last_timing) = timings.iter_mut().rev().next() {
166                if last_timing.location == timing.location {
167                    last_timing.end = timing.end;
168                    return;
169                }
170            }
171
172            timings.push_back(timing);
173        });
174    }
175}
176
177impl PlatformDispatcher for LinuxDispatcher {
178    fn get_all_timings(&self) -> Vec<crate::ThreadTaskTimings> {
179        let global_timings = GLOBAL_THREAD_TIMINGS.lock();
180        ThreadTaskTimings::convert(&global_timings)
181    }
182
183    fn get_current_thread_timings(&self) -> Vec<crate::TaskTiming> {
184        THREAD_TIMINGS.with(|timings| {
185            let timings = timings.lock();
186            let timings = &timings.timings;
187
188            let mut vec = Vec::with_capacity(timings.len());
189
190            let (s1, s2) = timings.as_slices();
191            vec.extend_from_slice(s1);
192            vec.extend_from_slice(s2);
193            vec
194        })
195    }
196
197    fn is_main_thread(&self) -> bool {
198        thread::current().id() == self.main_thread_id
199    }
200
201    fn dispatch(&self, runnable: RunnableVariant, _: Option<TaskLabel>) {
202        self.background_sender.send(runnable).unwrap();
203    }
204
205    fn dispatch_on_main_thread(&self, runnable: RunnableVariant) {
206        self.main_sender.send(runnable).unwrap_or_else(|runnable| {
207            // NOTE: Runnable may wrap a Future that is !Send.
208            //
209            // This is usually safe because we only poll it on the main thread.
210            // However if the send fails, we know that:
211            // 1. main_receiver has been dropped (which implies the app is shutting down)
212            // 2. we are on a background thread.
213            // It is not safe to drop something !Send on the wrong thread, and
214            // the app will exit soon anyway, so we must forget the runnable.
215            std::mem::forget(runnable);
216        });
217    }
218
219    fn dispatch_after(&self, duration: Duration, runnable: RunnableVariant) {
220        self.timer_sender
221            .send(TimerAfter { duration, runnable })
222            .ok();
223    }
224}