lib.rs

  1#![cfg(target_os = "macos")]
  2
  3use bitflags::bitflags;
  4use fsevent_sys::{self as fs, core_foundation as cf};
  5use parking_lot::Mutex;
  6use std::{
  7    convert::AsRef,
  8    ffi::{c_void, CStr, OsStr},
  9    os::unix::ffi::OsStrExt,
 10    path::{Path, PathBuf},
 11    slice,
 12    sync::Arc,
 13    time::Duration,
 14};
 15
 16#[derive(Clone, Debug)]
 17pub struct Event {
 18    pub event_id: u64,
 19    pub flags: StreamFlags,
 20    pub path: PathBuf,
 21}
 22
/// A file-system event stream that has been created but not yet run.
///
/// Built by `EventStream::new` together with a `Handle`, and consumed by
/// `EventStream::run`, which drives it on the calling thread's run loop.
pub struct EventStream {
    // Underlying FSEvents stream created in `new`.
    stream: fs::FSEventStreamRef,
    // Lifecycle state shared with the paired `Handle`.
    state: Arc<Mutex<Lifecycle>>,
    // Heap-allocated slot for the user callback. Its address is handed to FSEvents as the
    // context `info` pointer in `new`; `run` fills the slot before starting the stream.
    callback: Box<Option<RunCallback>>,
}
 28
/// Boxed user callback stored by `EventStream::run`. It receives one batch of events per
/// invocation; returning `false` makes the trampoline stop the stream and the run loop.
type RunCallback = Box<dyn FnMut(Vec<Event>) -> bool>;
 30
/// State shared between an `EventStream` and its `Handle`.
enum Lifecycle {
    /// Initial state set by `EventStream::new`; `run` has not been called yet.
    New,
    /// `run` has been called; holds the run loop of the thread that called it so the
    /// `Handle` can stop that loop.
    Running(cf::CFRunLoopRef),
    /// The `Handle` has been dropped; a subsequent `run` returns immediately.
    Stopped,
}
 36
/// Shutdown handle returned alongside an `EventStream`. Dropping it stops the run loop
/// recorded by `EventStream::run` (if any) and marks the stream as stopped.
pub struct Handle(Arc<Mutex<Lifecycle>>);
 38
