dispatcher.rs

  1#![allow(non_upper_case_globals)]
  2#![allow(non_camel_case_types)]
  3#![allow(non_snake_case)]
  4//todo!(linux): remove
  5#![allow(unused_variables)]
  6
  7use crate::{PlatformDispatcher, TaskLabel};
  8use async_task::Runnable;
  9use parking::{Parker, Unparker};
 10use parking_lot::Mutex;
 11use std::{
 12    panic,
 13    sync::Arc,
 14    thread,
 15    time::{Duration, Instant},
 16};
 17use xcb::x;
 18
 19pub(crate) struct LinuxDispatcher {
 20    xcb_connection: Arc<xcb::Connection>,
 21    x_listener_window: x::Window,
 22    parker: Mutex<Parker>,
 23    timed_tasks: Mutex<Vec<(Instant, Runnable)>>,
 24    main_sender: flume::Sender<Runnable>,
 25    background_sender: flume::Sender<Runnable>,
 26    _background_thread: thread::JoinHandle<()>,
 27    main_thread_id: thread::ThreadId,
 28}
 29
 30impl LinuxDispatcher {
 31    pub fn new(
 32        main_sender: flume::Sender<Runnable>,
 33        xcb_connection: &Arc<xcb::Connection>,
 34        x_root_index: i32,
 35    ) -> Self {
 36        let x_listener_window = xcb_connection.generate_id();
 37        let screen = xcb_connection
 38            .get_setup()
 39            .roots()
 40            .nth(x_root_index as usize)
 41            .unwrap();
 42        xcb_connection.send_request(&x::CreateWindow {
 43            depth: 0,
 44            wid: x_listener_window,
 45            parent: screen.root(),
 46            x: 0,
 47            y: 0,
 48            width: 1,
 49            height: 1,
 50            border_width: 0,
 51            class: x::WindowClass::InputOnly,
 52            visual: screen.root_visual(),
 53            value_list: &[],
 54        });
 55
 56        let (background_sender, background_receiver) = flume::unbounded::<Runnable>();
 57        let background_thread = thread::spawn(move || {
 58            for runnable in background_receiver {
 59                let _ignore_panic = panic::catch_unwind(|| runnable.run());
 60            }
 61        });
 62        LinuxDispatcher {
 63            xcb_connection: Arc::clone(xcb_connection),
 64            x_listener_window,
 65            parker: Mutex::new(Parker::new()),
 66            timed_tasks: Mutex::new(Vec::new()),
 67            main_sender,
 68            background_sender,
 69            _background_thread: background_thread,
 70            main_thread_id: thread::current().id(),
 71        }
 72    }
 73}
 74
 75impl Drop for LinuxDispatcher {
 76    fn drop(&mut self) {
 77        self.xcb_connection.send_request(&x::DestroyWindow {
 78            window: self.x_listener_window,
 79        });
 80    }
 81}
 82
 83impl PlatformDispatcher for LinuxDispatcher {
 84    fn is_main_thread(&self) -> bool {
 85        thread::current().id() == self.main_thread_id
 86    }
 87
 88    fn dispatch(&self, runnable: Runnable, _: Option<TaskLabel>) {
 89        self.background_sender.send(runnable).unwrap();
 90    }
 91
 92    fn dispatch_on_main_thread(&self, runnable: Runnable) {
 93        self.main_sender.send(runnable).unwrap();
 94        // Send a message to the invisible window, forcing
 95        // the main loop to wake up and dispatch the runnable.
 96        self.xcb_connection.send_request(&x::SendEvent {
 97            propagate: false,
 98            destination: x::SendEventDest::Window(self.x_listener_window),
 99            event_mask: x::EventMask::NO_EVENT,
100            event: &x::VisibilityNotifyEvent::new(
101                self.x_listener_window,
102                x::Visibility::Unobscured,
103            ),
104        });
105        self.xcb_connection.flush().unwrap();
106    }
107
108    fn dispatch_after(&self, duration: Duration, runnable: Runnable) {
109        let moment = Instant::now() + duration;
110        let mut timed_tasks = self.timed_tasks.lock();
111        timed_tasks.push((moment, runnable));
112        timed_tasks.sort_unstable_by(|(a, _), (b, _)| b.cmp(a));
113    }
114
115    fn tick(&self, background_only: bool) -> bool {
116        let mut timed_tasks = self.timed_tasks.lock();
117        let old_count = timed_tasks.len();
118        while let Some(&(moment, _)) = timed_tasks.last() {
119            if moment <= Instant::now() {
120                let (_, runnable) = timed_tasks.pop().unwrap();
121                runnable.run();
122            } else {
123                break;
124            }
125        }
126        timed_tasks.len() != old_count
127    }
128
129    fn park(&self) {
130        self.parker.lock().park()
131    }
132
133    fn unparker(&self) -> Unparker {
134        self.parker.lock().unparker()
135    }
136}