fsevent.rs

  1#![cfg(target_os = "macos")]
  2
  3use bitflags::bitflags;
  4use fsevent_sys::{self as fs, core_foundation as cf};
  5use parking_lot::Mutex;
  6use std::{
  7    convert::AsRef,
  8    ffi::{CStr, OsStr, c_void},
  9    os::unix::ffi::OsStrExt,
 10    path::{Path, PathBuf},
 11    ptr, slice,
 12    sync::Arc,
 13    time::Duration,
 14};
 15
/// A single file-system change reported by an [`EventStream`].
///
/// Instances are built by the stream's FSEvents callback from the parallel id/flag/path
/// arrays it receives, and handed in batches to the closure passed to `EventStream::run`.
#[derive(Clone, Debug)]
pub struct Event {
    /// Identifier FSEvents supplied for this event.
    pub event_id: u64,
    /// Flags FSEvents attached to this event, decoded into [`StreamFlags`].
    pub flags: StreamFlags,
    /// Path the event refers to, taken verbatim from the bytes FSEvents reported.
    pub path: PathBuf,
}
 22
/// An FSEvents stream that has been created but not yet run.
///
/// Obtained from `EventStream::new` together with a [`Handle`]; consumed by
/// `EventStream::run`, which drives the stream on the calling thread's run loop.
pub struct EventStream {
    /// Lifecycle state shared with the paired [`Handle`].
    lifecycle: Arc<Mutex<Lifecycle>>,
    /// Boxed so that its address, which is given to FSEvents as the callback context,
    /// stays the same when the `EventStream` itself is moved.
    state: Box<State>,
}
 27
/// Mutable state reachable from the FSEvents callback through the stream context pointer.
struct State {
    /// Latency the stream was created with; reused when the stream is recreated.
    latency: Duration,
    /// CoreFoundation array of watched paths; released when the `State` is dropped.
    paths: cf::CFMutableArrayRef,
    /// User callback installed by `EventStream::run`; `None` until then.
    callback: Option<Box<dyn FnMut(Vec<Event>) -> bool>>,
    /// Id of the most recent event delivered to the callback, used as the restart point
    /// when FSEvents reports dropped events.
    last_valid_event_id: Option<fs::FSEventStreamEventId>,
    /// The current FSEvents stream; replaced when the stream is recreated.
    stream: fs::FSEventStreamRef,
}
 35
 36impl Drop for State {
 37    fn drop(&mut self) {
 38        unsafe {
 39            cf::CFRelease(self.paths);
 40            fs::FSEventStreamStop(self.stream);
 41            fs::FSEventStreamInvalidate(self.stream);
 42            fs::FSEventStreamRelease(self.stream);
 43        }
 44    }
 45}
 46
/// Where an [`EventStream`] is in its life, shared between the stream and its [`Handle`].
enum Lifecycle {
    /// Created, but `EventStream::run` has not been called yet.
    New,
    /// `run` has started; holds the run loop that `run` retained, so the handle can stop it.
    Running(cf::CFRunLoopRef),
    /// The handle was dropped; a later call to `run` returns immediately.
    Stopped,
}
 52
/// Shutdown handle paired with an [`EventStream`].
///
/// Dropping it marks the stream as stopped and, if the stream is running, stops its run loop.
pub struct Handle(Arc<Mutex<Lifecycle>>);
 54
