dispatcher.rs

  1use std::{
  2    thread::{current, ThreadId},
  3    time::Duration,
  4};
  5
  6use anyhow::Context;
  7use async_task::Runnable;
  8use flume::Sender;
  9use parking::Parker;
 10use parking_lot::Mutex;
 11use util::ResultExt;
 12use windows::{
 13    Foundation::TimeSpan,
 14    System::Threading::{
 15        ThreadPool, ThreadPoolTimer, TimerElapsedHandler, WorkItemHandler, WorkItemOptions,
 16        WorkItemPriority,
 17    },
 18    Win32::{Foundation::HANDLE, System::Threading::SetEvent},
 19};
 20
 21use crate::{PlatformDispatcher, SafeHandle, TaskLabel};
 22
 23pub(crate) struct WindowsDispatcher {
 24    main_sender: Sender<Runnable>,
 25    dispatch_event: SafeHandle,
 26    parker: Mutex<Parker>,
 27    main_thread_id: ThreadId,
 28}
 29
 30impl WindowsDispatcher {
 31    pub(crate) fn new(main_sender: Sender<Runnable>, dispatch_event: HANDLE) -> Self {
 32        let dispatch_event = dispatch_event.into();
 33        let parker = Mutex::new(Parker::new());
 34        let main_thread_id = current().id();
 35
 36        WindowsDispatcher {
 37            main_sender,
 38            dispatch_event,
 39            parker,
 40            main_thread_id,
 41        }
 42    }
 43
 44    fn dispatch_on_threadpool(&self, runnable: Runnable) {
 45        let handler = {
 46            let mut task_wrapper = Some(runnable);
 47            WorkItemHandler::new(move |_| {
 48                task_wrapper.take().unwrap().run();
 49                Ok(())
 50            })
 51        };
 52        ThreadPool::RunWithPriorityAndOptionsAsync(
 53            &handler,
 54            WorkItemPriority::High,
 55            WorkItemOptions::TimeSliced,
 56        )
 57        .log_err();
 58    }
 59
 60    fn dispatch_on_threadpool_after(&self, runnable: Runnable, duration: Duration) {
 61        let handler = {
 62            let mut task_wrapper = Some(runnable);
 63            TimerElapsedHandler::new(move |_| {
 64                task_wrapper.take().unwrap().run();
 65                Ok(())
 66            })
 67        };
 68        let delay = TimeSpan {
 69            // A time period expressed in 100-nanosecond units.
 70            // 10,000,000 ticks per second
 71            Duration: (duration.as_nanos() / 100) as i64,
 72        };
 73        ThreadPoolTimer::CreateTimer(&handler, delay).log_err();
 74    }
 75}
 76
 77impl PlatformDispatcher for WindowsDispatcher {
 78    fn is_main_thread(&self) -> bool {
 79        current().id() == self.main_thread_id
 80    }
 81
 82    fn dispatch(&self, runnable: Runnable, label: Option<TaskLabel>) {
 83        self.dispatch_on_threadpool(runnable);
 84        if let Some(label) = label {
 85            log::debug!("TaskLabel: {label:?}");
 86        }
 87    }
 88
 89    fn dispatch_on_main_thread(&self, runnable: Runnable) {
 90        self.main_sender
 91            .send(runnable)
 92            .context("Dispatch on main thread failed")
 93            .log_err();
 94        unsafe { SetEvent(*self.dispatch_event).log_err() };
 95    }
 96
 97    fn dispatch_after(&self, duration: Duration, runnable: Runnable) {
 98        self.dispatch_on_threadpool_after(runnable, duration);
 99    }
100
101    fn park(&self, timeout: Option<Duration>) -> bool {
102        if let Some(timeout) = timeout {
103            self.parker.lock().park_timeout(timeout)
104        } else {
105            self.parker.lock().park();
106            true
107        }
108    }
109
110    fn unparker(&self) -> parking::Unparker {
111        self.parker.lock().unparker()
112    }
113}