1#![cfg(target_os = "macos")]
2
3use bitflags::bitflags;
4use fsevent_sys::{self as fs, core_foundation as cf};
5use parking_lot::Mutex;
6use std::{
7 convert::AsRef,
8 ffi::{CStr, OsStr, c_void},
9 os::unix::ffi::OsStrExt,
10 path::{Path, PathBuf},
11 ptr, slice,
12 sync::Arc,
13 time::Duration,
14};
15
/// A single file-system change reported by FSEvents and handed, in batches, to the
/// callback passed to [`EventStream::run`].
#[derive(Clone, Debug)]
pub struct Event {
    /// Identifier FSEvents reported for this event.
    pub event_id: u64,
    /// Flags FSEvents attached to this event, decoded into [`StreamFlags`].
    pub flags: StreamFlags,
    /// Path the event refers to, exactly as reported by FSEvents.
    pub path: PathBuf,
}
22
/// A stream of file-system events for a set of paths.
///
/// Created together with a [`Handle`] by [`EventStream::new`] and consumed by
/// [`EventStream::run`].
pub struct EventStream {
    // Shared with the `Handle`; records whether the stream is new, running or stopped.
    lifecycle: Arc<Mutex<Lifecycle>>,
    // Boxed so its address stays fixed: a raw pointer to it is handed to FSEvents as the
    // stream context `info` and read back inside the stream callback.
    state: Box<State>,
}
27
/// Mutable state reachable from the FSEvents callback through the stream context pointer.
struct State {
    // Latency passed to `FSEventStreamCreate`, kept so a restarted stream uses the same value.
    latency: Duration,
    // CoreFoundation array of watched paths; released when the `State` is dropped.
    paths: cf::CFMutableArrayRef,
    // User callback installed by `EventStream::run`; `None` until then.
    callback: Option<Box<dyn FnMut(Vec<Event>) -> bool>>,
    // Id of the most recent event forwarded to the callback; used as the starting point
    // when the stream is recreated after a "dropped events" notification.
    last_valid_event_id: Option<fs::FSEventStreamEventId>,
    // The current FSEvents stream; replaced when the stream is recreated.
    stream: fs::FSEventStreamRef,
}
35
36impl Drop for State {
37 fn drop(&mut self) {
38 unsafe {
39 cf::CFRelease(self.paths);
40 fs::FSEventStreamStop(self.stream);
41 fs::FSEventStreamInvalidate(self.stream);
42 fs::FSEventStreamRelease(self.stream);
43 }
44 }
45}
46
/// Where an [`EventStream`] is in its life, shared between the stream and its [`Handle`].
enum Lifecycle {
    // Created, but `EventStream::run` has not been called yet.
    New,
    // `run` is in progress; holds the retained run loop so the `Handle` can stop it.
    Running(cf::CFRunLoopRef),
    // The `Handle` was dropped; a later call to `run` returns immediately.
    Stopped,
}
52
/// Stops the associated [`EventStream`] when dropped: a running stream's run loop is asked
/// to stop, and a stream that has not been run yet will return from `run` immediately.
pub struct Handle(Arc<Mutex<Lifecycle>>);
54
// NOTE(review): `EventStream` owns raw CoreFoundation/FSEvents pointers and an optional
// boxed callback that has no `Send` bound. The callback is only installed inside `run`,
// i.e. after the stream has been moved to the thread that runs it — confirm this is the
// invariant that justifies the impl.
unsafe impl Send for EventStream {}
// NOTE(review): `Lifecycle::Running` carries a raw `CFRunLoopRef` that `Handle`'s `Drop`
// may use from another thread; presumably this relies on `CFRunLoopStop` and `CFRelease`
// being callable from any thread — confirm.
unsafe impl Send for Lifecycle {}
57
58impl EventStream {
59 pub fn new(paths: &[&Path], latency: Duration) -> (Self, Handle) {
60 unsafe {
61 let cf_paths =
62 cf::CFArrayCreateMutable(cf::kCFAllocatorDefault, 0, &cf::kCFTypeArrayCallBacks);
63 assert!(!cf_paths.is_null());
64
65 for path in paths {
66 let path_bytes = path.as_os_str().as_bytes();
67 let cf_url = cf::CFURLCreateFromFileSystemRepresentation(
68 cf::kCFAllocatorDefault,
69 path_bytes.as_ptr() as *const i8,
70 path_bytes.len() as cf::CFIndex,
71 false,
72 );
73 if !cf_url.is_null() {
74 let cf_path = cf::CFURLCopyFileSystemPath(cf_url, cf::kCFURLPOSIXPathStyle);
75 cf::CFArrayAppendValue(cf_paths, cf_path);
76 cf::CFRelease(cf_path);
77 cf::CFRelease(cf_url);
78 } else {
79 log::error!("Failed to create CFURL for path: {}", path.display());
80 }
81 }
82
83 let mut state = Box::new(State {
84 latency,
85 paths: cf_paths,
86 callback: None,
87 last_valid_event_id: None,
88 stream: ptr::null_mut(),
89 });
90 let stream_context = fs::FSEventStreamContext {
91 version: 0,
92 info: state.as_ref() as *const _ as *mut c_void,
93 retain: None,
94 release: None,
95 copy_description: None,
96 };
97 let stream = fs::FSEventStreamCreate(
98 cf::kCFAllocatorDefault,
99 Self::trampoline,
100 &stream_context,
101 cf_paths,
102 FSEventsGetCurrentEventId(),
103 latency.as_secs_f64(),
104 fs::kFSEventStreamCreateFlagFileEvents
105 | fs::kFSEventStreamCreateFlagNoDefer
106 | fs::kFSEventStreamCreateFlagWatchRoot,
107 );
108 state.stream = stream;
109
110 let lifecycle = Arc::new(Mutex::new(Lifecycle::New));
111 (
112 EventStream {
113 lifecycle: lifecycle.clone(),
114 state,
115 },
116 Handle(lifecycle),
117 )
118 }
119 }
120
121 pub fn run<F>(mut self, f: F)
122 where
123 F: FnMut(Vec<Event>) -> bool + 'static,
124 {
125 self.state.callback = Some(Box::new(f));
126 unsafe {
127 let run_loop =
128 core_foundation::base::CFRetain(cf::CFRunLoopGetCurrent()) as *mut c_void;
129 {
130 let mut state = self.lifecycle.lock();
131 match *state {
132 Lifecycle::New => *state = Lifecycle::Running(run_loop),
133 Lifecycle::Running(_) => unreachable!(),
134 Lifecycle::Stopped => return,
135 }
136 }
137 fs::FSEventStreamScheduleWithRunLoop(
138 self.state.stream,
139 run_loop,
140 cf::kCFRunLoopDefaultMode,
141 );
142 fs::FSEventStreamStart(self.state.stream);
143 cf::CFRunLoopRun();
144 }
145 }
146
147 extern "C" fn trampoline(
148 stream_ref: fs::FSEventStreamRef,
149 info: *mut ::std::os::raw::c_void,
150 num: usize, // size_t numEvents
151 event_paths: *mut ::std::os::raw::c_void, // void *eventPaths
152 event_flags: *const ::std::os::raw::c_void, // const FSEventStreamEventFlags eventFlags[]
153 event_ids: *const ::std::os::raw::c_void, // const FSEventStreamEventId eventIds[]
154 ) {
155 unsafe {
156 let event_paths = event_paths as *const *const ::std::os::raw::c_char;
157 let e_ptr = event_flags as *mut u32;
158 let i_ptr = event_ids as *mut u64;
159 let state = (info as *mut State).as_mut().unwrap();
160 let callback = if let Some(callback) = state.callback.as_mut() {
161 callback
162 } else {
163 return;
164 };
165
166 let paths = slice::from_raw_parts(event_paths, num);
167 let flags = slice::from_raw_parts_mut(e_ptr, num);
168 let ids = slice::from_raw_parts_mut(i_ptr, num);
169 let mut stream_restarted = false;
170
171 // Sometimes FSEvents reports a "dropped" event, an indication that either the kernel
172 // or our code couldn't keep up with the sheer volume of file-system events that were
173 // generated. If we observed a valid event before this happens, we'll try to read the
174 // file-system journal by stopping the current stream and creating a new one starting at
175 // such event. Otherwise, we'll let invoke the callback with the dropped event, which
176 // will likely perform a re-scan of one of the root directories.
177 if flags
178 .iter()
179 .copied()
180 .filter_map(StreamFlags::from_bits)
181 .any(|flags| {
182 flags.contains(StreamFlags::USER_DROPPED)
183 || flags.contains(StreamFlags::KERNEL_DROPPED)
184 })
185 && let Some(last_valid_event_id) = state.last_valid_event_id.take()
186 {
187 fs::FSEventStreamStop(state.stream);
188 fs::FSEventStreamInvalidate(state.stream);
189 fs::FSEventStreamRelease(state.stream);
190
191 let stream_context = fs::FSEventStreamContext {
192 version: 0,
193 info,
194 retain: None,
195 release: None,
196 copy_description: None,
197 };
198 let stream = fs::FSEventStreamCreate(
199 cf::kCFAllocatorDefault,
200 Self::trampoline,
201 &stream_context,
202 state.paths,
203 last_valid_event_id,
204 state.latency.as_secs_f64(),
205 fs::kFSEventStreamCreateFlagFileEvents
206 | fs::kFSEventStreamCreateFlagNoDefer
207 | fs::kFSEventStreamCreateFlagWatchRoot,
208 );
209
210 state.stream = stream;
211 fs::FSEventStreamScheduleWithRunLoop(
212 state.stream,
213 cf::CFRunLoopGetCurrent(),
214 cf::kCFRunLoopDefaultMode,
215 );
216 fs::FSEventStreamStart(state.stream);
217 stream_restarted = true;
218 }
219
220 if !stream_restarted {
221 let mut events = Vec::with_capacity(num);
222 for p in 0..num {
223 if let Some(flag) = StreamFlags::from_bits(flags[p]) {
224 if !flag.contains(StreamFlags::HISTORY_DONE) {
225 let path_c_str = CStr::from_ptr(paths[p]);
226 let path = PathBuf::from(OsStr::from_bytes(path_c_str.to_bytes()));
227 let event = Event {
228 event_id: ids[p],
229 flags: flag,
230 path,
231 };
232 state.last_valid_event_id = Some(event.event_id);
233 events.push(event);
234 }
235 } else {
236 debug_assert!(false, "unknown flag set for fs event: {}", flags[p]);
237 }
238 }
239
240 if !events.is_empty() && !callback(events) {
241 fs::FSEventStreamStop(stream_ref);
242 cf::CFRunLoopStop(cf::CFRunLoopGetCurrent());
243 }
244 }
245 }
246 }
247}
248
249impl Drop for Handle {
250 fn drop(&mut self) {
251 let mut state = self.0.lock();
252 if let Lifecycle::Running(run_loop) = *state {
253 unsafe {
254 cf::CFRunLoopStop(run_loop);
255 cf::CFRelease(run_loop)
256 }
257 }
258 *state = Lifecycle::Stopped;
259 }
260}
261
// Keep these values in sync with the event-flag constants declared in
// /System/Library/Frameworks/CoreServices.framework/Versions/A/Frameworks/FSEvents.framework/Versions/A/Headers/FSEvents.h
bitflags! {
    // Per-event flags reported by FSEvents, exposed to callers as `Event::flags`.
    #[derive(Debug, PartialEq, Eq, Clone, Copy)]
    #[repr(C)]
    pub struct StreamFlags: u32 {
        const NONE = 0x00000000;
        const MUST_SCAN_SUBDIRS = 0x00000001;
        // Either "dropped" flag makes the stream callback try to restart the stream from
        // the last event id it forwarded.
        const USER_DROPPED = 0x00000002;
        const KERNEL_DROPPED = 0x00000004;
        const IDS_WRAPPED = 0x00000008;
        // Events carrying this flag are never forwarded to the user callback.
        const HISTORY_DONE = 0x00000010;
        const ROOT_CHANGED = 0x00000020;
        const MOUNT = 0x00000040;
        const UNMOUNT = 0x00000080;
        const ITEM_CREATED = 0x00000100;
        const ITEM_REMOVED = 0x00000200;
        const INODE_META_MOD = 0x00000400;
        const ITEM_RENAMED = 0x00000800;
        const ITEM_MODIFIED = 0x00001000;
        const FINDER_INFO_MOD = 0x00002000;
        const ITEM_CHANGE_OWNER = 0x00004000;
        const ITEM_XATTR_MOD = 0x00008000;
        const IS_FILE = 0x00010000;
        const IS_DIR = 0x00020000;
        const IS_SYMLINK = 0x00040000;
        const OWN_EVENT = 0x00080000;
        const IS_HARDLINK = 0x00100000;
        const IS_LAST_HARDLINK = 0x00200000;
        // Written without leading zeros; the next bit after IS_LAST_HARDLINK.
        const ITEM_CLONED = 0x400000;
    }
}
294
295impl std::fmt::Display for StreamFlags {
296 fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
297 if self.contains(StreamFlags::MUST_SCAN_SUBDIRS) {
298 let _d = write!(f, "MUST_SCAN_SUBDIRS ");
299 }
300 if self.contains(StreamFlags::USER_DROPPED) {
301 let _d = write!(f, "USER_DROPPED ");
302 }
303 if self.contains(StreamFlags::KERNEL_DROPPED) {
304 let _d = write!(f, "KERNEL_DROPPED ");
305 }
306 if self.contains(StreamFlags::IDS_WRAPPED) {
307 let _d = write!(f, "IDS_WRAPPED ");
308 }
309 if self.contains(StreamFlags::HISTORY_DONE) {
310 let _d = write!(f, "HISTORY_DONE ");
311 }
312 if self.contains(StreamFlags::ROOT_CHANGED) {
313 let _d = write!(f, "ROOT_CHANGED ");
314 }
315 if self.contains(StreamFlags::MOUNT) {
316 let _d = write!(f, "MOUNT ");
317 }
318 if self.contains(StreamFlags::UNMOUNT) {
319 let _d = write!(f, "UNMOUNT ");
320 }
321 if self.contains(StreamFlags::ITEM_CREATED) {
322 let _d = write!(f, "ITEM_CREATED ");
323 }
324 if self.contains(StreamFlags::ITEM_REMOVED) {
325 let _d = write!(f, "ITEM_REMOVED ");
326 }
327 if self.contains(StreamFlags::INODE_META_MOD) {
328 let _d = write!(f, "INODE_META_MOD ");
329 }
330 if self.contains(StreamFlags::ITEM_RENAMED) {
331 let _d = write!(f, "ITEM_RENAMED ");
332 }
333 if self.contains(StreamFlags::ITEM_MODIFIED) {
334 let _d = write!(f, "ITEM_MODIFIED ");
335 }
336 if self.contains(StreamFlags::FINDER_INFO_MOD) {
337 let _d = write!(f, "FINDER_INFO_MOD ");
338 }
339 if self.contains(StreamFlags::ITEM_CHANGE_OWNER) {
340 let _d = write!(f, "ITEM_CHANGE_OWNER ");
341 }
342 if self.contains(StreamFlags::ITEM_XATTR_MOD) {
343 let _d = write!(f, "ITEM_XATTR_MOD ");
344 }
345 if self.contains(StreamFlags::IS_FILE) {
346 let _d = write!(f, "IS_FILE ");
347 }
348 if self.contains(StreamFlags::IS_DIR) {
349 let _d = write!(f, "IS_DIR ");
350 }
351 if self.contains(StreamFlags::IS_SYMLINK) {
352 let _d = write!(f, "IS_SYMLINK ");
353 }
354 if self.contains(StreamFlags::OWN_EVENT) {
355 let _d = write!(f, "OWN_EVENT ");
356 }
357 if self.contains(StreamFlags::IS_LAST_HARDLINK) {
358 let _d = write!(f, "IS_LAST_HARDLINK ");
359 }
360 if self.contains(StreamFlags::IS_HARDLINK) {
361 let _d = write!(f, "IS_HARDLINK ");
362 }
363 if self.contains(StreamFlags::ITEM_CLONED) {
364 let _d = write!(f, "ITEM_CLONED ");
365 }
366 write!(f, "")
367 }
368}
369
// Declared by hand and linked from the CoreServices framework. `EventStream::new` uses it
// to choose the event id at which a freshly created stream starts.
#[link(name = "CoreServices", kind = "framework")]
unsafe extern "C" {
    /// Binding to the CoreServices function `FSEventsGetCurrentEventId`.
    pub fn FSEventsGetCurrentEventId() -> u64;
}
374
#[cfg(test)]
mod tests {
    use super::*;
    use std::{fs, sync::mpsc, thread, time::Duration};

