dispatcher.rs

  1use crate::{PlatformDispatcher, TaskLabel};
  2use async_task::Runnable;
  3use calloop::{
  4    EventLoop,
  5    channel::{self, Sender},
  6    timer::TimeoutAction,
  7};
  8use parking::{Parker, Unparker};
  9use parking_lot::Mutex;
 10use std::{
 11    thread,
 12    time::{Duration, Instant},
 13};
 14use util::ResultExt;
 15
 16struct TimerAfter {
 17    duration: Duration,
 18    runnable: Runnable,
 19}
 20
 21pub(crate) struct LinuxDispatcher {
 22    parker: Mutex<Parker>,
 23    main_sender: Sender<Runnable>,
 24    timer_sender: Sender<TimerAfter>,
 25    background_sender: flume::Sender<Runnable>,
 26    _background_threads: Vec<thread::JoinHandle<()>>,
 27    main_thread_id: thread::ThreadId,
 28}
 29
 30impl LinuxDispatcher {
 31    pub fn new(main_sender: Sender<Runnable>) -> Self {
 32        let (background_sender, background_receiver) = flume::unbounded::<Runnable>();
 33        let thread_count = std::thread::available_parallelism()
 34            .map(|i| i.get())
 35            .unwrap_or(1);
 36
 37        let mut background_threads = (0..thread_count)
 38            .map(|i| {
 39                let receiver = background_receiver.clone();
 40                std::thread::spawn(move || {
 41                    for runnable in receiver {
 42                        let start = Instant::now();
 43
 44                        runnable.run();
 45
 46                        log::trace!(
 47                            "background thread {}: ran runnable. took: {:?}",
 48                            i,
 49                            start.elapsed()
 50                        );
 51                    }
 52                })
 53            })
 54            .collect::<Vec<_>>();
 55
 56        let (timer_sender, timer_channel) = calloop::channel::channel::<TimerAfter>();
 57        let timer_thread = std::thread::spawn(|| {
 58            let mut event_loop: EventLoop<()> =
 59                EventLoop::try_new().expect("Failed to initialize timer loop!");
 60
 61            let handle = event_loop.handle();
 62            let timer_handle = event_loop.handle();
 63            handle
 64                .insert_source(timer_channel, move |e, _, _| {
 65                    if let channel::Event::Msg(timer) = e {
 66                        // This has to be in an option to satisfy the borrow checker. The callback below should only be scheduled once.
 67                        let mut runnable = Some(timer.runnable);
 68                        timer_handle
 69                            .insert_source(
 70                                calloop::timer::Timer::from_duration(timer.duration),
 71                                move |_, _, _| {
 72                                    if let Some(runnable) = runnable.take() {
 73                                        runnable.run();
 74                                    }
 75                                    TimeoutAction::Drop
 76                                },
 77                            )
 78                            .expect("Failed to start timer");
 79                    }
 80                })
 81                .expect("Failed to start timer thread");
 82
 83            event_loop.run(None, &mut (), |_| {}).log_err();
 84        });
 85
 86        background_threads.push(timer_thread);
 87
 88        Self {
 89            parker: Mutex::new(Parker::new()),
 90            main_sender,
 91            timer_sender,
 92            background_sender,
 93            _background_threads: background_threads,
 94            main_thread_id: thread::current().id(),
 95        }
 96    }
 97}
 98
 99impl PlatformDispatcher for LinuxDispatcher {
100    fn is_main_thread(&self) -> bool {
101        thread::current().id() == self.main_thread_id
102    }
103
104    fn dispatch(&self, runnable: Runnable, _: Option<TaskLabel>) {
105        self.background_sender.send(runnable).unwrap();
106    }
107
108    fn dispatch_on_main_thread(&self, runnable: Runnable) {
109        self.main_sender.send(runnable).ok();
110    }
111
112    fn dispatch_after(&self, duration: Duration, runnable: Runnable) {
113        self.timer_sender
114            .send(TimerAfter { duration, runnable })
115            .ok();
116    }
117
118    fn park(&self, timeout: Option<Duration>) -> bool {
119        if let Some(timeout) = timeout {
120            self.parker.lock().park_timeout(timeout)
121        } else {
122            self.parker.lock().park();
123            true
124        }
125    }
126
127    fn unparker(&self) -> Unparker {
128        self.parker.lock().unparker()
129    }
130}