// `EventStream` holds raw CoreFoundation/FSEvents handles and a boxed callback with no `Send`
// bound, so it is not `Send` automatically. This impl is what allows a stream to be created
// on one thread and moved to another before `run` is called.
// NOTE(review): soundness presumably relies on the stream only being used from the thread
// that calls `run` — confirm.
unsafe impl Send for EventStream {}
// `Lifecycle::Running` carries a raw run-loop pointer, which the `Handle` may use from a
// different thread than the one running the loop.
// NOTE(review): assumes `CFRunLoopStop`/`CFRelease` may be called from any thread — confirm.
unsafe impl Send for Lifecycle {}
 57
 58impl EventStream {
 59    pub fn new(paths: &[&Path], latency: Duration) -> (Self, Handle) {
 60        unsafe {
 61            let cf_paths =
 62                cf::CFArrayCreateMutable(cf::kCFAllocatorDefault, 0, &cf::kCFTypeArrayCallBacks);
 63            assert!(!cf_paths.is_null());
 64
 65            for path in paths {
 66                let path_bytes = path.as_os_str().as_bytes();
 67                let cf_url = cf::CFURLCreateFromFileSystemRepresentation(
 68                    cf::kCFAllocatorDefault,
 69                    path_bytes.as_ptr() as *const i8,
 70                    path_bytes.len() as cf::CFIndex,
 71                    false,
 72                );
 73                if !cf_url.is_null() {
 74                    let cf_path = cf::CFURLCopyFileSystemPath(cf_url, cf::kCFURLPOSIXPathStyle);
 75                    cf::CFArrayAppendValue(cf_paths, cf_path);
 76                    cf::CFRelease(cf_path);
 77                    cf::CFRelease(cf_url);
 78                } else {
 79                    log::error!("Failed to create CFURL for path: {}", path.display());
 80                }
 81            }
 82
 83            let mut state = Box::new(State {
 84                latency,
 85                paths: cf_paths,
 86                callback: None,
 87                last_valid_event_id: None,
 88                stream: ptr::null_mut(),
 89            });
 90            let stream_context = fs::FSEventStreamContext {
 91                version: 0,
 92                info: state.as_ref() as *const _ as *mut c_void,
 93                retain: None,
 94                release: None,
 95                copy_description: None,
 96            };
 97            let stream = fs::FSEventStreamCreate(
 98                cf::kCFAllocatorDefault,
 99                Self::trampoline,
100                &stream_context,
101                cf_paths,
102                FSEventsGetCurrentEventId(),
103                latency.as_secs_f64(),
104                fs::kFSEventStreamCreateFlagFileEvents
105                    | fs::kFSEventStreamCreateFlagNoDefer
106                    | fs::kFSEventStreamCreateFlagWatchRoot,
107            );
108            state.stream = stream;
109
110            let lifecycle = Arc::new(Mutex::new(Lifecycle::New));
111            (
112                EventStream {
113                    lifecycle: lifecycle.clone(),
114                    state,
115                },
116                Handle(lifecycle),
117            )
118        }
119    }
120
121    pub fn run<F>(mut self, f: F)
122    where
123        F: FnMut(Vec<Event>) -> bool + 'static,
124    {
125        self.state.callback = Some(Box::new(f));
126        unsafe {
127            let run_loop =
128                core_foundation::base::CFRetain(cf::CFRunLoopGetCurrent()) as *mut c_void;
129            {
130                let mut state = self.lifecycle.lock();
131                match *state {
132                    Lifecycle::New => *state = Lifecycle::Running(run_loop),
133                    Lifecycle::Running(_) => unreachable!(),
134                    Lifecycle::Stopped => return,
135                }
136            }
137            fs::FSEventStreamScheduleWithRunLoop(
138                self.state.stream,
139                run_loop,
140                cf::kCFRunLoopDefaultMode,
141            );
142            fs::FSEventStreamStart(self.state.stream);
143            cf::CFRunLoopRun();
144        }
145    }
146
147    extern "C" fn trampoline(
148        stream_ref: fs::FSEventStreamRef,
149        info: *mut ::std::os::raw::c_void,
150        num: usize,                                 // size_t numEvents
151        event_paths: *mut ::std::os::raw::c_void,   // void *eventPaths
152        event_flags: *const ::std::os::raw::c_void, // const FSEventStreamEventFlags eventFlags[]
153        event_ids: *const ::std::os::raw::c_void,   // const FSEventStreamEventId eventIds[]
154    ) {
155        unsafe {
156            let event_paths = event_paths as *const *const ::std::os::raw::c_char;
157            let e_ptr = event_flags as *mut u32;
158            let i_ptr = event_ids as *mut u64;
159            let state = (info as *mut State).as_mut().unwrap();
160            let callback = if let Some(callback) = state.callback.as_mut() {
161                callback
162            } else {
163                return;
164            };
165
166            let paths = slice::from_raw_parts(event_paths, num);
167            let flags = slice::from_raw_parts_mut(e_ptr, num);
168            let ids = slice::from_raw_parts_mut(i_ptr, num);
169            let mut stream_restarted = false;
170
171            // Sometimes FSEvents reports a "dropped" event, an indication that either the kernel
172            // or our code couldn't keep up with the sheer volume of file-system events that were
173            // generated. If we observed a valid event before this happens, we'll try to read the
174            // file-system journal by stopping the current stream and creating a new one starting at
175            // such event. Otherwise, we'll let invoke the callback with the dropped event, which
176            // will likely perform a re-scan of one of the root directories.
177            if flags
178                .iter()
179                .copied()
180                .filter_map(StreamFlags::from_bits)
181                .any(|flags| {
182                    flags.contains(StreamFlags::USER_DROPPED)
183                        || flags.contains(StreamFlags::KERNEL_DROPPED)
184                })
185                && let Some(last_valid_event_id) = state.last_valid_event_id.take()
186            {
187                fs::FSEventStreamStop(state.stream);
188                fs::FSEventStreamInvalidate(state.stream);
189                fs::FSEventStreamRelease(state.stream);
190
191                let stream_context = fs::FSEventStreamContext {
192                    version: 0,
193                    info,
194                    retain: None,
195                    release: None,
196                    copy_description: None,
197                };
198                let stream = fs::FSEventStreamCreate(
199                    cf::kCFAllocatorDefault,
200                    Self::trampoline,
201                    &stream_context,
202                    state.paths,
203                    last_valid_event_id,
204                    state.latency.as_secs_f64(),
205                    fs::kFSEventStreamCreateFlagFileEvents
206                        | fs::kFSEventStreamCreateFlagNoDefer
207                        | fs::kFSEventStreamCreateFlagWatchRoot,
208                );
209
210                state.stream = stream;
211                fs::FSEventStreamScheduleWithRunLoop(
212                    state.stream,
213                    cf::CFRunLoopGetCurrent(),
214                    cf::kCFRunLoopDefaultMode,
215                );
216                fs::FSEventStreamStart(state.stream);
217                stream_restarted = true;
218            }
219
220            if !stream_restarted {
221                let mut events = Vec::with_capacity(num);
222                for p in 0..num {
223                    if let Some(flag) = StreamFlags::from_bits(flags[p]) {
224                        if !flag.contains(StreamFlags::HISTORY_DONE) {
225                            let path_c_str = CStr::from_ptr(paths[p]);
226                            let path = PathBuf::from(OsStr::from_bytes(path_c_str.to_bytes()));
227                            let event = Event {
228                                event_id: ids[p],
229                                flags: flag,
230                                path,
231                            };
232                            state.last_valid_event_id = Some(event.event_id);
233                            events.push(event);
234                        }
235                    } else {
236                        debug_assert!(false, "unknown flag set for fs event: {}", flags[p]);
237                    }
238                }
239
240                if !events.is_empty() && !callback(events) {
241                    fs::FSEventStreamStop(stream_ref);
242                    cf::CFRunLoopStop(cf::CFRunLoopGetCurrent());
243                }
244            }
245        }
246    }
247}
248
249impl Drop for Handle {
250    fn drop(&mut self) {
251        let mut state = self.0.lock();
252        if let Lifecycle::Running(run_loop) = *state {
253            unsafe {
254                cf::CFRunLoopStop(run_loop);
255                cf::CFRelease(run_loop)
256            }
257        }
258        *state = Lifecycle::Stopped;
259    }
260}
261
262// Synchronize with
263// /System/Library/Frameworks/CoreServices.framework/Versions/A/Frameworks/FSEvents.framework/Versions/A/Headers/FSEvents.h
264bitflags! {
265    #[derive(Debug, PartialEq, Eq, Clone, Copy)]
266  #[repr(C)]
267  pub struct StreamFlags: u32 {
268    const NONE = 0x00000000;
269    const MUST_SCAN_SUBDIRS = 0x00000001;
270    const USER_DROPPED = 0x00000002;
271    const KERNEL_DROPPED = 0x00000004;
272    const IDS_WRAPPED = 0x00000008;
273    const HISTORY_DONE = 0x00000010;
274    const ROOT_CHANGED = 0x00000020;
275    const MOUNT = 0x00000040;
276    const UNMOUNT = 0x00000080;
277    const ITEM_CREATED = 0x00000100;
278    const ITEM_REMOVED = 0x00000200;
279    const INODE_META_MOD = 0x00000400;
280    const ITEM_RENAMED = 0x00000800;
281    const ITEM_MODIFIED = 0x00001000;
282    const FINDER_INFO_MOD = 0x00002000;
283    const ITEM_CHANGE_OWNER = 0x00004000;
284    const ITEM_XATTR_MOD = 0x00008000;
285    const IS_FILE = 0x00010000;
286    const IS_DIR = 0x00020000;
287    const IS_SYMLINK = 0x00040000;
288    const OWN_EVENT = 0x00080000;
289    const IS_HARDLINK = 0x00100000;
290    const IS_LAST_HARDLINK = 0x00200000;
291    const ITEM_CLONED = 0x400000;
292  }
293}
294
295impl std::fmt::Display for StreamFlags {
296    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
297        if self.contains(StreamFlags::MUST_SCAN_SUBDIRS) {
298            let _d = write!(f, "MUST_SCAN_SUBDIRS ");
299        }
300        if self.contains(StreamFlags::USER_DROPPED) {
301            let _d = write!(f, "USER_DROPPED ");
302        }
303        if self.contains(StreamFlags::KERNEL_DROPPED) {
304            let _d = write!(f, "KERNEL_DROPPED ");
305        }
306        if self.contains(StreamFlags::IDS_WRAPPED) {
307            let _d = write!(f, "IDS_WRAPPED ");
308        }
309        if self.contains(StreamFlags::HISTORY_DONE) {
310            let _d = write!(f, "HISTORY_DONE ");
311        }
312        if self.contains(StreamFlags::ROOT_CHANGED) {
313            let _d = write!(f, "ROOT_CHANGED ");
314        }
315        if self.contains(StreamFlags::MOUNT) {
316            let _d = write!(f, "MOUNT ");
317        }
318        if self.contains(StreamFlags::UNMOUNT) {
319            let _d = write!(f, "UNMOUNT ");
320        }
321        if self.contains(StreamFlags::ITEM_CREATED) {
322            let _d = write!(f, "ITEM_CREATED ");
323        }
324        if self.contains(StreamFlags::ITEM_REMOVED) {
325            let _d = write!(f, "ITEM_REMOVED ");
326        }
327        if self.contains(StreamFlags::INODE_META_MOD) {
328            let _d = write!(f, "INODE_META_MOD ");
329        }
330        if self.contains(StreamFlags::ITEM_RENAMED) {
331            let _d = write!(f, "ITEM_RENAMED ");
332        }
333        if self.contains(StreamFlags::ITEM_MODIFIED) {
334            let _d = write!(f, "ITEM_MODIFIED ");
335        }
336        if self.contains(StreamFlags::FINDER_INFO_MOD) {
337            let _d = write!(f, "FINDER_INFO_MOD ");
338        }
339        if self.contains(StreamFlags::ITEM_CHANGE_OWNER) {
340            let _d = write!(f, "ITEM_CHANGE_OWNER ");
341        }
342        if self.contains(StreamFlags::ITEM_XATTR_MOD) {
343            let _d = write!(f, "ITEM_XATTR_MOD ");
344        }
345        if self.contains(StreamFlags::IS_FILE) {
346            let _d = write!(f, "IS_FILE ");
347        }
348        if self.contains(StreamFlags::IS_DIR) {
349            let _d = write!(f, "IS_DIR ");
350        }
351        if self.contains(StreamFlags::IS_SYMLINK) {
352            let _d = write!(f, "IS_SYMLINK ");
353        }
354        if self.contains(StreamFlags::OWN_EVENT) {
355            let _d = write!(f, "OWN_EVENT ");
356        }
357        if self.contains(StreamFlags::IS_LAST_HARDLINK) {
358            let _d = write!(f, "IS_LAST_HARDLINK ");
359        }
360        if self.contains(StreamFlags::IS_HARDLINK) {
361            let _d = write!(f, "IS_HARDLINK ");
362        }
363        if self.contains(StreamFlags::ITEM_CLONED) {
364            let _d = write!(f, "ITEM_CLONED ");
365        }
366        write!(f, "")
367    }
368}
369
// Binding declared locally and linked against the CoreServices framework.
#[link(name = "CoreServices", kind = "framework")]
unsafe extern "C" {
    /// Returns an FSEvents event id; `EventStream::new` passes it to `FSEventStreamCreate`
    /// as the id from which the new stream should start reporting events.
    pub fn FSEventsGetCurrentEventId() -> u64;
}
374
#[cfg(test)]
mod tests {
    use super::*;
    use std::{fs, sync::mpsc, thread, time::Duration};