    /// A file created and a file removed while the stream is running are both reported.
    #[test]
    fn test_event_stream_simple() {
        for _ in 0..3 {
            let dir = tempfile::Builder::new()
                .prefix("test-event-stream")
                .tempdir()
                .unwrap();
            let path = dir.path().canonicalize().unwrap();
            for i in 0..10 {
                fs::write(path.join(format!("existing-file-{}", i)), "").unwrap();
            }
            flush_historical_events();

            let (tx, rx) = mpsc::channel();
            let (stream, handle) = EventStream::new(&[&path], Duration::from_millis(50));
            // The callback already owns the batch, so it is sent as-is.
            thread::spawn(move || stream.run(move |events| tx.send(events).is_ok()));

            fs::write(path.join("new-file"), "").unwrap();
            let events = rx.recv_timeout(Duration::from_secs(2)).unwrap();
            let event = events.last().unwrap();
            assert_eq!(event.path, path.join("new-file"));
            assert!(event.flags.contains(StreamFlags::ITEM_CREATED));

            fs::remove_file(path.join("existing-file-5")).unwrap();
            let mut events = rx.recv_timeout(Duration::from_secs(2)).unwrap();
            let mut event = events.last().unwrap();
            // we see this duplicate about 1/100 test runs.
            if event.path == path.join("new-file")
                && event.flags.contains(StreamFlags::ITEM_CREATED)
            {
                events = rx.recv_timeout(Duration::from_secs(2)).unwrap();
                event = events.last().unwrap();
            }
            assert_eq!(event.path, path.join("existing-file-5"));
            assert!(event.flags.contains(StreamFlags::ITEM_REMOVED));
            drop(handle);
        }
    }

