fsevent.rs

  1#![cfg(target_os = "macos")]
  2
  3use bitflags::bitflags;
  4use fsevent_sys::{self as fs, core_foundation as cf};
  5use parking_lot::Mutex;
  6use std::{
  7    convert::AsRef,
  8    ffi::{CStr, OsStr, c_void},
  9    os::unix::ffi::OsStrExt,
 10    path::{Path, PathBuf},
 11    ptr, slice,
 12    sync::Arc,
 13    time::Duration,
 14};
 15
 16#[derive(Clone, Debug)]
 17pub struct Event {
 18    pub event_id: u64,
 19    pub flags: StreamFlags,
 20    pub path: PathBuf,
 21}
 22
 23pub struct EventStream {
 24    lifecycle: Arc<Mutex<Lifecycle>>,
 25    state: Box<State>,
 26}
 27
 28struct State {
 29    latency: Duration,
 30    paths: cf::CFMutableArrayRef,
 31    callback: Option<Box<dyn FnMut(Vec<Event>) -> bool>>,
 32    last_valid_event_id: Option<fs::FSEventStreamEventId>,
 33    stream: fs::FSEventStreamRef,
 34}
 35
 36impl Drop for State {
 37    fn drop(&mut self) {
 38        unsafe {
 39            cf::CFRelease(self.paths);
 40            fs::FSEventStreamStop(self.stream);
 41            fs::FSEventStreamInvalidate(self.stream);
 42            fs::FSEventStreamRelease(self.stream);
 43        }
 44    }
 45}
 46
 47enum Lifecycle {
 48    New,
 49    Running(cf::CFRunLoopRef),
 50    Stopped,
 51}
 52
 53pub struct Handle(Arc<Mutex<Lifecycle>>);
 54
 55unsafe impl Send for EventStream {}
 56unsafe impl Send for Lifecycle {}
 57
 58impl EventStream {
 59    pub fn new(paths: &[&Path], latency: Duration) -> (Self, Handle) {
 60        unsafe {
 61            let cf_paths =
 62                cf::CFArrayCreateMutable(cf::kCFAllocatorDefault, 0, &cf::kCFTypeArrayCallBacks);
 63            assert!(!cf_paths.is_null());
 64
 65            for path in paths {
 66                let path_bytes = path.as_os_str().as_bytes();
 67                let cf_url = cf::CFURLCreateFromFileSystemRepresentation(
 68                    cf::kCFAllocatorDefault,
 69                    path_bytes.as_ptr() as *const i8,
 70                    path_bytes.len() as cf::CFIndex,
 71                    false,
 72                );
 73                let cf_path = cf::CFURLCopyFileSystemPath(cf_url, cf::kCFURLPOSIXPathStyle);
 74                cf::CFArrayAppendValue(cf_paths, cf_path);
 75                cf::CFRelease(cf_path);
 76                cf::CFRelease(cf_url);
 77            }
 78
 79            let mut state = Box::new(State {
 80                latency,
 81                paths: cf_paths,
 82                callback: None,
 83                last_valid_event_id: None,
 84                stream: ptr::null_mut(),
 85            });
 86            let stream_context = fs::FSEventStreamContext {
 87                version: 0,
 88                info: state.as_ref() as *const _ as *mut c_void,
 89                retain: None,
 90                release: None,
 91                copy_description: None,
 92            };
 93            let stream = fs::FSEventStreamCreate(
 94                cf::kCFAllocatorDefault,
 95                Self::trampoline,
 96                &stream_context,
 97                cf_paths,
 98                FSEventsGetCurrentEventId(),
 99                latency.as_secs_f64(),
100                fs::kFSEventStreamCreateFlagFileEvents
101                    | fs::kFSEventStreamCreateFlagNoDefer
102                    | fs::kFSEventStreamCreateFlagWatchRoot,
103            );
104            state.stream = stream;
105
106            let lifecycle = Arc::new(Mutex::new(Lifecycle::New));
107            (
108                EventStream {
109                    lifecycle: lifecycle.clone(),
110                    state,
111                },
112                Handle(lifecycle),
113            )
114        }
115    }
116
117    pub fn run<F>(mut self, f: F)
118    where
119        F: FnMut(Vec<Event>) -> bool + 'static,
120    {
121        self.state.callback = Some(Box::new(f));
122        unsafe {
123            let run_loop =
124                core_foundation::base::CFRetain(cf::CFRunLoopGetCurrent()) as *mut c_void;
125            {
126                let mut state = self.lifecycle.lock();
127                match *state {
128                    Lifecycle::New => *state = Lifecycle::Running(run_loop),
129                    Lifecycle::Running(_) => unreachable!(),
130                    Lifecycle::Stopped => return,
131                }
132            }
133            fs::FSEventStreamScheduleWithRunLoop(
134                self.state.stream,
135                run_loop,
136                cf::kCFRunLoopDefaultMode,
137            );
138            fs::FSEventStreamStart(self.state.stream);
139            cf::CFRunLoopRun();
140        }
141    }
142
143    extern "C" fn trampoline(
144        stream_ref: fs::FSEventStreamRef,
145        info: *mut ::std::os::raw::c_void,
146        num: usize,                                 // size_t numEvents
147        event_paths: *mut ::std::os::raw::c_void,   // void *eventPaths
148        event_flags: *const ::std::os::raw::c_void, // const FSEventStreamEventFlags eventFlags[]
149        event_ids: *const ::std::os::raw::c_void,   // const FSEventStreamEventId eventIds[]
150    ) {
151        unsafe {
152            let event_paths = event_paths as *const *const ::std::os::raw::c_char;
153            let e_ptr = event_flags as *mut u32;
154            let i_ptr = event_ids as *mut u64;
155            let state = (info as *mut State).as_mut().unwrap();
156            let callback = if let Some(callback) = state.callback.as_mut() {
157                callback
158            } else {
159                return;
160            };
161
162            let paths = slice::from_raw_parts(event_paths, num);
163            let flags = slice::from_raw_parts_mut(e_ptr, num);
164            let ids = slice::from_raw_parts_mut(i_ptr, num);
165            let mut stream_restarted = false;
166
167            // Sometimes FSEvents reports a "dropped" event, an indication that either the kernel
168            // or our code couldn't keep up with the sheer volume of file-system events that were
169            // generated. If we observed a valid event before this happens, we'll try to read the
170            // file-system journal by stopping the current stream and creating a new one starting at
171            // such event. Otherwise, we'll let invoke the callback with the dropped event, which
172            // will likely perform a re-scan of one of the root directories.
173            if flags
174                .iter()
175                .copied()
176                .filter_map(StreamFlags::from_bits)
177                .any(|flags| {
178                    flags.contains(StreamFlags::USER_DROPPED)
179                        || flags.contains(StreamFlags::KERNEL_DROPPED)
180                })
181                && let Some(last_valid_event_id) = state.last_valid_event_id.take() {
182                    fs::FSEventStreamStop(state.stream);
183                    fs::FSEventStreamInvalidate(state.stream);
184                    fs::FSEventStreamRelease(state.stream);
185
186                    let stream_context = fs::FSEventStreamContext {
187                        version: 0,
188                        info,
189                        retain: None,
190                        release: None,
191                        copy_description: None,
192                    };
193                    let stream = fs::FSEventStreamCreate(
194                        cf::kCFAllocatorDefault,
195                        Self::trampoline,
196                        &stream_context,
197                        state.paths,
198                        last_valid_event_id,
199                        state.latency.as_secs_f64(),
200                        fs::kFSEventStreamCreateFlagFileEvents
201                            | fs::kFSEventStreamCreateFlagNoDefer
202                            | fs::kFSEventStreamCreateFlagWatchRoot,
203                    );
204
205                    state.stream = stream;
206                    fs::FSEventStreamScheduleWithRunLoop(
207                        state.stream,
208                        cf::CFRunLoopGetCurrent(),
209                        cf::kCFRunLoopDefaultMode,
210                    );
211                    fs::FSEventStreamStart(state.stream);
212                    stream_restarted = true;
213                }
214
215            if !stream_restarted {
216                let mut events = Vec::with_capacity(num);
217                for p in 0..num {
218                    if let Some(flag) = StreamFlags::from_bits(flags[p]) {
219                        if !flag.contains(StreamFlags::HISTORY_DONE) {
220                            let path_c_str = CStr::from_ptr(paths[p]);
221                            let path = PathBuf::from(OsStr::from_bytes(path_c_str.to_bytes()));
222                            let event = Event {
223                                event_id: ids[p],
224                                flags: flag,
225                                path,
226                            };
227                            state.last_valid_event_id = Some(event.event_id);
228                            events.push(event);
229                        }
230                    } else {
231                        debug_assert!(false, "unknown flag set for fs event: {}", flags[p]);
232                    }
233                }
234
235                if !events.is_empty() && !callback(events) {
236                    fs::FSEventStreamStop(stream_ref);
237                    cf::CFRunLoopStop(cf::CFRunLoopGetCurrent());
238                }
239            }
240        }
241    }
242}
243
244impl Drop for Handle {
245    fn drop(&mut self) {
246        let mut state = self.0.lock();
247        if let Lifecycle::Running(run_loop) = *state {
248            unsafe {
249                cf::CFRunLoopStop(run_loop);
250                cf::CFRelease(run_loop)
251            }
252        }
253        *state = Lifecycle::Stopped;
254    }
255}
256
257// Synchronize with
258// /System/Library/Frameworks/CoreServices.framework/Versions/A/Frameworks/FSEvents.framework/Versions/A/Headers/FSEvents.h
259bitflags! {
260    #[derive(Debug, PartialEq, Eq, Clone, Copy)]
261  #[repr(C)]
262  pub struct StreamFlags: u32 {
263    const NONE = 0x00000000;
264    const MUST_SCAN_SUBDIRS = 0x00000001;
265    const USER_DROPPED = 0x00000002;
266    const KERNEL_DROPPED = 0x00000004;
267    const IDS_WRAPPED = 0x00000008;
268    const HISTORY_DONE = 0x00000010;
269    const ROOT_CHANGED = 0x00000020;
270    const MOUNT = 0x00000040;
271    const UNMOUNT = 0x00000080;
272    const ITEM_CREATED = 0x00000100;
273    const ITEM_REMOVED = 0x00000200;
274    const INODE_META_MOD = 0x00000400;
275    const ITEM_RENAMED = 0x00000800;
276    const ITEM_MODIFIED = 0x00001000;
277    const FINDER_INFO_MOD = 0x00002000;
278    const ITEM_CHANGE_OWNER = 0x00004000;
279    const ITEM_XATTR_MOD = 0x00008000;
280    const IS_FILE = 0x00010000;
281    const IS_DIR = 0x00020000;
282    const IS_SYMLINK = 0x00040000;
283    const OWN_EVENT = 0x00080000;
284    const IS_HARDLINK = 0x00100000;
285    const IS_LAST_HARDLINK = 0x00200000;
286    const ITEM_CLONED = 0x400000;
287  }
288}
289
290impl std::fmt::Display for StreamFlags {
291    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
292        if self.contains(StreamFlags::MUST_SCAN_SUBDIRS) {
293            let _d = write!(f, "MUST_SCAN_SUBDIRS ");
294        }
295        if self.contains(StreamFlags::USER_DROPPED) {
296            let _d = write!(f, "USER_DROPPED ");
297        }
298        if self.contains(StreamFlags::KERNEL_DROPPED) {
299            let _d = write!(f, "KERNEL_DROPPED ");
300        }
301        if self.contains(StreamFlags::IDS_WRAPPED) {
302            let _d = write!(f, "IDS_WRAPPED ");
303        }
304        if self.contains(StreamFlags::HISTORY_DONE) {
305            let _d = write!(f, "HISTORY_DONE ");
306        }
307        if self.contains(StreamFlags::ROOT_CHANGED) {
308            let _d = write!(f, "ROOT_CHANGED ");
309        }
310        if self.contains(StreamFlags::MOUNT) {
311            let _d = write!(f, "MOUNT ");
312        }
313        if self.contains(StreamFlags::UNMOUNT) {
314            let _d = write!(f, "UNMOUNT ");
315        }
316        if self.contains(StreamFlags::ITEM_CREATED) {
317            let _d = write!(f, "ITEM_CREATED ");
318        }
319        if self.contains(StreamFlags::ITEM_REMOVED) {
320            let _d = write!(f, "ITEM_REMOVED ");
321        }
322        if self.contains(StreamFlags::INODE_META_MOD) {
323            let _d = write!(f, "INODE_META_MOD ");
324        }
325        if self.contains(StreamFlags::ITEM_RENAMED) {
326            let _d = write!(f, "ITEM_RENAMED ");
327        }
328        if self.contains(StreamFlags::ITEM_MODIFIED) {
329            let _d = write!(f, "ITEM_MODIFIED ");
330        }
331        if self.contains(StreamFlags::FINDER_INFO_MOD) {
332            let _d = write!(f, "FINDER_INFO_MOD ");
333        }
334        if self.contains(StreamFlags::ITEM_CHANGE_OWNER) {
335            let _d = write!(f, "ITEM_CHANGE_OWNER ");
336        }
337        if self.contains(StreamFlags::ITEM_XATTR_MOD) {
338            let _d = write!(f, "ITEM_XATTR_MOD ");
339        }
340        if self.contains(StreamFlags::IS_FILE) {
341            let _d = write!(f, "IS_FILE ");
342        }
343        if self.contains(StreamFlags::IS_DIR) {
344            let _d = write!(f, "IS_DIR ");
345        }
346        if self.contains(StreamFlags::IS_SYMLINK) {
347            let _d = write!(f, "IS_SYMLINK ");
348        }
349        if self.contains(StreamFlags::OWN_EVENT) {
350            let _d = write!(f, "OWN_EVENT ");
351        }
352        if self.contains(StreamFlags::IS_LAST_HARDLINK) {
353            let _d = write!(f, "IS_LAST_HARDLINK ");
354        }
355        if self.contains(StreamFlags::IS_HARDLINK) {
356            let _d = write!(f, "IS_HARDLINK ");
357        }
358        if self.contains(StreamFlags::ITEM_CLONED) {
359            let _d = write!(f, "ITEM_CLONED ");
360        }
361        write!(f, "")
362    }
363}
364
365#[link(name = "CoreServices", kind = "framework")]
366unsafe extern "C" {
367    pub fn FSEventsGetCurrentEventId() -> u64;
368}
369
#[cfg(test)]
mod tests {
    use super::*;
    use std::{fs, sync::mpsc, thread, time::Duration};

