lib.rs

  1#![cfg(target_os = "macos")]
  2
  3use bitflags::bitflags;
  4use fsevent_sys::{self as fs, core_foundation as cf};
  5use std::{
  6    convert::AsRef,
  7    ffi::{c_void, CStr, OsStr},
  8    os::unix::ffi::OsStrExt,
  9    path::{Path, PathBuf},
 10    slice,
 11    time::Duration,
 12};
 13
 14#[derive(Clone, Debug)]
 15pub struct Event {
 16    pub event_id: u64,
 17    pub flags: StreamFlags,
 18    pub path: PathBuf,
 19}
 20
 21pub struct EventStream<F> {
 22    stream: fs::FSEventStreamRef,
 23    _callback: Box<F>,
 24}
 25
 26unsafe impl<F> Send for EventStream<F> {}
 27
 28impl<F> EventStream<F>
 29where
 30    F: FnMut(&[Event]) -> bool,
 31{
 32    pub fn new(paths: &[&Path], latency: Duration, callback: F) -> Self {
 33        unsafe {
 34            let callback = Box::new(callback);
 35            let stream_context = fs::FSEventStreamContext {
 36                version: 0,
 37                info: callback.as_ref() as *const _ as *mut c_void,
 38                retain: None,
 39                release: None,
 40                copy_description: None,
 41            };
 42
 43            let cf_paths =
 44                cf::CFArrayCreateMutable(cf::kCFAllocatorDefault, 0, &cf::kCFTypeArrayCallBacks);
 45            assert!(!cf_paths.is_null());
 46
 47            for path in paths {
 48                let path_bytes = path.as_os_str().as_bytes();
 49                let cf_url = cf::CFURLCreateFromFileSystemRepresentation(
 50                    cf::kCFAllocatorDefault,
 51                    path_bytes.as_ptr() as *const i8,
 52                    path_bytes.len() as cf::CFIndex,
 53                    false,
 54                );
 55                let cf_path = cf::CFURLCopyFileSystemPath(cf_url, cf::kCFURLPOSIXPathStyle);
 56                cf::CFArrayAppendValue(cf_paths, cf_path);
 57                cf::CFRelease(cf_path);
 58                cf::CFRelease(cf_url);
 59            }
 60
 61            let stream = fs::FSEventStreamCreate(
 62                cf::kCFAllocatorDefault,
 63                Self::trampoline,
 64                &stream_context,
 65                cf_paths,
 66                fs::kFSEventStreamEventIdSinceNow,
 67                latency.as_secs_f64(),
 68                fs::kFSEventStreamCreateFlagFileEvents
 69                    | fs::kFSEventStreamCreateFlagNoDefer
 70                    | fs::kFSEventStreamCreateFlagWatchRoot,
 71            );
 72            cf::CFRelease(cf_paths);
 73
 74            EventStream {
 75                stream,
 76                _callback: callback,
 77            }
 78        }
 79    }
 80
 81    pub fn run(self) {
 82        unsafe {
 83            fs::FSEventStreamScheduleWithRunLoop(
 84                self.stream,
 85                cf::CFRunLoopGetCurrent(),
 86                cf::kCFRunLoopDefaultMode,
 87            );
 88
 89            fs::FSEventStreamStart(self.stream);
 90            cf::CFRunLoopRun();
 91
 92            fs::FSEventStreamFlushSync(self.stream);
 93            fs::FSEventStreamStop(self.stream);
 94            fs::FSEventStreamRelease(self.stream);
 95        }
 96    }
 97
 98    extern "C" fn trampoline(
 99        stream_ref: fs::FSEventStreamRef,
100        info: *mut ::std::os::raw::c_void,
101        num: usize,                                 // size_t numEvents
102        event_paths: *mut ::std::os::raw::c_void,   // void *eventPaths
103        event_flags: *const ::std::os::raw::c_void, // const FSEventStreamEventFlags eventFlags[]
104        event_ids: *const ::std::os::raw::c_void,   // const FSEventStreamEventId eventIds[]
105    ) {
106        unsafe {
107            let event_paths = event_paths as *const *const ::std::os::raw::c_char;
108            let e_ptr = event_flags as *mut u32;
109            let i_ptr = event_ids as *mut u64;
110            let callback = (info as *mut F).as_mut().unwrap();
111
112            let paths = slice::from_raw_parts(event_paths, num);
113            let flags = slice::from_raw_parts_mut(e_ptr, num);
114            let ids = slice::from_raw_parts_mut(i_ptr, num);
115
116            let mut events = Vec::with_capacity(num);
117            for p in 0..num {
118                let path_c_str = CStr::from_ptr(paths[p]);
119                let path = PathBuf::from(OsStr::from_bytes(path_c_str.to_bytes()));
120                if let Some(flag) = StreamFlags::from_bits(flags[p]) {
121                    events.push(Event {
122                        event_id: ids[p],
123                        flags: flag,
124                        path,
125                    });
126                } else {
127                    debug_assert!(false, "unknown flag set for fs event: {}", flags[p]);
128                }
129            }
130
131            if !callback(&events) {
132                fs::FSEventStreamStop(stream_ref);
133                cf::CFRunLoopStop(cf::CFRunLoopGetCurrent());
134            }
135        }
136    }
137}
138
139// Synchronize with
140// /System/Library/Frameworks/CoreServices.framework/Versions/A/Frameworks/FSEvents.framework/Versions/A/Headers/FSEvents.h
141bitflags! {
142  #[repr(C)]
143  pub struct StreamFlags: u32 {
144    const NONE = 0x00000000;
145    const MUST_SCAN_SUBDIRS = 0x00000001;
146    const USER_DROPPED = 0x00000002;
147    const KERNEL_DROPPED = 0x00000004;
148    const IDS_WRAPPED = 0x00000008;
149    const HISTORY_DONE = 0x00000010;
150    const ROOT_CHANGED = 0x00000020;
151    const MOUNT = 0x00000040;
152    const UNMOUNT = 0x00000080;
153    const ITEM_CREATED = 0x00000100;
154    const ITEM_REMOVED = 0x00000200;
155    const INODE_META_MOD = 0x00000400;
156    const ITEM_RENAMED = 0x00000800;
157    const ITEM_MODIFIED = 0x00001000;
158    const FINDER_INFO_MOD = 0x00002000;
159    const ITEM_CHANGE_OWNER = 0x00004000;
160    const ITEM_XATTR_MOD = 0x00008000;
161    const IS_FILE = 0x00010000;
162    const IS_DIR = 0x00020000;
163    const IS_SYMLINK = 0x00040000;
164    const OWN_EVENT = 0x00080000;
165    const IS_HARDLINK = 0x00100000;
166    const IS_LAST_HARDLINK = 0x00200000;
167    const ITEM_CLONED = 0x400000;
168  }
169}
170
171impl std::fmt::Display for StreamFlags {
172    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
173        if self.contains(StreamFlags::MUST_SCAN_SUBDIRS) {
174            let _d = write!(f, "MUST_SCAN_SUBDIRS ");
175        }
176        if self.contains(StreamFlags::USER_DROPPED) {
177            let _d = write!(f, "USER_DROPPED ");
178        }
179        if self.contains(StreamFlags::KERNEL_DROPPED) {
180            let _d = write!(f, "KERNEL_DROPPED ");
181        }
182        if self.contains(StreamFlags::IDS_WRAPPED) {
183            let _d = write!(f, "IDS_WRAPPED ");
184        }
185        if self.contains(StreamFlags::HISTORY_DONE) {
186            let _d = write!(f, "HISTORY_DONE ");
187        }
188        if self.contains(StreamFlags::ROOT_CHANGED) {
189            let _d = write!(f, "ROOT_CHANGED ");
190        }
191        if self.contains(StreamFlags::MOUNT) {
192            let _d = write!(f, "MOUNT ");
193        }
194        if self.contains(StreamFlags::UNMOUNT) {
195            let _d = write!(f, "UNMOUNT ");
196        }
197        if self.contains(StreamFlags::ITEM_CREATED) {
198            let _d = write!(f, "ITEM_CREATED ");
199        }
200        if self.contains(StreamFlags::ITEM_REMOVED) {
201            let _d = write!(f, "ITEM_REMOVED ");
202        }
203        if self.contains(StreamFlags::INODE_META_MOD) {
204            let _d = write!(f, "INODE_META_MOD ");
205        }
206        if self.contains(StreamFlags::ITEM_RENAMED) {
207            let _d = write!(f, "ITEM_RENAMED ");
208        }
209        if self.contains(StreamFlags::ITEM_MODIFIED) {
210            let _d = write!(f, "ITEM_MODIFIED ");
211        }
212        if self.contains(StreamFlags::FINDER_INFO_MOD) {
213            let _d = write!(f, "FINDER_INFO_MOD ");
214        }
215        if self.contains(StreamFlags::ITEM_CHANGE_OWNER) {
216            let _d = write!(f, "ITEM_CHANGE_OWNER ");
217        }
218        if self.contains(StreamFlags::ITEM_XATTR_MOD) {
219            let _d = write!(f, "ITEM_XATTR_MOD ");
220        }
221        if self.contains(StreamFlags::IS_FILE) {
222            let _d = write!(f, "IS_FILE ");
223        }
224        if self.contains(StreamFlags::IS_DIR) {
225            let _d = write!(f, "IS_DIR ");
226        }
227        if self.contains(StreamFlags::IS_SYMLINK) {
228            let _d = write!(f, "IS_SYMLINK ");
229        }
230        if self.contains(StreamFlags::OWN_EVENT) {
231            let _d = write!(f, "OWN_EVENT ");
232        }
233        if self.contains(StreamFlags::IS_LAST_HARDLINK) {
234            let _d = write!(f, "IS_LAST_HARDLINK ");
235        }
236        if self.contains(StreamFlags::IS_HARDLINK) {
237            let _d = write!(f, "IS_HARDLINK ");
238        }
239        if self.contains(StreamFlags::ITEM_CLONED) {
240            let _d = write!(f, "ITEM_CLONED ");
241        }
242        write!(f, "")
243    }
244}
245
246#[test]
247fn test_event_stream() {
248    use std::{fs, sync::mpsc, time::Duration};
249    use tempdir::TempDir;
250
251    let dir = TempDir::new("test_observe").unwrap();
252    let path = dir.path().canonicalize().unwrap();
253    fs::write(path.join("a"), "a contents").unwrap();
254
255    let (tx, rx) = mpsc::channel();
256    let stream = EventStream::new(&[&path], Duration::from_millis(50), move |events| {
257        tx.send(events.to_vec()).is_ok()
258    });
259    std::thread::spawn(move || stream.run());
260
261    fs::write(path.join("b"), "b contents").unwrap();
262    let events = rx.recv_timeout(Duration::from_millis(500)).unwrap();
263    let event = events.last().unwrap();
264    assert_eq!(event.path, path.join("b"));
265    assert!(event.flags.contains(StreamFlags::ITEM_CREATED));
266
267    fs::remove_file(path.join("a")).unwrap();
268    let events = rx.recv_timeout(Duration::from_millis(500)).unwrap();
269    let event = events.last().unwrap();
270    assert_eq!(event.path, path.join("a"));
271    assert!(event.flags.contains(StreamFlags::ITEM_REMOVED));
272}