    /// Events that happen between `EventStream::new` and `EventStream::run` are not lost.
    #[test]
    fn test_event_stream_delayed_start() {
        for _ in 0..3 {
            let dir = tempfile::Builder::new()
                .prefix("test-event-stream")
                .tempdir()
                .unwrap();
            let path = dir.path().canonicalize().unwrap();
            for i in 0..10 {
                fs::write(path.join(format!("existing-file-{}", i)), "").unwrap();
            }
            flush_historical_events();

            let (tx, rx) = mpsc::channel();
            let (stream, handle) = EventStream::new(&[&path], Duration::from_millis(50));

            // Delay the call to `run` in order to make sure we don't miss any events that occur
            // between creating the `EventStream` and calling `run`.
            thread::spawn(move || {
                thread::sleep(Duration::from_millis(100));
                stream.run(move |events| tx.send(events).is_ok())
            });

            fs::write(path.join("new-file"), "").unwrap();
            let events = rx.recv_timeout(Duration::from_secs(2)).unwrap();
            let event = events.last().unwrap();
            assert_eq!(event.path, path.join("new-file"));
            assert!(event.flags.contains(StreamFlags::ITEM_CREATED));

            fs::remove_file(path.join("existing-file-5")).unwrap();
            let events = rx.recv_timeout(Duration::from_secs(2)).unwrap();
            let event = events.last().unwrap();
            assert_eq!(event.path, path.join("existing-file-5"));
            assert!(event.flags.contains(StreamFlags::ITEM_REMOVED));
            drop(handle);
        }
    }