    /// Creates a temporary directory containing `existing_files` empty files named
    /// `existing-file-{i}` and returns it together with its canonicalized path.
    ///
    /// The returned `TempDir` must be kept alive for as long as the directory is needed.
    fn watched_dir(existing_files: usize) -> (tempfile::TempDir, std::path::PathBuf) {
        let dir = tempfile::Builder::new()
            .prefix("test-event-stream")
            .tempdir()
            .unwrap();
        let path = dir.path().canonicalize().unwrap();
        for i in 0..existing_files {
            fs::write(path.join(format!("existing-file-{}", i)), "").unwrap();
        }
        (dir, path)
    }

    #[test]
    fn test_event_stream_simple() {
        for _ in 0..3 {
            let (_dir, path) = watched_dir(10);
            flush_historical_events();

            let (tx, rx) = mpsc::channel();
            let (stream, handle) = EventStream::new(&[&path], Duration::from_millis(50));
            // The callback already owns the batch, so it is forwarded without cloning.
            thread::spawn(move || stream.run(move |events| tx.send(events).is_ok()));

            fs::write(path.join("new-file"), "").unwrap();
            let events = rx.recv_timeout(Duration::from_secs(2)).unwrap();
            let event = events.last().unwrap();
            assert_eq!(event.path, path.join("new-file"));
            assert!(event.flags.contains(StreamFlags::ITEM_CREATED));

            fs::remove_file(path.join("existing-file-5")).unwrap();
            let mut events = rx.recv_timeout(Duration::from_secs(2)).unwrap();
            let mut event = events.last().unwrap();
            // we see this duplicate about 1/100 test runs.
            if event.path == path.join("new-file")
                && event.flags.contains(StreamFlags::ITEM_CREATED)
            {
                events = rx.recv_timeout(Duration::from_secs(2)).unwrap();
                event = events.last().unwrap();
            }
            assert_eq!(event.path, path.join("existing-file-5"));
            assert!(event.flags.contains(StreamFlags::ITEM_REMOVED));
            drop(handle);
        }
    }