// NOTE(review): `EventStream` is not automatically `Send` (its boxed callback has no `Send`
// bound, and the stream reference is presumably a raw pointer). This impl lets a stream be
// created on one thread and moved to the thread that calls `run`; the callback is only
// installed inside `run`, i.e. on the thread that also executes it. Soundness relies on that
// usage pattern — confirm before changing `run` or `new`.
unsafe impl Send for EventStream {}
// NOTE(review): `Lifecycle::Running` stores a `CFRunLoopRef` that `Handle::drop` passes to
// `CFRunLoopStop` from another thread; this assumes `CFRunLoopStop` may be called from any
// thread — confirm against the Core Foundation documentation.
unsafe impl Send for Lifecycle {}
 41
 42impl EventStream {
 43    pub fn new(paths: &[&Path], latency: Duration) -> (Self, Handle) {
 44        unsafe {
 45            let callback = Box::new(None);
 46            let stream_context = fs::FSEventStreamContext {
 47                version: 0,
 48                info: callback.as_ref() as *const _ as *mut c_void,
 49                retain: None,
 50                release: None,
 51                copy_description: None,
 52            };
 53
 54            let cf_paths =
 55                cf::CFArrayCreateMutable(cf::kCFAllocatorDefault, 0, &cf::kCFTypeArrayCallBacks);
 56            assert!(!cf_paths.is_null());
 57
 58            for path in paths {
 59                let path_bytes = path.as_os_str().as_bytes();
 60                let cf_url = cf::CFURLCreateFromFileSystemRepresentation(
 61                    cf::kCFAllocatorDefault,
 62                    path_bytes.as_ptr() as *const i8,
 63                    path_bytes.len() as cf::CFIndex,
 64                    false,
 65                );
 66                let cf_path = cf::CFURLCopyFileSystemPath(cf_url, cf::kCFURLPOSIXPathStyle);
 67                cf::CFArrayAppendValue(cf_paths, cf_path);
 68                cf::CFRelease(cf_path);
 69                cf::CFRelease(cf_url);
 70            }
 71
 72            let stream = fs::FSEventStreamCreate(
 73                cf::kCFAllocatorDefault,
 74                Self::trampoline,
 75                &stream_context,
 76                cf_paths,
 77                FSEventsGetCurrentEventId(),
 78                latency.as_secs_f64(),
 79                fs::kFSEventStreamCreateFlagFileEvents
 80                    | fs::kFSEventStreamCreateFlagNoDefer
 81                    | fs::kFSEventStreamCreateFlagWatchRoot,
 82            );
 83            cf::CFRelease(cf_paths);
 84
 85            let state = Arc::new(Mutex::new(Lifecycle::New));
 86
 87            (
 88                EventStream {
 89                    stream,
 90                    state: state.clone(),
 91                    callback,
 92                },
 93                Handle(state),
 94            )
 95        }
 96    }
 97
 98    pub fn run<F>(mut self, f: F)
 99    where
100        F: FnMut(Vec<Event>) -> bool + 'static,
101    {
102        *self.callback = Some(Box::new(f));
103        unsafe {
104            let run_loop = cf::CFRunLoopGetCurrent();
105            {
106                let mut state = self.state.lock();
107                match *state {
108                    Lifecycle::New => *state = Lifecycle::Running(run_loop),
109                    Lifecycle::Running(_) => unreachable!(),
110                    Lifecycle::Stopped => return,
111                }
112            }
113            fs::FSEventStreamScheduleWithRunLoop(self.stream, run_loop, cf::kCFRunLoopDefaultMode);
114            fs::FSEventStreamStart(self.stream);
115            cf::CFRunLoopRun();
116            fs::FSEventStreamRelease(self.stream);
117        }
118    }
119
120    extern "C" fn trampoline(
121        stream_ref: fs::FSEventStreamRef,
122        info: *mut ::std::os::raw::c_void,
123        num: usize,                                 // size_t numEvents
124        event_paths: *mut ::std::os::raw::c_void,   // void *eventPaths
125        event_flags: *const ::std::os::raw::c_void, // const FSEventStreamEventFlags eventFlags[]
126        event_ids: *const ::std::os::raw::c_void,   // const FSEventStreamEventId eventIds[]
127    ) {
128        unsafe {
129            let event_paths = event_paths as *const *const ::std::os::raw::c_char;
130            let e_ptr = event_flags as *mut u32;
131            let i_ptr = event_ids as *mut u64;
132            let callback_ptr = (info as *mut Option<RunCallback>).as_mut().unwrap();
133            let callback = if let Some(callback) = callback_ptr.as_mut() {
134                callback
135            } else {
136                return;
137            };
138
139            let paths = slice::from_raw_parts(event_paths, num);
140            let flags = slice::from_raw_parts_mut(e_ptr, num);
141            let ids = slice::from_raw_parts_mut(i_ptr, num);
142
143            let mut events = Vec::with_capacity(num);
144            for p in 0..num {
145                let path_c_str = CStr::from_ptr(paths[p]);
146                let path = PathBuf::from(OsStr::from_bytes(path_c_str.to_bytes()));
147                if let Some(flag) = StreamFlags::from_bits(flags[p]) {
148                    if flag.contains(StreamFlags::HISTORY_DONE) {
149                        events.clear();
150                    } else {
151                        events.push(Event {
152                            event_id: ids[p],
153                            flags: flag,
154                            path,
155                        });
156                    }
157                } else {
158                    debug_assert!(false, "unknown flag set for fs event: {}", flags[p]);
159                }
160            }
161
162            if !events.is_empty() {
163                if !callback(events) {
164                    fs::FSEventStreamStop(stream_ref);
165                    cf::CFRunLoopStop(cf::CFRunLoopGetCurrent());
166                }
167            }
168        }
169    }
170}
171
172impl Drop for Handle {
173    fn drop(&mut self) {
174        let mut state = self.0.lock();
175        if let Lifecycle::Running(run_loop) = *state {
176            unsafe {
177                cf::CFRunLoopStop(run_loop);
178            }
179        }
180        *state = Lifecycle::Stopped;
181    }
182}
183
184// Synchronize with
185// /System/Library/Frameworks/CoreServices.framework/Versions/A/Frameworks/FSEvents.framework/Versions/A/Headers/FSEvents.h
186bitflags! {
187  #[repr(C)]
188  pub struct StreamFlags: u32 {
189    const NONE = 0x00000000;
190    const MUST_SCAN_SUBDIRS = 0x00000001;
191    const USER_DROPPED = 0x00000002;
192    const KERNEL_DROPPED = 0x00000004;
193    const IDS_WRAPPED = 0x00000008;
194    const HISTORY_DONE = 0x00000010;
195    const ROOT_CHANGED = 0x00000020;
196    const MOUNT = 0x00000040;
197    const UNMOUNT = 0x00000080;
198    const ITEM_CREATED = 0x00000100;
199    const ITEM_REMOVED = 0x00000200;
200    const INODE_META_MOD = 0x00000400;
201    const ITEM_RENAMED = 0x00000800;
202    const ITEM_MODIFIED = 0x00001000;
203    const FINDER_INFO_MOD = 0x00002000;
204    const ITEM_CHANGE_OWNER = 0x00004000;
205    const ITEM_XATTR_MOD = 0x00008000;
206    const IS_FILE = 0x00010000;
207    const IS_DIR = 0x00020000;
208    const IS_SYMLINK = 0x00040000;
209    const OWN_EVENT = 0x00080000;
210    const IS_HARDLINK = 0x00100000;
211    const IS_LAST_HARDLINK = 0x00200000;
212    const ITEM_CLONED = 0x400000;
213  }
214}
215
216impl std::fmt::Display for StreamFlags {
217    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
218        if self.contains(StreamFlags::MUST_SCAN_SUBDIRS) {
219            let _d = write!(f, "MUST_SCAN_SUBDIRS ");
220        }
221        if self.contains(StreamFlags::USER_DROPPED) {
222            let _d = write!(f, "USER_DROPPED ");
223        }
224        if self.contains(StreamFlags::KERNEL_DROPPED) {
225            let _d = write!(f, "KERNEL_DROPPED ");
226        }
227        if self.contains(StreamFlags::IDS_WRAPPED) {
228            let _d = write!(f, "IDS_WRAPPED ");
229        }
230        if self.contains(StreamFlags::HISTORY_DONE) {
231            let _d = write!(f, "HISTORY_DONE ");
232        }
233        if self.contains(StreamFlags::ROOT_CHANGED) {
234            let _d = write!(f, "ROOT_CHANGED ");
235        }
236        if self.contains(StreamFlags::MOUNT) {
237            let _d = write!(f, "MOUNT ");
238        }
239        if self.contains(StreamFlags::UNMOUNT) {
240            let _d = write!(f, "UNMOUNT ");
241        }
242        if self.contains(StreamFlags::ITEM_CREATED) {
243            let _d = write!(f, "ITEM_CREATED ");
244        }
245        if self.contains(StreamFlags::ITEM_REMOVED) {
246            let _d = write!(f, "ITEM_REMOVED ");
247        }
248        if self.contains(StreamFlags::INODE_META_MOD) {
249            let _d = write!(f, "INODE_META_MOD ");
250        }
251        if self.contains(StreamFlags::ITEM_RENAMED) {
252            let _d = write!(f, "ITEM_RENAMED ");
253        }
254        if self.contains(StreamFlags::ITEM_MODIFIED) {
255            let _d = write!(f, "ITEM_MODIFIED ");
256        }
257        if self.contains(StreamFlags::FINDER_INFO_MOD) {
258            let _d = write!(f, "FINDER_INFO_MOD ");
259        }
260        if self.contains(StreamFlags::ITEM_CHANGE_OWNER) {
261            let _d = write!(f, "ITEM_CHANGE_OWNER ");
262        }
263        if self.contains(StreamFlags::ITEM_XATTR_MOD) {
264            let _d = write!(f, "ITEM_XATTR_MOD ");
265        }
266        if self.contains(StreamFlags::IS_FILE) {
267            let _d = write!(f, "IS_FILE ");
268        }
269        if self.contains(StreamFlags::IS_DIR) {
270            let _d = write!(f, "IS_DIR ");
271        }
272        if self.contains(StreamFlags::IS_SYMLINK) {
273            let _d = write!(f, "IS_SYMLINK ");
274        }
275        if self.contains(StreamFlags::OWN_EVENT) {
276            let _d = write!(f, "OWN_EVENT ");
277        }
278        if self.contains(StreamFlags::IS_LAST_HARDLINK) {
279            let _d = write!(f, "IS_LAST_HARDLINK ");
280        }
281        if self.contains(StreamFlags::IS_HARDLINK) {
282            let _d = write!(f, "IS_HARDLINK ");
283        }
284        if self.contains(StreamFlags::ITEM_CLONED) {
285            let _d = write!(f, "ITEM_CLONED ");
286        }
287        write!(f, "")
288    }
289}
290
// Declared by hand and linked directly against the CoreServices framework.
#[link(name = "CoreServices", kind = "framework")]
extern "C" {
    /// Returns an FSEvents event id as a `u64`. `EventStream::new` passes the result to
    /// `FSEventStreamCreate` as the point from which the new stream starts.
    /// NOTE(review): presumably the most recent system-wide event id — confirm against
    /// FSEvents.h.
    pub fn FSEventsGetCurrentEventId() -> u64;
}
295
#[cfg(test)]
mod tests {
    use super::*;
    use std::{fs, sync::mpsc, thread, time::Duration};
    use tempdir::TempDir;

