lib.rs

  1#![cfg(target_os = "macos")]
  2
  3use bitflags::bitflags;
  4use fsevent_sys::{self as fs, core_foundation as cf};
  5use std::{
  6    convert::AsRef,
  7    ffi::{c_void, CStr, OsStr},
  8    os::unix::ffi::OsStrExt,
  9    path::{Path, PathBuf},
 10    slice,
 11    sync::mpsc::Sender,
 12    time::Duration,
 13};
 14
 15#[derive(Debug)]
 16pub struct Event {
 17    pub event_id: u64,
 18    pub flags: StreamFlags,
 19    pub path: PathBuf,
 20}
 21
 22pub struct EventStream {
 23    stream: fs::FSEventStreamRef,
 24    _sender: Box<Sender<Vec<Event>>>,
 25}
 26
 27unsafe impl Send for EventStream {}
 28
 29impl EventStream {
 30    pub fn new(paths: &[&Path], latency: Duration, event_sender: Sender<Vec<Event>>) -> Self {
 31        unsafe {
 32            let sender = Box::new(event_sender);
 33            let stream_context = fs::FSEventStreamContext {
 34                version: 0,
 35                info: sender.as_ref() as *const _ as *mut c_void,
 36                retain: None,
 37                release: None,
 38                copy_description: None,
 39            };
 40
 41            let cf_paths =
 42                cf::CFArrayCreateMutable(cf::kCFAllocatorDefault, 0, &cf::kCFTypeArrayCallBacks);
 43            assert!(!cf_paths.is_null());
 44
 45            for path in paths {
 46                let path_bytes = path.as_os_str().as_bytes();
 47                let cf_url = cf::CFURLCreateFromFileSystemRepresentation(
 48                    cf::kCFAllocatorDefault,
 49                    path_bytes.as_ptr() as *const i8,
 50                    path_bytes.len() as cf::CFIndex,
 51                    false,
 52                );
 53                let cf_path = cf::CFURLCopyFileSystemPath(cf_url, cf::kCFURLPOSIXPathStyle);
 54                cf::CFArrayAppendValue(cf_paths, cf_path);
 55                cf::CFRelease(cf_path);
 56                cf::CFRelease(cf_url);
 57            }
 58
 59            let stream = fs::FSEventStreamCreate(
 60                cf::kCFAllocatorDefault,
 61                callback,
 62                &stream_context,
 63                cf_paths,
 64                fs::kFSEventStreamEventIdSinceNow,
 65                latency.as_secs_f64(),
 66                fs::kFSEventStreamCreateFlagFileEvents
 67                    | fs::kFSEventStreamCreateFlagNoDefer
 68                    | fs::kFSEventStreamCreateFlagWatchRoot,
 69            );
 70            cf::CFRelease(cf_paths);
 71
 72            EventStream {
 73                stream,
 74                _sender: sender,
 75            }
 76        }
 77    }
 78
 79    pub fn run(self) {
 80        unsafe {
 81            fs::FSEventStreamScheduleWithRunLoop(
 82                self.stream,
 83                cf::CFRunLoopGetCurrent(),
 84                cf::kCFRunLoopDefaultMode,
 85            );
 86
 87            fs::FSEventStreamStart(self.stream);
 88            cf::CFRunLoopRun();
 89
 90            fs::FSEventStreamFlushSync(self.stream);
 91            fs::FSEventStreamStop(self.stream);
 92            fs::FSEventStreamRelease(self.stream);
 93        }
 94    }
 95}
 96
 97extern "C" fn callback(
 98    stream_ref: fs::FSEventStreamRef,
 99    info: *mut ::std::os::raw::c_void,
100    num: usize,                                 // size_t numEvents
101    event_paths: *mut ::std::os::raw::c_void,   // void *eventPaths
102    event_flags: *const ::std::os::raw::c_void, // const FSEventStreamEventFlags eventFlags[]
103    event_ids: *const ::std::os::raw::c_void,   // const FSEventStreamEventId eventIds[]
104) {
105    unsafe {
106        let event_paths = event_paths as *const *const ::std::os::raw::c_char;
107        let e_ptr = event_flags as *mut u32;
108        let i_ptr = event_ids as *mut u64;
109        let sender = (info as *mut Sender<Vec<Event>>).as_mut().unwrap();
110
111        let paths = slice::from_raw_parts(event_paths, num);
112        let flags = slice::from_raw_parts_mut(e_ptr, num);
113        let ids = slice::from_raw_parts_mut(i_ptr, num);
114
115        let mut events = Vec::with_capacity(num);
116        for p in 0..num {
117            let path_c_str = CStr::from_ptr(paths[p]);
118            let path = PathBuf::from(OsStr::from_bytes(path_c_str.to_bytes()));
119            if let Some(flag) = StreamFlags::from_bits(flags[p]) {
120                events.push(Event {
121                    event_id: ids[p],
122                    flags: flag,
123                    path,
124                });
125            } else {
126                debug_assert!(false, "unknown flag set for fs event: {}", flags[p]);
127            }
128        }
129
130        if sender.send(events).is_err() {
131            fs::FSEventStreamStop(stream_ref);
132            cf::CFRunLoopStop(cf::CFRunLoopGetCurrent());
133        }
134    }
135}
136
137// Synchronize with
138// /System/Library/Frameworks/CoreServices.framework/Versions/A/Frameworks/FSEvents.framework/Versions/A/Headers/FSEvents.h
139bitflags! {
140  #[repr(C)]
141  pub struct StreamFlags: u32 {
142    const NONE = 0x00000000;
143    const MUST_SCAN_SUBDIRS = 0x00000001;
144    const USER_DROPPED = 0x00000002;
145    const KERNEL_DROPPED = 0x00000004;
146    const IDS_WRAPPED = 0x00000008;
147    const HISTORY_DONE = 0x00000010;
148    const ROOT_CHANGED = 0x00000020;
149    const MOUNT = 0x00000040;
150    const UNMOUNT = 0x00000080;
151    const ITEM_CREATED = 0x00000100;
152    const ITEM_REMOVED = 0x00000200;
153    const INODE_META_MOD = 0x00000400;
154    const ITEM_RENAMED = 0x00000800;
155    const ITEM_MODIFIED = 0x00001000;
156    const FINDER_INFO_MOD = 0x00002000;
157    const ITEM_CHANGE_OWNER = 0x00004000;
158    const ITEM_XATTR_MOD = 0x00008000;
159    const IS_FILE = 0x00010000;
160    const IS_DIR = 0x00020000;
161    const IS_SYMLINK = 0x00040000;
162    const OWN_EVENT = 0x00080000;
163    const IS_HARDLINK = 0x00100000;
164    const IS_LAST_HARDLINK = 0x00200000;
165    const ITEM_CLONED = 0x400000;
166  }
167}
168
169impl std::fmt::Display for StreamFlags {
170    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
171        if self.contains(StreamFlags::MUST_SCAN_SUBDIRS) {
172            let _d = write!(f, "MUST_SCAN_SUBDIRS ");
173        }
174        if self.contains(StreamFlags::USER_DROPPED) {
175            let _d = write!(f, "USER_DROPPED ");
176        }
177        if self.contains(StreamFlags::KERNEL_DROPPED) {
178            let _d = write!(f, "KERNEL_DROPPED ");
179        }
180        if self.contains(StreamFlags::IDS_WRAPPED) {
181            let _d = write!(f, "IDS_WRAPPED ");
182        }
183        if self.contains(StreamFlags::HISTORY_DONE) {
184            let _d = write!(f, "HISTORY_DONE ");
185        }
186        if self.contains(StreamFlags::ROOT_CHANGED) {
187            let _d = write!(f, "ROOT_CHANGED ");
188        }
189        if self.contains(StreamFlags::MOUNT) {
190            let _d = write!(f, "MOUNT ");
191        }
192        if self.contains(StreamFlags::UNMOUNT) {
193            let _d = write!(f, "UNMOUNT ");
194        }
195        if self.contains(StreamFlags::ITEM_CREATED) {
196            let _d = write!(f, "ITEM_CREATED ");
197        }
198        if self.contains(StreamFlags::ITEM_REMOVED) {
199            let _d = write!(f, "ITEM_REMOVED ");
200        }
201        if self.contains(StreamFlags::INODE_META_MOD) {
202            let _d = write!(f, "INODE_META_MOD ");
203        }
204        if self.contains(StreamFlags::ITEM_RENAMED) {
205            let _d = write!(f, "ITEM_RENAMED ");
206        }
207        if self.contains(StreamFlags::ITEM_MODIFIED) {
208            let _d = write!(f, "ITEM_MODIFIED ");
209        }
210        if self.contains(StreamFlags::FINDER_INFO_MOD) {
211            let _d = write!(f, "FINDER_INFO_MOD ");
212        }
213        if self.contains(StreamFlags::ITEM_CHANGE_OWNER) {
214            let _d = write!(f, "ITEM_CHANGE_OWNER ");
215        }
216        if self.contains(StreamFlags::ITEM_XATTR_MOD) {
217            let _d = write!(f, "ITEM_XATTR_MOD ");
218        }
219        if self.contains(StreamFlags::IS_FILE) {
220            let _d = write!(f, "IS_FILE ");
221        }
222        if self.contains(StreamFlags::IS_DIR) {
223            let _d = write!(f, "IS_DIR ");
224        }
225        if self.contains(StreamFlags::IS_SYMLINK) {
226            let _d = write!(f, "IS_SYMLINK ");
227        }
228        if self.contains(StreamFlags::OWN_EVENT) {
229            let _d = write!(f, "OWN_EVENT ");
230        }
231        if self.contains(StreamFlags::IS_LAST_HARDLINK) {
232            let _d = write!(f, "IS_LAST_HARDLINK ");
233        }
234        if self.contains(StreamFlags::IS_HARDLINK) {
235            let _d = write!(f, "IS_HARDLINK ");
236        }
237        if self.contains(StreamFlags::ITEM_CLONED) {
238            let _d = write!(f, "ITEM_CLONED ");
239        }
240        write!(f, "")
241    }
242}
243
/// End-to-end check against the real file system: creating a file, removing a
/// file, and renaming the watched directory must each produce a batch of events.
#[test]
fn test_observe() {
    use std::{fs, sync::mpsc, time::Duration};
    use tempdir::TempDir;

    let recv_timeout = Duration::from_millis(500);

    let watched_dir = TempDir::new("test_observe").unwrap();
    // Canonicalized up front because reported paths are compared against it verbatim.
    let root = watched_dir.path().canonicalize().unwrap();
    let file_a = root.join("a");
    let file_b = root.join("b");
    fs::write(&file_a, "a contents").unwrap();

    let (tx, rx) = mpsc::channel();
    let stream = EventStream::new(&[root.as_path()], Duration::from_millis(50), tx);
    std::thread::spawn(move || stream.run());

    // Each step waits for the next batch the stream delivers, or fails on timeout.
    let next_batch = || rx.recv_timeout(recv_timeout).unwrap();

    // Creating a new file is reported as ITEM_CREATED on that file.
    fs::write(&file_b, "b contents").unwrap();
    let batch = next_batch();
    let newest = batch.last().unwrap();
    assert_eq!(newest.path, file_b);
    assert!(newest.flags.contains(StreamFlags::ITEM_CREATED));

    // Removing the pre-existing file is reported as ITEM_REMOVED on that file.
    fs::remove_file(&file_a).unwrap();
    let batch = next_batch();
    let newest = batch.last().unwrap();
    assert_eq!(newest.path, file_a);
    assert!(newest.flags.contains(StreamFlags::ITEM_REMOVED));

    // Moving the watched directory itself must still deliver a batch; only its
    // arrival is checked, not its contents.
    let destination = TempDir::new("test_observe2").unwrap();
    fs::rename(root, destination.path().join("something")).unwrap();
    let _batch = next_batch();
}