    #[test]
    fn test_event_stream_delayed_start() {
        for _ in 0..3 {
            let (_dir, path) = watched_dir(10);
            flush_historical_events();

            let (tx, rx) = mpsc::channel();
            let (stream, handle) = EventStream::new(&[&path], Duration::from_millis(50));

            // Delay the call to `run` in order to make sure we don't miss any events that occur
            // between creating the `EventStream` and calling `run`.
            thread::spawn(move || {
                thread::sleep(Duration::from_millis(100));
                stream.run(move |events| tx.send(events).is_ok())
            });

            fs::write(path.join("new-file"), "").unwrap();
            let events = rx.recv_timeout(Duration::from_secs(2)).unwrap();
            let event = events.last().unwrap();
            assert_eq!(event.path, path.join("new-file"));
            assert!(event.flags.contains(StreamFlags::ITEM_CREATED));

            fs::remove_file(path.join("existing-file-5")).unwrap();
            let events = rx.recv_timeout(Duration::from_secs(2)).unwrap();
            let event = events.last().unwrap();
            assert_eq!(event.path, path.join("existing-file-5"));
            assert!(event.flags.contains(StreamFlags::ITEM_REMOVED));
            drop(handle);
        }
    }

    #[test]
    fn test_event_stream_shutdown_by_dropping_handle() {
        let (_dir, path) = watched_dir(0);
        flush_historical_events();

        let (tx, rx) = mpsc::channel();
        let (stream, handle) = EventStream::new(&[&path], Duration::from_millis(50));
        thread::spawn(move || {
            stream.run({
                let tx = tx.clone();
                move |_| {
                    tx.send("running").unwrap();
                    true
                }
            });
            tx.send("stopped").unwrap();
        });

        fs::write(path.join("new-file"), "").unwrap();
        assert_eq!(rx.recv_timeout(Duration::from_secs(2)).unwrap(), "running");

        // Dropping the handle causes `EventStream::run` to return.
        drop(handle);
        assert_eq!(rx.recv_timeout(Duration::from_secs(2)).unwrap(), "stopped");
    }

    #[test]
    fn test_event_stream_shutdown_before_run() {
        let (_dir, path) = watched_dir(0);

        let (stream, handle) = EventStream::new(&[&path], Duration::from_millis(50));
        drop(handle);

        // This returns immediately because the handle was already dropped.
        stream.run(|_| true);
    }

    /// Sleeps so that events caused by fixture setup are not reported to a stream created
    /// afterwards; waits longer when the `CI` environment variable is set.
    fn flush_historical_events() {
        let duration = if std::env::var("CI").is_ok() {
            Duration::from_secs(2)
        } else {
            Duration::from_millis(500)
        };
        thread::sleep(duration);
    }
}