fsevent.rs

  1#![cfg(target_os = "macos")]
  2
  3use bitflags::bitflags;
  4use fsevent_sys::{self as fs, core_foundation as cf};
  5use parking_lot::Mutex;
  6use std::{
  7    convert::AsRef,
  8    ffi::{c_void, CStr, OsStr},
  9    os::unix::ffi::OsStrExt,
 10    path::{Path, PathBuf},
 11    slice,
 12    sync::Arc,
 13    time::Duration,
 14};
 15
 16#[derive(Clone, Debug)]
 17pub struct Event {
 18    pub event_id: u64,
 19    pub flags: StreamFlags,
 20    pub path: PathBuf,
 21}
 22
 23pub struct EventStream {
 24    stream: fs::FSEventStreamRef,
 25    lifecycle: Arc<Mutex<Lifecycle>>,
 26    state: Box<State>,
 27}
 28
 29struct State {
 30    latency: Duration,
 31    paths: cf::CFMutableArrayRef,
 32    callback: Option<Box<dyn FnMut(Vec<Event>) -> bool>>,
 33    last_valid_event_id: Option<fs::FSEventStreamEventId>,
 34}
 35
 36impl Drop for State {
 37    fn drop(&mut self) {
 38        unsafe {
 39            cf::CFRelease(self.paths);
 40        }
 41    }
 42}
 43
 44enum Lifecycle {
 45    New,
 46    Running(cf::CFRunLoopRef),
 47    Stopped,
 48}
 49
 50pub struct Handle(Arc<Mutex<Lifecycle>>);
 51
 52unsafe impl Send for EventStream {}
 53unsafe impl Send for Lifecycle {}
 54
 55impl EventStream {
 56    pub fn new(paths: &[&Path], latency: Duration) -> (Self, Handle) {
 57        unsafe {
 58            let cf_paths =
 59                cf::CFArrayCreateMutable(cf::kCFAllocatorDefault, 0, &cf::kCFTypeArrayCallBacks);
 60            assert!(!cf_paths.is_null());
 61
 62            for path in paths {
 63                let path_bytes = path.as_os_str().as_bytes();
 64                let cf_url = cf::CFURLCreateFromFileSystemRepresentation(
 65                    cf::kCFAllocatorDefault,
 66                    path_bytes.as_ptr() as *const i8,
 67                    path_bytes.len() as cf::CFIndex,
 68                    false,
 69                );
 70                let cf_path = cf::CFURLCopyFileSystemPath(cf_url, cf::kCFURLPOSIXPathStyle);
 71                cf::CFArrayAppendValue(cf_paths, cf_path);
 72                cf::CFRelease(cf_path);
 73                cf::CFRelease(cf_url);
 74            }
 75
 76            let state = Box::new(State {
 77                latency,
 78                paths: cf_paths,
 79                callback: None,
 80                last_valid_event_id: None,
 81            });
 82            let stream_context = fs::FSEventStreamContext {
 83                version: 0,
 84                info: state.as_ref() as *const _ as *mut c_void,
 85                retain: None,
 86                release: None,
 87                copy_description: None,
 88            };
 89            let stream = fs::FSEventStreamCreate(
 90                cf::kCFAllocatorDefault,
 91                Self::trampoline,
 92                &stream_context,
 93                cf_paths,
 94                FSEventsGetCurrentEventId(),
 95                latency.as_secs_f64(),
 96                fs::kFSEventStreamCreateFlagFileEvents
 97                    | fs::kFSEventStreamCreateFlagNoDefer
 98                    | fs::kFSEventStreamCreateFlagWatchRoot,
 99            );
100
101            let lifecycle = Arc::new(Mutex::new(Lifecycle::New));
102            (
103                EventStream {
104                    stream,
105                    lifecycle: lifecycle.clone(),
106                    state,
107                },
108                Handle(lifecycle),
109            )
110        }
111    }
112
113    pub fn run<F>(mut self, f: F)
114    where
115        F: FnMut(Vec<Event>) -> bool + 'static,
116    {
117        self.state.callback = Some(Box::new(f));
118        unsafe {
119            let run_loop = cf::CFRunLoopGetCurrent();
120            {
121                let mut state = self.lifecycle.lock();
122                match *state {
123                    Lifecycle::New => *state = Lifecycle::Running(run_loop),
124                    Lifecycle::Running(_) => unreachable!(),
125                    Lifecycle::Stopped => return,
126                }
127            }
128            fs::FSEventStreamScheduleWithRunLoop(self.stream, run_loop, cf::kCFRunLoopDefaultMode);
129            fs::FSEventStreamStart(self.stream);
130            cf::CFRunLoopRun();
131            fs::FSEventStreamRelease(self.stream);
132        }
133    }
134
135    extern "C" fn trampoline(
136        stream_ref: fs::FSEventStreamRef,
137        info: *mut ::std::os::raw::c_void,
138        num: usize,                                 // size_t numEvents
139        event_paths: *mut ::std::os::raw::c_void,   // void *eventPaths
140        event_flags: *const ::std::os::raw::c_void, // const FSEventStreamEventFlags eventFlags[]
141        event_ids: *const ::std::os::raw::c_void,   // const FSEventStreamEventId eventIds[]
142    ) {
143        unsafe {
144            let event_paths = event_paths as *const *const ::std::os::raw::c_char;
145            let e_ptr = event_flags as *mut u32;
146            let i_ptr = event_ids as *mut u64;
147            let state = (info as *mut State).as_mut().unwrap();
148            let callback = if let Some(callback) = state.callback.as_mut() {
149                callback
150            } else {
151                return;
152            };
153
154            let paths = slice::from_raw_parts(event_paths, num);
155            let flags = slice::from_raw_parts_mut(e_ptr, num);
156            let ids = slice::from_raw_parts_mut(i_ptr, num);
157
158            let mut events = Vec::with_capacity(num);
159            for p in 0..num {
160                let path_c_str = CStr::from_ptr(paths[p]);
161                let path = PathBuf::from(OsStr::from_bytes(path_c_str.to_bytes()));
162                if let Some(flag) = StreamFlags::from_bits(flags[p]) {
163                    if flag.contains(StreamFlags::HISTORY_DONE) {
164                        events.clear();
165                    } else {
166                        events.push(Event {
167                            event_id: ids[p],
168                            flags: flag,
169                            path,
170                        });
171                    }
172                } else {
173                    debug_assert!(false, "unknown flag set for fs event: {}", flags[p]);
174                }
175            }
176
177            if !events.is_empty() {
178                if !callback(events) {
179                    fs::FSEventStreamStop(stream_ref);
180                    cf::CFRunLoopStop(cf::CFRunLoopGetCurrent());
181                }
182            }
183        }
184    }
185}
186
187impl Drop for Handle {
188    fn drop(&mut self) {
189        let mut state = self.0.lock();
190        if let Lifecycle::Running(run_loop) = *state {
191            unsafe {
192                cf::CFRunLoopStop(run_loop);
193            }
194        }
195        *state = Lifecycle::Stopped;
196    }
197}
198
199// Synchronize with
200// /System/Library/Frameworks/CoreServices.framework/Versions/A/Frameworks/FSEvents.framework/Versions/A/Headers/FSEvents.h
201bitflags! {
202  #[repr(C)]
203  pub struct StreamFlags: u32 {
204    const NONE = 0x00000000;
205    const MUST_SCAN_SUBDIRS = 0x00000001;
206    const USER_DROPPED = 0x00000002;
207    const KERNEL_DROPPED = 0x00000004;
208    const IDS_WRAPPED = 0x00000008;
209    const HISTORY_DONE = 0x00000010;
210    const ROOT_CHANGED = 0x00000020;
211    const MOUNT = 0x00000040;
212    const UNMOUNT = 0x00000080;
213    const ITEM_CREATED = 0x00000100;
214    const ITEM_REMOVED = 0x00000200;
215    const INODE_META_MOD = 0x00000400;
216    const ITEM_RENAMED = 0x00000800;
217    const ITEM_MODIFIED = 0x00001000;
218    const FINDER_INFO_MOD = 0x00002000;
219    const ITEM_CHANGE_OWNER = 0x00004000;
220    const ITEM_XATTR_MOD = 0x00008000;
221    const IS_FILE = 0x00010000;
222    const IS_DIR = 0x00020000;
223    const IS_SYMLINK = 0x00040000;
224    const OWN_EVENT = 0x00080000;
225    const IS_HARDLINK = 0x00100000;
226    const IS_LAST_HARDLINK = 0x00200000;
227    const ITEM_CLONED = 0x400000;
228  }
229}
230
231impl std::fmt::Display for StreamFlags {
232    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
233        if self.contains(StreamFlags::MUST_SCAN_SUBDIRS) {
234            let _d = write!(f, "MUST_SCAN_SUBDIRS ");
235        }
236        if self.contains(StreamFlags::USER_DROPPED) {
237            let _d = write!(f, "USER_DROPPED ");
238        }
239        if self.contains(StreamFlags::KERNEL_DROPPED) {
240            let _d = write!(f, "KERNEL_DROPPED ");
241        }
242        if self.contains(StreamFlags::IDS_WRAPPED) {
243            let _d = write!(f, "IDS_WRAPPED ");
244        }
245        if self.contains(StreamFlags::HISTORY_DONE) {
246            let _d = write!(f, "HISTORY_DONE ");
247        }
248        if self.contains(StreamFlags::ROOT_CHANGED) {
249            let _d = write!(f, "ROOT_CHANGED ");
250        }
251        if self.contains(StreamFlags::MOUNT) {
252            let _d = write!(f, "MOUNT ");
253        }
254        if self.contains(StreamFlags::UNMOUNT) {
255            let _d = write!(f, "UNMOUNT ");
256        }
257        if self.contains(StreamFlags::ITEM_CREATED) {
258            let _d = write!(f, "ITEM_CREATED ");
259        }
260        if self.contains(StreamFlags::ITEM_REMOVED) {
261            let _d = write!(f, "ITEM_REMOVED ");
262        }
263        if self.contains(StreamFlags::INODE_META_MOD) {
264            let _d = write!(f, "INODE_META_MOD ");
265        }
266        if self.contains(StreamFlags::ITEM_RENAMED) {
267            let _d = write!(f, "ITEM_RENAMED ");
268        }
269        if self.contains(StreamFlags::ITEM_MODIFIED) {
270            let _d = write!(f, "ITEM_MODIFIED ");
271        }
272        if self.contains(StreamFlags::FINDER_INFO_MOD) {
273            let _d = write!(f, "FINDER_INFO_MOD ");
274        }
275        if self.contains(StreamFlags::ITEM_CHANGE_OWNER) {
276            let _d = write!(f, "ITEM_CHANGE_OWNER ");
277        }
278        if self.contains(StreamFlags::ITEM_XATTR_MOD) {
279            let _d = write!(f, "ITEM_XATTR_MOD ");
280        }
281        if self.contains(StreamFlags::IS_FILE) {
282            let _d = write!(f, "IS_FILE ");
283        }
284        if self.contains(StreamFlags::IS_DIR) {
285            let _d = write!(f, "IS_DIR ");
286        }
287        if self.contains(StreamFlags::IS_SYMLINK) {
288            let _d = write!(f, "IS_SYMLINK ");
289        }
290        if self.contains(StreamFlags::OWN_EVENT) {
291            let _d = write!(f, "OWN_EVENT ");
292        }
293        if self.contains(StreamFlags::IS_LAST_HARDLINK) {
294            let _d = write!(f, "IS_LAST_HARDLINK ");
295        }
296        if self.contains(StreamFlags::IS_HARDLINK) {
297            let _d = write!(f, "IS_HARDLINK ");
298        }
299        if self.contains(StreamFlags::ITEM_CLONED) {
300            let _d = write!(f, "ITEM_CLONED ");
301        }
302        write!(f, "")
303    }
304}
305
306#[link(name = "CoreServices", kind = "framework")]
307extern "C" {
308    pub fn FSEventsGetCurrentEventId() -> u64;
309}
310
311#[cfg(test)]
312mod tests {
313    use super::*;
314    use std::{fs, sync::mpsc, thread, time::Duration};
315    use tempdir::TempDir;
316
317    #[test]
318    fn test_event_stream_simple() {
319        for _ in 0..3 {
320            let dir = TempDir::new("test-event-stream").unwrap();
321            let path = dir.path().canonicalize().unwrap();
322            for i in 0..10 {
323                fs::write(path.join(format!("existing-file-{}", i)), "").unwrap();
324            }
325            flush_historical_events();
326
327            let (tx, rx) = mpsc::channel();
328            let (stream, handle) = EventStream::new(&[&path], Duration::from_millis(50));
329            thread::spawn(move || stream.run(move |events| tx.send(events.to_vec()).is_ok()));
330
331            fs::write(path.join("new-file"), "").unwrap();
332            let events = rx.recv_timeout(Duration::from_secs(2)).unwrap();
333            let event = events.last().unwrap();
334            assert_eq!(event.path, path.join("new-file"));
335            assert!(event.flags.contains(StreamFlags::ITEM_CREATED));
336
337            fs::remove_file(path.join("existing-file-5")).unwrap();
338            let events = rx.recv_timeout(Duration::from_secs(2)).unwrap();
339            let event = events.last().unwrap();
340            assert_eq!(event.path, path.join("existing-file-5"));
341            assert!(event.flags.contains(StreamFlags::ITEM_REMOVED));
342            drop(handle);
343        }
344    }
345
346    #[test]
347    fn test_event_stream_delayed_start() {
348        for _ in 0..3 {
349            let dir = TempDir::new("test-event-stream").unwrap();
350            let path = dir.path().canonicalize().unwrap();
351            for i in 0..10 {
352                fs::write(path.join(format!("existing-file-{}", i)), "").unwrap();
353            }
354            flush_historical_events();
355
356            let (tx, rx) = mpsc::channel();
357            let (stream, handle) = EventStream::new(&[&path], Duration::from_millis(50));
358
359            // Delay the call to `run` in order to make sure we don't miss any events that occur
360            // between creating the `EventStream` and calling `run`.
361            thread::spawn(move || {
362                thread::sleep(Duration::from_millis(100));
363                stream.run(move |events| tx.send(events.to_vec()).is_ok())
364            });
365
366            fs::write(path.join("new-file"), "").unwrap();
367            let events = rx.recv_timeout(Duration::from_secs(2)).unwrap();
368            let event = events.last().unwrap();
369            assert_eq!(event.path, path.join("new-file"));
370            assert!(event.flags.contains(StreamFlags::ITEM_CREATED));
371
372            fs::remove_file(path.join("existing-file-5")).unwrap();
373            let events = rx.recv_timeout(Duration::from_secs(2)).unwrap();
374            let event = events.last().unwrap();
375            assert_eq!(event.path, path.join("existing-file-5"));
376            assert!(event.flags.contains(StreamFlags::ITEM_REMOVED));
377            drop(handle);
378        }
379    }
380
381    #[test]
382    fn test_event_stream_shutdown_by_dropping_handle() {
383        let dir = TempDir::new("test-event-stream").unwrap();
384        let path = dir.path().canonicalize().unwrap();
385        flush_historical_events();
386
387        let (tx, rx) = mpsc::channel();
388        let (stream, handle) = EventStream::new(&[&path], Duration::from_millis(50));
389        thread::spawn(move || {
390            stream.run({
391                let tx = tx.clone();
392                move |_| {
393                    tx.send("running").unwrap();
394                    true
395                }
396            });
397            tx.send("stopped").unwrap();
398        });
399
400        fs::write(path.join("new-file"), "").unwrap();
401        assert_eq!(rx.recv_timeout(Duration::from_secs(2)).unwrap(), "running");
402
403        // Dropping the handle causes `EventStream::run` to return.
404        drop(handle);
405        assert_eq!(rx.recv_timeout(Duration::from_secs(2)).unwrap(), "stopped");
406    }
407
408    #[test]
409    fn test_event_stream_shutdown_before_run() {
410        let dir = TempDir::new("test-event-stream").unwrap();
411        let path = dir.path().canonicalize().unwrap();
412
413        let (stream, handle) = EventStream::new(&[&path], Duration::from_millis(50));
414        drop(handle);
415
416        // This returns immediately because the handle was already dropped.
417        stream.run(|_| true);
418    }
419
420    fn flush_historical_events() {
421        let duration = if std::env::var("CI").is_ok() {
422            Duration::from_secs(2)
423        } else {
424            Duration::from_millis(500)
425        };
426        thread::sleep(duration);
427    }
428}