    /// Upper bound on how long a test waits for the next message from the stream thread.
    const RECV_TIMEOUT: Duration = Duration::from_secs(2);

    /// Events for a created and a removed file are delivered once the stream is running.
    #[test]
    fn test_event_stream_simple() {
        for _ in 0..3 {
            let (_dir, path) = watched_dir_with_existing_files();
            flush_historical_events();

            let (tx, rx) = mpsc::channel();
            let (stream, handle) = EventStream::new(&[&path], Duration::from_millis(50));
            // The batch is already an owned `Vec`, so it is sent as-is without cloning.
            thread::spawn(move || stream.run(move |events| tx.send(events).is_ok()));

            assert_create_and_remove_are_reported(&path, &rx);
            drop(handle);
        }
    }

    /// Same as the simple test, but `run` is only called a while after the stream was created.
    #[test]
    fn test_event_stream_delayed_start() {
        for _ in 0..3 {
            let (_dir, path) = watched_dir_with_existing_files();
            flush_historical_events();

            let (tx, rx) = mpsc::channel();
            let (stream, handle) = EventStream::new(&[&path], Duration::from_millis(50));

            // Delay the call to `run` in order to make sure we don't miss any events that occur
            // between creating the `EventStream` and calling `run`.
            thread::spawn(move || {
                thread::sleep(Duration::from_millis(100));
                stream.run(move |events| tx.send(events).is_ok())
            });

            assert_create_and_remove_are_reported(&path, &rx);
            drop(handle);
        }
    }