    /// How long a test waits for the next batch of events before failing.
    const EVENT_TIMEOUT: Duration = Duration::from_secs(2);

    /// Creates a temporary directory containing `existing_files` empty files named
    /// `existing-file-<n>` and returns it with its canonical path. The returned guard must be
    /// kept alive for as long as the directory is needed.
    fn watched_dir(existing_files: usize) -> (tempfile::TempDir, std::path::PathBuf) {
        let dir = tempfile::Builder::new()
            .prefix("test-event-stream")
            .tempdir()
            .unwrap();
        let path = dir.path().canonicalize().unwrap();
        for i in 0..existing_files {
            fs::write(path.join(format!("existing-file-{i}")), "").unwrap();
        }
        (dir, path)
    }

    /// Waits for the next batch of events and returns its last event.
    fn recv_last_event(rx: &mpsc::Receiver<Vec<Event>>) -> Event {
        rx.recv_timeout(EVENT_TIMEOUT).unwrap().pop().unwrap()
    }

    /// Like `recv_last_event`, but if the batch ends with a repeated creation event for
    /// `created` it is skipped and the following batch is used instead.
    fn recv_last_event_skipping_duplicate_create(
        rx: &mpsc::Receiver<Vec<Event>>,
        created: &std::path::Path,
    ) -> Event {
        let event = recv_last_event(rx);
        // we see this duplicate about 1/100 test runs.
        if event.path == created && event.flags.contains(StreamFlags::ITEM_CREATED) {
            recv_last_event(rx)
        } else {
            event
        }
    }

