dispatcher.rs

  1#![allow(non_upper_case_globals)]
  2#![allow(non_camel_case_types)]
  3#![allow(non_snake_case)]
  4
  5use crate::{PlatformDispatcher, TaskLabel};
  6use async_task::Runnable;
  7use parking::{Parker, Unparker};
  8use parking_lot::Mutex;
  9use std::{
 10    panic,
 11    sync::Arc,
 12    thread,
 13    time::{Duration, Instant},
 14};
 15use xcb::x;
 16
 17pub(crate) struct LinuxDispatcher {
 18    xcb_connection: Arc<xcb::Connection>,
 19    x_listener_window: x::Window,
 20    parker: Mutex<Parker>,
 21    timed_tasks: Mutex<Vec<(Instant, Runnable)>>,
 22    main_sender: flume::Sender<Runnable>,
 23    background_sender: flume::Sender<Runnable>,
 24    _background_thread: thread::JoinHandle<()>,
 25    main_thread_id: thread::ThreadId,
 26}
 27
 28impl LinuxDispatcher {
 29    pub fn new(
 30        main_sender: flume::Sender<Runnable>,
 31        xcb_connection: &Arc<xcb::Connection>,
 32        x_root_index: i32,
 33    ) -> Self {
 34        let x_listener_window = xcb_connection.generate_id();
 35        let screen = xcb_connection
 36            .get_setup()
 37            .roots()
 38            .nth(x_root_index as usize)
 39            .unwrap();
 40        xcb_connection.send_request(&x::CreateWindow {
 41            depth: 0,
 42            wid: x_listener_window,
 43            parent: screen.root(),
 44            x: 0,
 45            y: 0,
 46            width: 1,
 47            height: 1,
 48            border_width: 0,
 49            class: x::WindowClass::InputOnly,
 50            visual: screen.root_visual(),
 51            value_list: &[],
 52        });
 53
 54        let (background_sender, background_receiver) = flume::unbounded::<Runnable>();
 55        let background_thread = thread::spawn(move || {
 56            for runnable in background_receiver {
 57                let _ignore_panic = panic::catch_unwind(|| runnable.run());
 58            }
 59        });
 60        LinuxDispatcher {
 61            xcb_connection: Arc::clone(xcb_connection),
 62            x_listener_window,
 63            parker: Mutex::new(Parker::new()),
 64            timed_tasks: Mutex::new(Vec::new()),
 65            main_sender,
 66            background_sender,
 67            _background_thread: background_thread,
 68            main_thread_id: thread::current().id(),
 69        }
 70    }
 71}
 72
 73impl Drop for LinuxDispatcher {
 74    fn drop(&mut self) {
 75        self.xcb_connection.send_request(&x::DestroyWindow {
 76            window: self.x_listener_window,
 77        });
 78    }
 79}
 80
 81impl PlatformDispatcher for LinuxDispatcher {
 82    fn is_main_thread(&self) -> bool {
 83        thread::current().id() == self.main_thread_id
 84    }
 85
 86    fn dispatch(&self, runnable: Runnable, _: Option<TaskLabel>) {
 87        self.background_sender.send(runnable).unwrap();
 88    }
 89
 90    fn dispatch_on_main_thread(&self, runnable: Runnable) {
 91        self.main_sender.send(runnable).unwrap();
 92        // Send a message to the invisible window, forcing
 93        // the main loop to wake up and dispatch the runnable.
 94        self.xcb_connection.send_request(&x::SendEvent {
 95            propagate: false,
 96            destination: x::SendEventDest::Window(self.x_listener_window),
 97            event_mask: x::EventMask::NO_EVENT,
 98            event: &x::VisibilityNotifyEvent::new(
 99                self.x_listener_window,
100                x::Visibility::Unobscured,
101            ),
102        });
103        self.xcb_connection.flush().unwrap();
104    }
105
106    fn dispatch_after(&self, duration: Duration, runnable: Runnable) {
107        let moment = Instant::now() + duration;
108        let mut timed_tasks = self.timed_tasks.lock();
109        timed_tasks.push((moment, runnable));
110        timed_tasks.sort_unstable_by(|&(ref a, _), &(ref b, _)| b.cmp(a));
111    }
112
113    fn tick(&self, background_only: bool) -> bool {
114        let mut timed_tasks = self.timed_tasks.lock();
115        let old_count = timed_tasks.len();
116        while let Some(&(moment, _)) = timed_tasks.last() {
117            if moment <= Instant::now() {
118                let (_, runnable) = timed_tasks.pop().unwrap();
119                runnable.run();
120            } else {
121                break;
122            }
123        }
124        timed_tasks.len() != old_count
125    }
126
127    fn park(&self) {
128        self.parker.lock().park()
129    }
130
131    fn unparker(&self) -> Unparker {
132        self.parker.lock().unparker()
133    }
134}