    /// Dropping the handle of a running stream makes `run` return.
    #[test]
    fn test_event_stream_shutdown_by_dropping_handle() {
        let dir = tempfile::Builder::new()
            .prefix("test-event-stream")
            .tempdir()
            .unwrap();
        let path = dir.path().canonicalize().unwrap();
        flush_historical_events();

        let (tx, rx) = mpsc::channel();
        let (stream, handle) = EventStream::new(&[&path], Duration::from_millis(50));
        thread::spawn(move || {
            stream.run({
                let tx = tx.clone();
                move |_| {
                    tx.send("running").unwrap();
                    true
                }
            });
            tx.send("stopped").unwrap();
        });

        fs::write(path.join("new-file"), "").unwrap();
        assert_eq!(rx.recv_timeout(Duration::from_secs(2)).unwrap(), "running");

        // Dropping the handle causes `EventStream::run` to return.
        drop(handle);
        assert_eq!(rx.recv_timeout(Duration::from_secs(2)).unwrap(), "stopped");
    }

    /// Dropping the handle before `run` is called makes `run` return immediately.
    #[test]
    fn test_event_stream_shutdown_before_run() {
        let dir = tempfile::Builder::new()
            .prefix("test-event-stream")
            .tempdir()
            .unwrap();
        let path = dir.path().canonicalize().unwrap();

        let (stream, handle) = EventStream::new(&[&path], Duration::from_millis(50));
        drop(handle);

        // This returns immediately because the handle was already dropped.
        stream.run(|_| true);
    }

    /// Sleeps long enough for events caused by test setup to be flushed before a stream is
    /// created; waits longer when the `CI` environment variable is set.
    fn flush_historical_events() {
        let duration = if std::env::var("CI").is_ok() {
            Duration::from_secs(2)
        } else {
            Duration::from_millis(500)
        };
        thread::sleep(duration);
    }
}