1#![cfg(target_os = "macos")]
2
3use bitflags::bitflags;
4use fsevent_sys::{self as fs, core_foundation as cf};
5use parking_lot::Mutex;
6use std::{
7 convert::AsRef,
8 ffi::{CStr, OsStr, c_void},
9 os::unix::ffi::OsStrExt,
10 path::{Path, PathBuf},
11 ptr, slice,
12 sync::Arc,
13 time::Duration,
14};
15
16#[derive(Clone, Debug)]
17pub struct Event {
18 pub event_id: u64,
19 pub flags: StreamFlags,
20 pub path: PathBuf,
21}
22
23pub struct EventStream {
24 lifecycle: Arc<Mutex<Lifecycle>>,
25 state: Box<State>,
26}
27
28struct State {
29 latency: Duration,
30 paths: cf::CFMutableArrayRef,
31 callback: Option<Box<dyn FnMut(Vec<Event>) -> bool>>,
32 last_valid_event_id: Option<fs::FSEventStreamEventId>,
33 stream: fs::FSEventStreamRef,
34}
35
36impl Drop for State {
37 fn drop(&mut self) {
38 unsafe {
39 cf::CFRelease(self.paths);
40 fs::FSEventStreamStop(self.stream);
41 fs::FSEventStreamInvalidate(self.stream);
42 fs::FSEventStreamRelease(self.stream);
43 }
44 }
45}
46
47enum Lifecycle {
48 New,
49 Running(cf::CFRunLoopRef),
50 Stopped,
51}
52
53pub struct Handle(Arc<Mutex<Lifecycle>>);
54
55unsafe impl Send for EventStream {}
56unsafe impl Send for Lifecycle {}
57
58impl EventStream {
59 pub fn new(paths: &[&Path], latency: Duration) -> (Self, Handle) {
60 unsafe {
61 let cf_paths =
62 cf::CFArrayCreateMutable(cf::kCFAllocatorDefault, 0, &cf::kCFTypeArrayCallBacks);
63 assert!(!cf_paths.is_null());
64
65 for path in paths {
66 let path_bytes = path.as_os_str().as_bytes();
67 let cf_url = cf::CFURLCreateFromFileSystemRepresentation(
68 cf::kCFAllocatorDefault,
69 path_bytes.as_ptr() as *const i8,
70 path_bytes.len() as cf::CFIndex,
71 false,
72 );
73 let cf_path = cf::CFURLCopyFileSystemPath(cf_url, cf::kCFURLPOSIXPathStyle);
74 cf::CFArrayAppendValue(cf_paths, cf_path);
75 cf::CFRelease(cf_path);
76 cf::CFRelease(cf_url);
77 }
78
79 let mut state = Box::new(State {
80 latency,
81 paths: cf_paths,
82 callback: None,
83 last_valid_event_id: None,
84 stream: ptr::null_mut(),
85 });
86 let stream_context = fs::FSEventStreamContext {
87 version: 0,
88 info: state.as_ref() as *const _ as *mut c_void,
89 retain: None,
90 release: None,
91 copy_description: None,
92 };
93 let stream = fs::FSEventStreamCreate(
94 cf::kCFAllocatorDefault,
95 Self::trampoline,
96 &stream_context,
97 cf_paths,
98 FSEventsGetCurrentEventId(),
99 latency.as_secs_f64(),
100 fs::kFSEventStreamCreateFlagFileEvents
101 | fs::kFSEventStreamCreateFlagNoDefer
102 | fs::kFSEventStreamCreateFlagWatchRoot,
103 );
104 state.stream = stream;
105
106 let lifecycle = Arc::new(Mutex::new(Lifecycle::New));
107 (
108 EventStream {
109 lifecycle: lifecycle.clone(),
110 state,
111 },
112 Handle(lifecycle),
113 )
114 }
115 }
116
117 pub fn run<F>(mut self, f: F)
118 where
119 F: FnMut(Vec<Event>) -> bool + 'static,
120 {
121 self.state.callback = Some(Box::new(f));
122 unsafe {
123 let run_loop =
124 core_foundation::base::CFRetain(cf::CFRunLoopGetCurrent()) as *mut c_void;
125 {
126 let mut state = self.lifecycle.lock();
127 match *state {
128 Lifecycle::New => *state = Lifecycle::Running(run_loop),
129 Lifecycle::Running(_) => unreachable!(),
130 Lifecycle::Stopped => return,
131 }
132 }
133 fs::FSEventStreamScheduleWithRunLoop(
134 self.state.stream,
135 run_loop,
136 cf::kCFRunLoopDefaultMode,
137 );
138 fs::FSEventStreamStart(self.state.stream);
139 cf::CFRunLoopRun();
140 }
141 }
142
143 extern "C" fn trampoline(
144 stream_ref: fs::FSEventStreamRef,
145 info: *mut ::std::os::raw::c_void,
146 num: usize, // size_t numEvents
147 event_paths: *mut ::std::os::raw::c_void, // void *eventPaths
148 event_flags: *const ::std::os::raw::c_void, // const FSEventStreamEventFlags eventFlags[]
149 event_ids: *const ::std::os::raw::c_void, // const FSEventStreamEventId eventIds[]
150 ) {
151 unsafe {
152 let event_paths = event_paths as *const *const ::std::os::raw::c_char;
153 let e_ptr = event_flags as *mut u32;
154 let i_ptr = event_ids as *mut u64;
155 let state = (info as *mut State).as_mut().unwrap();
156 let callback = if let Some(callback) = state.callback.as_mut() {
157 callback
158 } else {
159 return;
160 };
161
162 let paths = slice::from_raw_parts(event_paths, num);
163 let flags = slice::from_raw_parts_mut(e_ptr, num);
164 let ids = slice::from_raw_parts_mut(i_ptr, num);
165 let mut stream_restarted = false;
166
167 // Sometimes FSEvents reports a "dropped" event, an indication that either the kernel
168 // or our code couldn't keep up with the sheer volume of file-system events that were
169 // generated. If we observed a valid event before this happens, we'll try to read the
170 // file-system journal by stopping the current stream and creating a new one starting at
171 // such event. Otherwise, we'll let invoke the callback with the dropped event, which
172 // will likely perform a re-scan of one of the root directories.
173 if flags
174 .iter()
175 .copied()
176 .filter_map(StreamFlags::from_bits)
177 .any(|flags| {
178 flags.contains(StreamFlags::USER_DROPPED)
179 || flags.contains(StreamFlags::KERNEL_DROPPED)
180 })
181 && let Some(last_valid_event_id) = state.last_valid_event_id.take()
182 {
183 fs::FSEventStreamStop(state.stream);
184 fs::FSEventStreamInvalidate(state.stream);
185 fs::FSEventStreamRelease(state.stream);
186
187 let stream_context = fs::FSEventStreamContext {
188 version: 0,
189 info,
190 retain: None,
191 release: None,
192 copy_description: None,
193 };
194 let stream = fs::FSEventStreamCreate(
195 cf::kCFAllocatorDefault,
196 Self::trampoline,
197 &stream_context,
198 state.paths,
199 last_valid_event_id,
200 state.latency.as_secs_f64(),
201 fs::kFSEventStreamCreateFlagFileEvents
202 | fs::kFSEventStreamCreateFlagNoDefer
203 | fs::kFSEventStreamCreateFlagWatchRoot,
204 );
205
206 state.stream = stream;
207 fs::FSEventStreamScheduleWithRunLoop(
208 state.stream,
209 cf::CFRunLoopGetCurrent(),
210 cf::kCFRunLoopDefaultMode,
211 );
212 fs::FSEventStreamStart(state.stream);
213 stream_restarted = true;
214 }
215
216 if !stream_restarted {
217 let mut events = Vec::with_capacity(num);
218 for p in 0..num {
219 if let Some(flag) = StreamFlags::from_bits(flags[p]) {
220 if !flag.contains(StreamFlags::HISTORY_DONE) {
221 let path_c_str = CStr::from_ptr(paths[p]);
222 let path = PathBuf::from(OsStr::from_bytes(path_c_str.to_bytes()));
223 let event = Event {
224 event_id: ids[p],
225 flags: flag,
226 path,
227 };
228 state.last_valid_event_id = Some(event.event_id);
229 events.push(event);
230 }
231 } else {
232 debug_assert!(false, "unknown flag set for fs event: {}", flags[p]);
233 }
234 }
235
236 if !events.is_empty() && !callback(events) {
237 fs::FSEventStreamStop(stream_ref);
238 cf::CFRunLoopStop(cf::CFRunLoopGetCurrent());
239 }
240 }
241 }
242 }
243}
244
245impl Drop for Handle {
246 fn drop(&mut self) {
247 let mut state = self.0.lock();
248 if let Lifecycle::Running(run_loop) = *state {
249 unsafe {
250 cf::CFRunLoopStop(run_loop);
251 cf::CFRelease(run_loop)
252 }
253 }
254 *state = Lifecycle::Stopped;
255 }
256}
257
258// Synchronize with
259// /System/Library/Frameworks/CoreServices.framework/Versions/A/Frameworks/FSEvents.framework/Versions/A/Headers/FSEvents.h
260bitflags! {
261 #[derive(Debug, PartialEq, Eq, Clone, Copy)]
262 #[repr(C)]
263 pub struct StreamFlags: u32 {
264 const NONE = 0x00000000;
265 const MUST_SCAN_SUBDIRS = 0x00000001;
266 const USER_DROPPED = 0x00000002;
267 const KERNEL_DROPPED = 0x00000004;
268 const IDS_WRAPPED = 0x00000008;
269 const HISTORY_DONE = 0x00000010;
270 const ROOT_CHANGED = 0x00000020;
271 const MOUNT = 0x00000040;
272 const UNMOUNT = 0x00000080;
273 const ITEM_CREATED = 0x00000100;
274 const ITEM_REMOVED = 0x00000200;
275 const INODE_META_MOD = 0x00000400;
276 const ITEM_RENAMED = 0x00000800;
277 const ITEM_MODIFIED = 0x00001000;
278 const FINDER_INFO_MOD = 0x00002000;
279 const ITEM_CHANGE_OWNER = 0x00004000;
280 const ITEM_XATTR_MOD = 0x00008000;
281 const IS_FILE = 0x00010000;
282 const IS_DIR = 0x00020000;
283 const IS_SYMLINK = 0x00040000;
284 const OWN_EVENT = 0x00080000;
285 const IS_HARDLINK = 0x00100000;
286 const IS_LAST_HARDLINK = 0x00200000;
287 const ITEM_CLONED = 0x400000;
288 }
289}
290
291impl std::fmt::Display for StreamFlags {
292 fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
293 if self.contains(StreamFlags::MUST_SCAN_SUBDIRS) {
294 let _d = write!(f, "MUST_SCAN_SUBDIRS ");
295 }
296 if self.contains(StreamFlags::USER_DROPPED) {
297 let _d = write!(f, "USER_DROPPED ");
298 }
299 if self.contains(StreamFlags::KERNEL_DROPPED) {
300 let _d = write!(f, "KERNEL_DROPPED ");
301 }
302 if self.contains(StreamFlags::IDS_WRAPPED) {
303 let _d = write!(f, "IDS_WRAPPED ");
304 }
305 if self.contains(StreamFlags::HISTORY_DONE) {
306 let _d = write!(f, "HISTORY_DONE ");
307 }
308 if self.contains(StreamFlags::ROOT_CHANGED) {
309 let _d = write!(f, "ROOT_CHANGED ");
310 }
311 if self.contains(StreamFlags::MOUNT) {
312 let _d = write!(f, "MOUNT ");
313 }
314 if self.contains(StreamFlags::UNMOUNT) {
315 let _d = write!(f, "UNMOUNT ");
316 }
317 if self.contains(StreamFlags::ITEM_CREATED) {
318 let _d = write!(f, "ITEM_CREATED ");
319 }
320 if self.contains(StreamFlags::ITEM_REMOVED) {
321 let _d = write!(f, "ITEM_REMOVED ");
322 }
323 if self.contains(StreamFlags::INODE_META_MOD) {
324 let _d = write!(f, "INODE_META_MOD ");
325 }
326 if self.contains(StreamFlags::ITEM_RENAMED) {
327 let _d = write!(f, "ITEM_RENAMED ");
328 }
329 if self.contains(StreamFlags::ITEM_MODIFIED) {
330 let _d = write!(f, "ITEM_MODIFIED ");
331 }
332 if self.contains(StreamFlags::FINDER_INFO_MOD) {
333 let _d = write!(f, "FINDER_INFO_MOD ");
334 }
335 if self.contains(StreamFlags::ITEM_CHANGE_OWNER) {
336 let _d = write!(f, "ITEM_CHANGE_OWNER ");
337 }
338 if self.contains(StreamFlags::ITEM_XATTR_MOD) {
339 let _d = write!(f, "ITEM_XATTR_MOD ");
340 }
341 if self.contains(StreamFlags::IS_FILE) {
342 let _d = write!(f, "IS_FILE ");
343 }
344 if self.contains(StreamFlags::IS_DIR) {
345 let _d = write!(f, "IS_DIR ");
346 }
347 if self.contains(StreamFlags::IS_SYMLINK) {
348 let _d = write!(f, "IS_SYMLINK ");
349 }
350 if self.contains(StreamFlags::OWN_EVENT) {
351 let _d = write!(f, "OWN_EVENT ");
352 }
353 if self.contains(StreamFlags::IS_LAST_HARDLINK) {
354 let _d = write!(f, "IS_LAST_HARDLINK ");
355 }
356 if self.contains(StreamFlags::IS_HARDLINK) {
357 let _d = write!(f, "IS_HARDLINK ");
358 }
359 if self.contains(StreamFlags::ITEM_CLONED) {
360 let _d = write!(f, "ITEM_CLONED ");
361 }
362 write!(f, "")
363 }
364}
365
366#[link(name = "CoreServices", kind = "framework")]
367unsafe extern "C" {
368 pub fn FSEventsGetCurrentEventId() -> u64;
369}
370
371#[cfg(test)]
372mod tests {
373 use super::*;
374 use std::{fs, sync::mpsc, thread, time::Duration};
375
376 #[test]
377 fn test_event_stream_simple() {
378 for _ in 0..3 {
379 let dir = tempfile::Builder::new()
380 .prefix("test-event-stream")
381 .tempdir()
382 .unwrap();
383 let path = dir.path().canonicalize().unwrap();
384 for i in 0..10 {
385 fs::write(path.join(format!("existing-file-{}", i)), "").unwrap();
386 }
387 flush_historical_events();
388
389 let (tx, rx) = mpsc::channel();
390 let (stream, handle) = EventStream::new(&[&path], Duration::from_millis(50));
391 thread::spawn(move || stream.run(move |events| tx.send(events.to_vec()).is_ok()));
392
393 fs::write(path.join("new-file"), "").unwrap();
394 let events = rx.recv_timeout(Duration::from_secs(2)).unwrap();
395 let event = events.last().unwrap();
396 assert_eq!(event.path, path.join("new-file"));
397 assert!(event.flags.contains(StreamFlags::ITEM_CREATED));
398
399 fs::remove_file(path.join("existing-file-5")).unwrap();
400 let mut events = rx.recv_timeout(Duration::from_secs(2)).unwrap();
401 let mut event = events.last().unwrap();
402 // we see this duplicate about 1/100 test runs.
403 if event.path == path.join("new-file")
404 && event.flags.contains(StreamFlags::ITEM_CREATED)
405 {
406 events = rx.recv_timeout(Duration::from_secs(2)).unwrap();
407 event = events.last().unwrap();
408 }
409 assert_eq!(event.path, path.join("existing-file-5"));
410 assert!(event.flags.contains(StreamFlags::ITEM_REMOVED));
411 drop(handle);
412 }
413 }
414
415 #[test]
416 fn test_event_stream_delayed_start() {
417 for _ in 0..3 {
418 let dir = tempfile::Builder::new()
419 .prefix("test-event-stream")
420 .tempdir()
421 .unwrap();
422 let path = dir.path().canonicalize().unwrap();
423 for i in 0..10 {
424 fs::write(path.join(format!("existing-file-{}", i)), "").unwrap();
425 }
426 flush_historical_events();
427
428 let (tx, rx) = mpsc::channel();
429 let (stream, handle) = EventStream::new(&[&path], Duration::from_millis(50));
430
431 // Delay the call to `run` in order to make sure we don't miss any events that occur
432 // between creating the `EventStream` and calling `run`.
433 thread::spawn(move || {
434 thread::sleep(Duration::from_millis(100));
435 stream.run(move |events| tx.send(events.to_vec()).is_ok())
436 });
437
438 fs::write(path.join("new-file"), "").unwrap();
439 let events = rx.recv_timeout(Duration::from_secs(2)).unwrap();
440 let event = events.last().unwrap();
441 assert_eq!(event.path, path.join("new-file"));
442 assert!(event.flags.contains(StreamFlags::ITEM_CREATED));
443
444 fs::remove_file(path.join("existing-file-5")).unwrap();
445 let events = rx.recv_timeout(Duration::from_secs(2)).unwrap();
446 let event = events.last().unwrap();
447 assert_eq!(event.path, path.join("existing-file-5"));
448 assert!(event.flags.contains(StreamFlags::ITEM_REMOVED));
449 drop(handle);
450 }
451 }
452
453 #[test]
454 fn test_event_stream_shutdown_by_dropping_handle() {
455 let dir = tempfile::Builder::new()
456 .prefix("test-event-stream")
457 .tempdir()
458 .unwrap();
459 let path = dir.path().canonicalize().unwrap();
460 flush_historical_events();
461
462 let (tx, rx) = mpsc::channel();
463 let (stream, handle) = EventStream::new(&[&path], Duration::from_millis(50));
464 thread::spawn(move || {
465 stream.run({
466 let tx = tx.clone();
467 move |_| {
468 tx.send("running").unwrap();
469 true
470 }
471 });
472 tx.send("stopped").unwrap();
473 });
474
475 fs::write(path.join("new-file"), "").unwrap();
476 assert_eq!(rx.recv_timeout(Duration::from_secs(2)).unwrap(), "running");
477
478 // Dropping the handle causes `EventStream::run` to return.
479 drop(handle);
480 assert_eq!(rx.recv_timeout(Duration::from_secs(2)).unwrap(), "stopped");
481 }
482
483 #[test]
484 fn test_event_stream_shutdown_before_run() {
485 let dir = tempfile::Builder::new()
486 .prefix("test-event-stream")
487 .tempdir()
488 .unwrap();
489 let path = dir.path().canonicalize().unwrap();
490
491 let (stream, handle) = EventStream::new(&[&path], Duration::from_millis(50));
492 drop(handle);
493
494 // This returns immediately because the handle was already dropped.
495 stream.run(|_| true);
496 }
497
498 fn flush_historical_events() {
499 let duration = if std::env::var("CI").is_ok() {
500 Duration::from_secs(2)
501 } else {
502 Duration::from_millis(500)
503 };
504 thread::sleep(duration);
505 }
506}