dispatcher.rs

  1use crate::{PlatformDispatcher, TaskLabel};
  2use async_task::Runnable;
  3use calloop::{
  4    channel::{self, Sender},
  5    timer::TimeoutAction,
  6    EventLoop,
  7};
  8use parking::{Parker, Unparker};
  9use parking_lot::Mutex;
 10use std::{thread, time::Duration};
 11use util::ResultExt;
 12
 13struct TimerAfter {
 14    duration: Duration,
 15    runnable: Runnable,
 16}
 17
 18pub(crate) struct LinuxDispatcher {
 19    parker: Mutex<Parker>,
 20    main_sender: Sender<Runnable>,
 21    timer_sender: Sender<TimerAfter>,
 22    background_sender: flume::Sender<Runnable>,
 23    _background_threads: Vec<thread::JoinHandle<()>>,
 24    main_thread_id: thread::ThreadId,
 25}
 26
 27impl LinuxDispatcher {
 28    pub fn new(main_sender: Sender<Runnable>) -> Self {
 29        let (background_sender, background_receiver) = flume::unbounded::<Runnable>();
 30        let thread_count = std::thread::available_parallelism()
 31            .map(|i| i.get())
 32            .unwrap_or(1);
 33
 34        let mut background_threads = (0..thread_count)
 35            .map(|_| {
 36                let receiver = background_receiver.clone();
 37                std::thread::spawn(move || {
 38                    for runnable in receiver {
 39                        runnable.run();
 40                    }
 41                })
 42            })
 43            .collect::<Vec<_>>();
 44
 45        let (timer_sender, timer_channel) = calloop::channel::channel::<TimerAfter>();
 46        let timer_thread = std::thread::spawn(|| {
 47            let mut event_loop: EventLoop<()> =
 48                EventLoop::try_new().expect("Failed to initialize timer loop!");
 49
 50            let handle = event_loop.handle();
 51            let timer_handle = event_loop.handle();
 52            handle
 53                .insert_source(timer_channel, move |e, _, _| {
 54                    if let channel::Event::Msg(timer) = e {
 55                        // This has to be in an option to satisfy the borrow checker. The callback below should only be scheduled once.
 56                        let mut runnable = Some(timer.runnable);
 57                        timer_handle
 58                            .insert_source(
 59                                calloop::timer::Timer::from_duration(timer.duration),
 60                                move |_, _, _| {
 61                                    if let Some(runnable) = runnable.take() {
 62                                        runnable.run();
 63                                    }
 64                                    TimeoutAction::Drop
 65                                },
 66                            )
 67                            .expect("Failed to start timer");
 68                    }
 69                })
 70                .expect("Failed to start timer thread");
 71
 72            event_loop.run(None, &mut (), |_| {}).log_err();
 73        });
 74
 75        background_threads.push(timer_thread);
 76
 77        Self {
 78            parker: Mutex::new(Parker::new()),
 79            main_sender,
 80            timer_sender,
 81            background_sender,
 82            _background_threads: background_threads,
 83            main_thread_id: thread::current().id(),
 84        }
 85    }
 86}
 87
 88impl PlatformDispatcher for LinuxDispatcher {
 89    fn is_main_thread(&self) -> bool {
 90        thread::current().id() == self.main_thread_id
 91    }
 92
 93    fn dispatch(&self, runnable: Runnable, _: Option<TaskLabel>) {
 94        self.background_sender.send(runnable).unwrap();
 95    }
 96
 97    fn dispatch_on_main_thread(&self, runnable: Runnable) {
 98        self.main_sender.send(runnable).ok();
 99    }
100
101    fn dispatch_after(&self, duration: Duration, runnable: Runnable) {
102        self.timer_sender
103            .send(TimerAfter { duration, runnable })
104            .ok();
105    }
106
107    fn park(&self, timeout: Option<Duration>) -> bool {
108        if let Some(timeout) = timeout {
109            self.parker.lock().park_timeout(timeout)
110        } else {
111            self.parker.lock().park();
112            true
113        }
114    }
115
116    fn unparker(&self) -> Unparker {
117        self.parker.lock().unparker()
118    }
119}