    #[test]
    fn test_event_stream_simple() {
        for _ in 0..3 {
            let (_dir, path) = watched_dir(10);
            flush_historical_events();

            let (tx, rx) = mpsc::channel();
            let (stream, handle) = EventStream::new(&[&path], Duration::from_millis(50));
            thread::spawn(move || stream.run(move |events| tx.send(events).is_ok()));

            fs::write(path.join("new-file"), "").unwrap();
            let event = recv_last_event(&rx);
            assert_eq!(event.path, path.join("new-file"));
            assert!(event.flags.contains(StreamFlags::ITEM_CREATED));

            fs::remove_file(path.join("existing-file-5")).unwrap();
            let event = recv_last_event_skipping_duplicate_create(&rx, &path.join("new-file"));
            assert_eq!(event.path, path.join("existing-file-5"));
            assert!(event.flags.contains(StreamFlags::ITEM_REMOVED));
            drop(handle);
        }
    }

    #[test]
    fn test_event_stream_delayed_start() {
        for _ in 0..3 {
            let (_dir, path) = watched_dir(10);
            flush_historical_events();

            let (tx, rx) = mpsc::channel();
            let (stream, handle) = EventStream::new(&[&path], Duration::from_millis(50));

            // Delay the call to `run` in order to make sure we don't miss any events that occur
            // between creating the `EventStream` and calling `run`.
            thread::spawn(move || {
                thread::sleep(Duration::from_millis(100));
                stream.run(move |events| tx.send(events).is_ok())
            });

            fs::write(path.join("new-file"), "").unwrap();
            let event = recv_last_event(&rx);
            assert_eq!(event.path, path.join("new-file"));
            assert!(event.flags.contains(StreamFlags::ITEM_CREATED));

            fs::remove_file(path.join("existing-file-5")).unwrap();
            // Tolerate the same occasional duplicate creation event as the simple test.
            let event = recv_last_event_skipping_duplicate_create(&rx, &path.join("new-file"));
            assert_eq!(event.path, path.join("existing-file-5"));
            assert!(event.flags.contains(StreamFlags::ITEM_REMOVED));
            drop(handle);
        }
    }