    #[test]
    fn test_event_stream_shutdown_by_dropping_handle() {
        let dir = TempDir::new("test-event-stream").unwrap();
        let path = dir.path().canonicalize().unwrap();
        flush_historical_events();

        let (tx, rx) = mpsc::channel();
        let (stream, handle) = EventStream::new(&[&path], Duration::from_millis(50));
        thread::spawn(move || {
            stream.run({
                let tx = tx.clone();
                move |_| {
                    tx.send("running").unwrap();
                    true
                }
            });
            tx.send("stopped").unwrap();
        });

        fs::write(path.join("new-file"), "").unwrap();
        assert_eq!(rx.recv_timeout(RECV_TIMEOUT).unwrap(), "running");

        // Dropping the handle causes `EventStream::run` to return.
        drop(handle);
        assert_eq!(rx.recv_timeout(RECV_TIMEOUT).unwrap(), "stopped");
    }

    #[test]
    fn test_event_stream_shutdown_before_run() {
        let dir = TempDir::new("test-event-stream").unwrap();
        let path = dir.path().canonicalize().unwrap();

        let (stream, handle) = EventStream::new(&[&path], Duration::from_millis(50));
        drop(handle);

        // This returns immediately because the handle was already dropped.
        stream.run(|_| true);
    }

    /// Creates a temporary directory holding ten empty `existing-file-N` files and returns it
    /// together with its canonicalized path. The `TempDir` must be kept alive by the caller.
    fn watched_dir_with_existing_files() -> (TempDir, PathBuf) {
        let dir = TempDir::new("test-event-stream").unwrap();
        let path = dir.path().canonicalize().unwrap();
        for i in 0..10 {
            fs::write(path.join(format!("existing-file-{}", i)), "").unwrap();
        }
        (dir, path)
    }

    /// Creates `new-file` and then removes `existing-file-5` under `path`, asserting after
    /// each step that the last event of the next received batch describes that change.
    fn assert_create_and_remove_are_reported(path: &Path, rx: &mpsc::Receiver<Vec<Event>>) {
        fs::write(path.join("new-file"), "").unwrap();
        let events = rx.recv_timeout(RECV_TIMEOUT).unwrap();
        let event = events.last().unwrap();
        assert_eq!(event.path, path.join("new-file"));
        assert!(event.flags.contains(StreamFlags::ITEM_CREATED));

        fs::remove_file(path.join("existing-file-5")).unwrap();
        let events = rx.recv_timeout(RECV_TIMEOUT).unwrap();
        let event = events.last().unwrap();
        assert_eq!(event.path, path.join("existing-file-5"));
        assert!(event.flags.contains(StreamFlags::ITEM_REMOVED));
    }

    /// Sleeps so that events caused by test setup are not reported to a stream created
    /// afterwards; waits longer when the `CI` environment variable is set.
    fn flush_historical_events() {
        let duration = if std::env::var("CI").is_ok() {
            Duration::from_secs(2)
        } else {
            Duration::from_millis(500)
        };
        thread::sleep(duration);
    }
}