1use calloop::{
2 EventLoop, PostAction,
3 channel::{self, Sender},
4 timer::TimeoutAction,
5};
6use util::ResultExt;
7
8use std::{
9 mem::MaybeUninit,
10 thread,
11 time::{Duration, Instant},
12};
13
14use gpui::{
15 GLOBAL_THREAD_TIMINGS, PlatformDispatcher, Priority, PriorityQueueReceiver,
16 PriorityQueueSender, RunnableVariant, THREAD_TIMINGS, TaskTiming, ThreadTaskTimings, profiler,
17};
18
19struct TimerAfter {
20 duration: Duration,
21 runnable: RunnableVariant,
22}
23
24pub(crate) struct LinuxDispatcher {
25 main_sender: PriorityQueueCalloopSender<RunnableVariant>,
26 timer_sender: Sender<TimerAfter>,
27 background_sender: PriorityQueueSender<RunnableVariant>,
28 _background_threads: Vec<thread::JoinHandle<()>>,
29 main_thread_id: thread::ThreadId,
30}
31
/// Lower bound on the number of background worker threads. Also used as the
/// worker count when the available parallelism cannot be queried.
const MIN_THREADS: usize = 2;
33
34impl LinuxDispatcher {
35 pub fn new(main_sender: PriorityQueueCalloopSender<RunnableVariant>) -> Self {
36 let (background_sender, background_receiver) = PriorityQueueReceiver::new();
37 let thread_count =
38 std::thread::available_parallelism().map_or(MIN_THREADS, |i| i.get().max(MIN_THREADS));
39
40 let mut background_threads = (0..thread_count)
41 .map(|i| {
42 let receiver: PriorityQueueReceiver<RunnableVariant> = background_receiver.clone();
43 std::thread::Builder::new()
44 .name(format!("Worker-{i}"))
45 .spawn(move || {
46 for runnable in receiver.iter() {
47 let start = Instant::now();
48
49 let location = runnable.metadata().location;
50 let mut timing = TaskTiming {
51 location,
52 start,
53 end: None,
54 };
55 profiler::add_task_timing(timing);
56
57 runnable.run();
58
59 let end = Instant::now();
60 timing.end = Some(end);
61 profiler::add_task_timing(timing);
62
63 log::trace!(
64 "background thread {}: ran runnable. took: {:?}",
65 i,
66 start.elapsed()
67 );
68 }
69 })
70 .unwrap()
71 })
72 .collect::<Vec<_>>();
73
74 let (timer_sender, timer_channel) = calloop::channel::channel::<TimerAfter>();
75 let timer_thread = std::thread::Builder::new()
76 .name("Timer".to_owned())
77 .spawn(move || {
78 let mut event_loop: EventLoop<()> =
79 EventLoop::try_new().expect("Failed to initialize timer loop!");
80
81 let handle = event_loop.handle();
82 let timer_handle = event_loop.handle();
83 handle
84 .insert_source(timer_channel, move |e, _, _| {
85 if let channel::Event::Msg(timer) = e {
86 let mut runnable = Some(timer.runnable);
87 timer_handle
88 .insert_source(
89 calloop::timer::Timer::from_duration(timer.duration),
90 move |_, _, _| {
91 if let Some(runnable) = runnable.take() {
92 let start = Instant::now();
93 let location = runnable.metadata().location;
94 let mut timing = TaskTiming {
95 location,
96 start,
97 end: None,
98 };
99 profiler::add_task_timing(timing);
100
101 runnable.run();
102 let end = Instant::now();
103
104 timing.end = Some(end);
105 profiler::add_task_timing(timing);
106 }
107 TimeoutAction::Drop
108 },
109 )
110 .expect("Failed to start timer");
111 }
112 })
113 .expect("Failed to start timer thread");
114
115 event_loop.run(None, &mut (), |_| {}).log_err();
116 })
117 .unwrap();
118
119 background_threads.push(timer_thread);
120
121 Self {
122 main_sender,
123 timer_sender,
124 background_sender,
125 _background_threads: background_threads,
126 main_thread_id: thread::current().id(),
127 }
128 }
129}
130
131impl PlatformDispatcher for LinuxDispatcher {
132 fn get_all_timings(&self) -> Vec<gpui::ThreadTaskTimings> {
133 let global_timings = GLOBAL_THREAD_TIMINGS.lock();
134 ThreadTaskTimings::convert(&global_timings)
135 }
136
137 fn get_current_thread_timings(&self) -> gpui::ThreadTaskTimings {
138 THREAD_TIMINGS.with(|timings| {
139 let timings = timings.lock();
140 let thread_name = timings.thread_name.clone();
141 let total_pushed = timings.total_pushed;
142 let timings = &timings.timings;
143
144 let mut vec = Vec::with_capacity(timings.len());
145
146 let (s1, s2) = timings.as_slices();
147 vec.extend_from_slice(s1);
148 vec.extend_from_slice(s2);
149
150 gpui::ThreadTaskTimings {
151 thread_name,
152 thread_id: std::thread::current().id(),
153 timings: vec,
154 total_pushed,
155 }
156 })
157 }
158
159 fn is_main_thread(&self) -> bool {
160 thread::current().id() == self.main_thread_id
161 }
162
163 fn dispatch(&self, runnable: RunnableVariant, priority: Priority) {
164 self.background_sender
165 .send(priority, runnable)
166 .unwrap_or_else(|_| panic!("blocking sender returned without value"));
167 }
168
169 fn dispatch_on_main_thread(&self, runnable: RunnableVariant, priority: Priority) {
170 self.main_sender
171 .send(priority, runnable)
172 .unwrap_or_else(|runnable| {
173 // NOTE: Runnable may wrap a Future that is !Send.
174 //
175 // This is usually safe because we only poll it on the main thread.
176 // However if the send fails, we know that:
177 // 1. main_receiver has been dropped (which implies the app is shutting down)
178 // 2. we are on a background thread.
179 // It is not safe to drop something !Send on the wrong thread, and
180 // the app will exit soon anyway, so we must forget the runnable.
181 std::mem::forget(runnable);
182 });
183 }
184
185 fn dispatch_after(&self, duration: Duration, runnable: RunnableVariant) {
186 self.timer_sender
187 .send(TimerAfter { duration, runnable })
188 .ok();
189 }
190
191 fn spawn_realtime(&self, f: Box<dyn FnOnce() + Send>) {
192 std::thread::spawn(move || {
193 // SAFETY: always safe to call
194 let thread_id = unsafe { libc::pthread_self() };
195
196 let policy = libc::SCHED_FIFO;
197 let sched_priority = 65;
198
199 // SAFETY: all sched_param members are valid when initialized to zero.
200 let mut sched_param =
201 unsafe { MaybeUninit::<libc::sched_param>::zeroed().assume_init() };
202 sched_param.sched_priority = sched_priority;
203 // SAFETY: sched_param is a valid initialized structure
204 let result = unsafe { libc::pthread_setschedparam(thread_id, policy, &sched_param) };
205 if result != 0 {
206 log::warn!("failed to set realtime thread priority");
207 }
208
209 f();
210 });
211 }
212}
213
214pub struct PriorityQueueCalloopSender<T> {
215 sender: PriorityQueueSender<T>,
216 ping: calloop::ping::Ping,
217}
218
219impl<T> PriorityQueueCalloopSender<T> {
220 fn new(tx: PriorityQueueSender<T>, ping: calloop::ping::Ping) -> Self {
221 Self { sender: tx, ping }
222 }
223
224 fn send(&self, priority: Priority, item: T) -> Result<(), gpui::queue::SendError<T>> {
225 let res = self.sender.send(priority, item);
226 if res.is_ok() {
227 self.ping.ping();
228 }
229 res
230 }
231}
232
233impl<T> Drop for PriorityQueueCalloopSender<T> {
234 fn drop(&mut self) {
235 self.ping.ping();
236 }
237}
238
239pub struct PriorityQueueCalloopReceiver<T> {
240 receiver: PriorityQueueReceiver<T>,
241 source: calloop::ping::PingSource,
242 ping: calloop::ping::Ping,
243}
244
245impl<T> PriorityQueueCalloopReceiver<T> {
246 pub fn new() -> (PriorityQueueCalloopSender<T>, Self) {
247 let (ping, source) = calloop::ping::make_ping().expect("Failed to create a Ping.");
248
249 let (tx, rx) = PriorityQueueReceiver::new();
250
251 (
252 PriorityQueueCalloopSender::new(tx, ping.clone()),
253 Self {
254 receiver: rx,
255 source,
256 ping,
257 },
258 )
259 }
260}
261
262use calloop::channel::Event;
263
264#[derive(Debug)]
265pub struct ChannelError(calloop::ping::PingError);
266
267impl std::fmt::Display for ChannelError {
268 #[cfg_attr(feature = "nightly_coverage", coverage(off))]
269 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
270 std::fmt::Display::fmt(&self.0, f)
271 }
272}
273
274impl std::error::Error for ChannelError {
275 #[cfg_attr(feature = "nightly_coverage", coverage(off))]
276 fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
277 Some(&self.0)
278 }
279}
280
281impl<T> calloop::EventSource for PriorityQueueCalloopReceiver<T> {
282 type Event = Event<T>;
283 type Metadata = ();
284 type Ret = ();
285 type Error = ChannelError;
286
287 fn process_events<F>(
288 &mut self,
289 readiness: calloop::Readiness,
290 token: calloop::Token,
291 mut callback: F,
292 ) -> Result<calloop::PostAction, Self::Error>
293 where
294 F: FnMut(Self::Event, &mut Self::Metadata) -> Self::Ret,
295 {
296 let mut clear_readiness = false;
297 let mut disconnected = false;
298
299 let action = self
300 .source
301 .process_events(readiness, token, |(), &mut ()| {
302 let mut is_empty = true;
303
304 let receiver = self.receiver.clone();
305 for runnable in receiver.try_iter() {
306 match runnable {
307 Ok(r) => {
308 callback(Event::Msg(r), &mut ());
309 is_empty = false;
310 }
311 Err(_) => {
312 disconnected = true;
313 }
314 }
315 }
316
317 if disconnected {
318 callback(Event::Closed, &mut ());
319 }
320
321 if is_empty {
322 clear_readiness = true;
323 }
324 })
325 .map_err(ChannelError)?;
326
327 if disconnected {
328 Ok(PostAction::Remove)
329 } else if clear_readiness {
330 Ok(action)
331 } else {
332 // Re-notify the ping source so we can try again.
333 self.ping.ping();
334 Ok(PostAction::Continue)
335 }
336 }
337
338 fn register(
339 &mut self,
340 poll: &mut calloop::Poll,
341 token_factory: &mut calloop::TokenFactory,
342 ) -> calloop::Result<()> {
343 self.source.register(poll, token_factory)
344 }
345
346 fn reregister(
347 &mut self,
348 poll: &mut calloop::Poll,
349 token_factory: &mut calloop::TokenFactory,
350 ) -> calloop::Result<()> {
351 self.source.reregister(poll, token_factory)
352 }
353
354 fn unregister(&mut self, poll: &mut calloop::Poll) -> calloop::Result<()> {
355 self.source.unregister(poll)
356 }
357}
358
#[cfg(test)]
mod tests {
    use super::*;

    /// Drives a calloop event loop by hand and checks that the receiver reports
    /// nothing while idle, a message after a send, and closure after the sender
    /// is dropped.
    #[test]
    fn calloop_works() {
        struct Data {
            got_msg: bool,
            got_closed: bool,
        }

        let mut event_loop = calloop::EventLoop::try_new().unwrap();
        let handle = event_loop.handle();

        let (tx, rx) = PriorityQueueCalloopReceiver::new();

        let mut data = Data {
            got_msg: false,
            got_closed: false,
        };

        let _channel_token = handle
            .insert_source(rx, move |evt, &mut (), data: &mut Data| match evt {
                Event::Msg(()) => data.got_msg = true,
                Event::Closed => data.got_closed = true,
            })
            .unwrap();

        // Poll without blocking on every dispatch.
        let no_wait = Some(::std::time::Duration::ZERO);

        // Nothing has been sent, so nothing is received.
        event_loop.dispatch(no_wait, &mut data).unwrap();

        assert!(!data.got_msg);
        assert!(!data.got_closed);

        // A message is sent.
        tx.send(Priority::Medium, ()).unwrap();
        event_loop.dispatch(no_wait, &mut data).unwrap();

        assert!(data.got_msg);
        assert!(!data.got_closed);

        // The sender is dropped.
        drop(tx);
        event_loop.dispatch(no_wait, &mut data).unwrap();

        assert!(data.got_msg);
        assert!(data.got_closed);
    }
}
419
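// NOTE(review): the log below looks like pasted output from an earlier failing run of
// the test above (under the name `tomato`); the line numbers it cites do not match this
// file. Confirm whether it still needs to be kept.
//
// running 1 test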
421// test linux::dispatcher::tests::tomato ... FAILED
422
423// failures:
424
425// ---- linux::dispatcher::tests::tomato stdout ----
426// [crates/gpui/src/platform/linux/dispatcher.rs:262:9]
427// returning 1 tasks to process
428// [crates/gpui/src/platform/linux/dispatcher.rs:480:75] evt = Msg(
429// (),
430// )
431// returning 0 tasks to process
432
433// thread 'linux::dispatcher::tests::tomato' (478301) panicked at crates/gpui/src/platform/linux/dispatcher.rs:515:9:
434// assertion failed: data.got_closed
435// note: run with `RUST_BACKTRACE=1` environment variable to display a backtrace