dispatcher.rs

  1#![allow(non_upper_case_globals)]
  2#![allow(non_camel_case_types)]
  3#![allow(non_snake_case)]
  4// todo(linux): remove
  5#![allow(unused_variables)]
  6
  7use crate::{PlatformDispatcher, TaskLabel};
  8use async_task::Runnable;
  9use calloop::{
 10    channel::{self, Sender},
 11    timer::TimeoutAction,
 12    EventLoop,
 13};
 14use parking::{Parker, Unparker};
 15use parking_lot::Mutex;
 16use std::{thread, time::Duration};
 17use util::ResultExt;
 18
 19struct TimerAfter {
 20    duration: Duration,
 21    runnable: Runnable,
 22}
 23
 24pub(crate) struct LinuxDispatcher {
 25    parker: Mutex<Parker>,
 26    main_sender: Sender<Runnable>,
 27    timer_sender: Sender<TimerAfter>,
 28    background_sender: flume::Sender<Runnable>,
 29    _background_threads: Vec<thread::JoinHandle<()>>,
 30    main_thread_id: thread::ThreadId,
 31}
 32
 33impl LinuxDispatcher {
 34    pub fn new(main_sender: Sender<Runnable>) -> Self {
 35        let (background_sender, background_receiver) = flume::unbounded::<Runnable>();
 36        let thread_count = std::thread::available_parallelism()
 37            .map(|i| i.get())
 38            .unwrap_or(1);
 39
 40        let mut background_threads = (0..thread_count)
 41            .map(|_| {
 42                let receiver = background_receiver.clone();
 43                std::thread::spawn(move || {
 44                    for runnable in receiver {
 45                        runnable.run();
 46                    }
 47                })
 48            })
 49            .collect::<Vec<_>>();
 50
 51        let (timer_sender, timer_channel) = calloop::channel::channel::<TimerAfter>();
 52        let timer_thread = std::thread::spawn(|| {
 53            let mut event_loop: EventLoop<()> =
 54                EventLoop::try_new().expect("Failed to initialize timer loop!");
 55
 56            let handle = event_loop.handle();
 57            let timer_handle = event_loop.handle();
 58            handle
 59                .insert_source(timer_channel, move |e, _, _| {
 60                    if let channel::Event::Msg(timer) = e {
 61                        // This has to be in an option to satisfy the borrow checker. The callback below should only be scheduled once.
 62                        let mut runnable = Some(timer.runnable);
 63                        timer_handle
 64                            .insert_source(
 65                                calloop::timer::Timer::from_duration(timer.duration),
 66                                move |e, _, _| {
 67                                    if let Some(runnable) = runnable.take() {
 68                                        runnable.run();
 69                                    }
 70                                    TimeoutAction::Drop
 71                                },
 72                            )
 73                            .expect("Failed to start timer");
 74                    }
 75                })
 76                .expect("Failed to start timer thread");
 77
 78            event_loop.run(None, &mut (), |_| {}).log_err();
 79        });
 80
 81        background_threads.push(timer_thread);
 82
 83        Self {
 84            parker: Mutex::new(Parker::new()),
 85            main_sender,
 86            timer_sender,
 87            background_sender,
 88            _background_threads: background_threads,
 89            main_thread_id: thread::current().id(),
 90        }
 91    }
 92}
 93
 94impl PlatformDispatcher for LinuxDispatcher {
 95    fn is_main_thread(&self) -> bool {
 96        thread::current().id() == self.main_thread_id
 97    }
 98
 99    fn dispatch(&self, runnable: Runnable, _: Option<TaskLabel>) {
100        self.background_sender.send(runnable).unwrap();
101    }
102
103    fn dispatch_on_main_thread(&self, runnable: Runnable) {
104        self.main_sender.send(runnable).ok();
105    }
106
107    fn dispatch_after(&self, duration: Duration, runnable: Runnable) {
108        self.timer_sender
109            .send(TimerAfter { duration, runnable })
110            .ok();
111    }
112
113    fn park(&self, timeout: Option<Duration>) -> bool {
114        if let Some(timeout) = timeout {
115            self.parker.lock().park_timeout(timeout)
116        } else {
117            self.parker.lock().park();
118            true
119        }
120    }
121
122    fn unparker(&self) -> Unparker {
123        self.parker.lock().unparker()
124    }
125}