    #[test]
    fn test_event_stream_shutdown_by_dropping_handle() {
        let (_dir, path) = watched_dir(0);
        flush_historical_events();

        let (tx, rx) = mpsc::channel();
        let (stream, handle) = EventStream::new(&[&path], Duration::from_millis(50));
        thread::spawn(move || {
            stream.run({
                let tx = tx.clone();
                move |_| {
                    tx.send("running").unwrap();
                    true
                }
            });
            tx.send("stopped").unwrap();
        });

        fs::write(path.join("new-file"), "").unwrap();
        assert_eq!(rx.recv_timeout(EVENT_TIMEOUT).unwrap(), "running");

        // Dropping the handle causes `EventStream::run` to return.
        drop(handle);
        assert_eq!(rx.recv_timeout(EVENT_TIMEOUT).unwrap(), "stopped");
    }

    #[test]
    fn test_event_stream_shutdown_before_run() {
        let (_dir, path) = watched_dir(0);

        let (stream, handle) = EventStream::new(&[&path], Duration::from_millis(50));
        drop(handle);

        // This returns immediately because the handle was already dropped.
        stream.run(|_| true);
    }

    /// Sleeps long enough for events caused by test setup to be flushed before a stream is
    /// created; waits longer when the `CI` environment variable is set.
    fn flush_historical_events() {
        let duration = if std::env::var("CI").is_ok() {
            Duration::from_secs(2)
        } else {
            Duration::from_millis(500)
        };
        thread::sleep(duration);
    }
}