1#![cfg(target_os = "macos")]
2
3use bitflags::bitflags;
4use fsevent_sys::{self as fs, core_foundation as cf};
5use parking_lot::Mutex;
6use std::{
7 convert::AsRef,
8 ffi::{c_void, CStr, OsStr},
9 os::unix::ffi::OsStrExt,
10 path::{Path, PathBuf},
11 slice,
12 sync::Arc,
13 time::Duration,
14};
15
16#[derive(Clone, Debug)]
17pub struct Event {
18 pub event_id: u64,
19 pub flags: StreamFlags,
20 pub path: PathBuf,
21}
22
23pub struct EventStream {
24 stream: fs::FSEventStreamRef,
25 state: Arc<Mutex<Lifecycle>>,
26 callback: Box<Option<RunCallback>>,
27}
28
29type RunCallback = Box<dyn FnMut(Vec<Event>) -> bool>;
30
31enum Lifecycle {
32 New,
33 Running(cf::CFRunLoopRef),
34 Stopped,
35}
36
37pub struct Handle(Arc<Mutex<Lifecycle>>);
38
39unsafe impl Send for EventStream {}
40unsafe impl Send for Lifecycle {}
41
42impl EventStream {
43 pub fn new(paths: &[&Path], latency: Duration) -> (Self, Handle) {
44 unsafe {
45 let callback = Box::new(None);
46 let stream_context = fs::FSEventStreamContext {
47 version: 0,
48 info: callback.as_ref() as *const _ as *mut c_void,
49 retain: None,
50 release: None,
51 copy_description: None,
52 };
53
54 let cf_paths =
55 cf::CFArrayCreateMutable(cf::kCFAllocatorDefault, 0, &cf::kCFTypeArrayCallBacks);
56 assert!(!cf_paths.is_null());
57
58 for path in paths {
59 let path_bytes = path.as_os_str().as_bytes();
60 let cf_url = cf::CFURLCreateFromFileSystemRepresentation(
61 cf::kCFAllocatorDefault,
62 path_bytes.as_ptr() as *const i8,
63 path_bytes.len() as cf::CFIndex,
64 false,
65 );
66 let cf_path = cf::CFURLCopyFileSystemPath(cf_url, cf::kCFURLPOSIXPathStyle);
67 cf::CFArrayAppendValue(cf_paths, cf_path);
68 cf::CFRelease(cf_path);
69 cf::CFRelease(cf_url);
70 }
71
72 let stream = fs::FSEventStreamCreate(
73 cf::kCFAllocatorDefault,
74 Self::trampoline,
75 &stream_context,
76 cf_paths,
77 FSEventsGetCurrentEventId(),
78 latency.as_secs_f64(),
79 fs::kFSEventStreamCreateFlagFileEvents
80 | fs::kFSEventStreamCreateFlagNoDefer
81 | fs::kFSEventStreamCreateFlagWatchRoot,
82 );
83 cf::CFRelease(cf_paths);
84
85 let state = Arc::new(Mutex::new(Lifecycle::New));
86
87 (
88 EventStream {
89 stream,
90 state: state.clone(),
91 callback,
92 },
93 Handle(state),
94 )
95 }
96 }
97
98 pub fn run<F>(mut self, f: F)
99 where
100 F: FnMut(Vec<Event>) -> bool + 'static,
101 {
102 *self.callback = Some(Box::new(f));
103 unsafe {
104 let run_loop = cf::CFRunLoopGetCurrent();
105 {
106 let mut state = self.state.lock();
107 match *state {
108 Lifecycle::New => *state = Lifecycle::Running(run_loop),
109 Lifecycle::Running(_) => unreachable!(),
110 Lifecycle::Stopped => return,
111 }
112 }
113 fs::FSEventStreamScheduleWithRunLoop(self.stream, run_loop, cf::kCFRunLoopDefaultMode);
114 fs::FSEventStreamStart(self.stream);
115 cf::CFRunLoopRun();
116 fs::FSEventStreamRelease(self.stream);
117 }
118 }
119
120 extern "C" fn trampoline(
121 stream_ref: fs::FSEventStreamRef,
122 info: *mut ::std::os::raw::c_void,
123 num: usize, // size_t numEvents
124 event_paths: *mut ::std::os::raw::c_void, // void *eventPaths
125 event_flags: *const ::std::os::raw::c_void, // const FSEventStreamEventFlags eventFlags[]
126 event_ids: *const ::std::os::raw::c_void, // const FSEventStreamEventId eventIds[]
127 ) {
128 unsafe {
129 let event_paths = event_paths as *const *const ::std::os::raw::c_char;
130 let e_ptr = event_flags as *mut u32;
131 let i_ptr = event_ids as *mut u64;
132 let callback_ptr = (info as *mut Option<RunCallback>).as_mut().unwrap();
133 let callback = if let Some(callback) = callback_ptr.as_mut() {
134 callback
135 } else {
136 return;
137 };
138
139 let paths = slice::from_raw_parts(event_paths, num);
140 let flags = slice::from_raw_parts_mut(e_ptr, num);
141 let ids = slice::from_raw_parts_mut(i_ptr, num);
142
143 let mut events = Vec::with_capacity(num);
144 for p in 0..num {
145 let path_c_str = CStr::from_ptr(paths[p]);
146 let path = PathBuf::from(OsStr::from_bytes(path_c_str.to_bytes()));
147 if let Some(flag) = StreamFlags::from_bits(flags[p]) {
148 if flag.contains(StreamFlags::HISTORY_DONE) {
149 events.clear();
150 } else {
151 events.push(Event {
152 event_id: ids[p],
153 flags: flag,
154 path,
155 });
156 }
157 } else {
158 debug_assert!(false, "unknown flag set for fs event: {}", flags[p]);
159 }
160 }
161
162 if !events.is_empty() {
163 if !callback(events) {
164 fs::FSEventStreamStop(stream_ref);
165 cf::CFRunLoopStop(cf::CFRunLoopGetCurrent());
166 }
167 }
168 }
169 }
170}
171
172impl Drop for Handle {
173 fn drop(&mut self) {
174 let mut state = self.0.lock();
175 if let Lifecycle::Running(run_loop) = *state {
176 unsafe {
177 cf::CFRunLoopStop(run_loop);
178 }
179 }
180 *state = Lifecycle::Stopped;
181 }
182}
183
184// Synchronize with
185// /System/Library/Frameworks/CoreServices.framework/Versions/A/Frameworks/FSEvents.framework/Versions/A/Headers/FSEvents.h
186bitflags! {
187 #[repr(C)]
188 pub struct StreamFlags: u32 {
189 const NONE = 0x00000000;
190 const MUST_SCAN_SUBDIRS = 0x00000001;
191 const USER_DROPPED = 0x00000002;
192 const KERNEL_DROPPED = 0x00000004;
193 const IDS_WRAPPED = 0x00000008;
194 const HISTORY_DONE = 0x00000010;
195 const ROOT_CHANGED = 0x00000020;
196 const MOUNT = 0x00000040;
197 const UNMOUNT = 0x00000080;
198 const ITEM_CREATED = 0x00000100;
199 const ITEM_REMOVED = 0x00000200;
200 const INODE_META_MOD = 0x00000400;
201 const ITEM_RENAMED = 0x00000800;
202 const ITEM_MODIFIED = 0x00001000;
203 const FINDER_INFO_MOD = 0x00002000;
204 const ITEM_CHANGE_OWNER = 0x00004000;
205 const ITEM_XATTR_MOD = 0x00008000;
206 const IS_FILE = 0x00010000;
207 const IS_DIR = 0x00020000;
208 const IS_SYMLINK = 0x00040000;
209 const OWN_EVENT = 0x00080000;
210 const IS_HARDLINK = 0x00100000;
211 const IS_LAST_HARDLINK = 0x00200000;
212 const ITEM_CLONED = 0x400000;
213 }
214}
215
216impl std::fmt::Display for StreamFlags {
217 fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
218 if self.contains(StreamFlags::MUST_SCAN_SUBDIRS) {
219 let _d = write!(f, "MUST_SCAN_SUBDIRS ");
220 }
221 if self.contains(StreamFlags::USER_DROPPED) {
222 let _d = write!(f, "USER_DROPPED ");
223 }
224 if self.contains(StreamFlags::KERNEL_DROPPED) {
225 let _d = write!(f, "KERNEL_DROPPED ");
226 }
227 if self.contains(StreamFlags::IDS_WRAPPED) {
228 let _d = write!(f, "IDS_WRAPPED ");
229 }
230 if self.contains(StreamFlags::HISTORY_DONE) {
231 let _d = write!(f, "HISTORY_DONE ");
232 }
233 if self.contains(StreamFlags::ROOT_CHANGED) {
234 let _d = write!(f, "ROOT_CHANGED ");
235 }
236 if self.contains(StreamFlags::MOUNT) {
237 let _d = write!(f, "MOUNT ");
238 }
239 if self.contains(StreamFlags::UNMOUNT) {
240 let _d = write!(f, "UNMOUNT ");
241 }
242 if self.contains(StreamFlags::ITEM_CREATED) {
243 let _d = write!(f, "ITEM_CREATED ");
244 }
245 if self.contains(StreamFlags::ITEM_REMOVED) {
246 let _d = write!(f, "ITEM_REMOVED ");
247 }
248 if self.contains(StreamFlags::INODE_META_MOD) {
249 let _d = write!(f, "INODE_META_MOD ");
250 }
251 if self.contains(StreamFlags::ITEM_RENAMED) {
252 let _d = write!(f, "ITEM_RENAMED ");
253 }
254 if self.contains(StreamFlags::ITEM_MODIFIED) {
255 let _d = write!(f, "ITEM_MODIFIED ");
256 }
257 if self.contains(StreamFlags::FINDER_INFO_MOD) {
258 let _d = write!(f, "FINDER_INFO_MOD ");
259 }
260 if self.contains(StreamFlags::ITEM_CHANGE_OWNER) {
261 let _d = write!(f, "ITEM_CHANGE_OWNER ");
262 }
263 if self.contains(StreamFlags::ITEM_XATTR_MOD) {
264 let _d = write!(f, "ITEM_XATTR_MOD ");
265 }
266 if self.contains(StreamFlags::IS_FILE) {
267 let _d = write!(f, "IS_FILE ");
268 }
269 if self.contains(StreamFlags::IS_DIR) {
270 let _d = write!(f, "IS_DIR ");
271 }
272 if self.contains(StreamFlags::IS_SYMLINK) {
273 let _d = write!(f, "IS_SYMLINK ");
274 }
275 if self.contains(StreamFlags::OWN_EVENT) {
276 let _d = write!(f, "OWN_EVENT ");
277 }
278 if self.contains(StreamFlags::IS_LAST_HARDLINK) {
279 let _d = write!(f, "IS_LAST_HARDLINK ");
280 }
281 if self.contains(StreamFlags::IS_HARDLINK) {
282 let _d = write!(f, "IS_HARDLINK ");
283 }
284 if self.contains(StreamFlags::ITEM_CLONED) {
285 let _d = write!(f, "ITEM_CLONED ");
286 }
287 write!(f, "")
288 }
289}
290
291#[link(name = "CoreServices", kind = "framework")]
292extern "C" {
293 pub fn FSEventsGetCurrentEventId() -> u64;
294}
295
#[cfg(test)]
mod tests {
    use super::*;
    use std::{fs, sync::mpsc, thread, time::Duration};
    use tempdir::TempDir;

    #[test]
    fn test_event_stream_simple() {
        check_create_and_remove_are_reported(None);
    }

    #[test]
    fn test_event_stream_delayed_start() {
        // Delay the call to `run` in order to make sure we don't miss any events that occur
        // between creating the `EventStream` and calling `run`.
        check_create_and_remove_are_reported(Some(Duration::from_millis(100)));
    }

    #[test]
    fn test_event_stream_shutdown_by_dropping_handle() {
        let dir = TempDir::new("test-event-stream").unwrap();
        let root = dir.path().canonicalize().unwrap();
        flush_historical_events();

        let (tx, rx) = mpsc::channel();
        let (stream, handle) = EventStream::new(&[&root], Duration::from_millis(50));
        thread::spawn(move || {
            let running_tx = tx.clone();
            stream.run(move |_| {
                running_tx.send("running").unwrap();
                true
            });
            // Only reached once `run` has returned.
            tx.send("stopped").unwrap();
        });

        fs::write(root.join("new-file"), "").unwrap();
        assert_eq!(rx.recv_timeout(Duration::from_secs(2)).unwrap(), "running");

        // Dropping the handle causes `EventStream::run` to return.
        drop(handle);
        assert_eq!(rx.recv_timeout(Duration::from_secs(2)).unwrap(), "stopped");
    }

    #[test]
    fn test_event_stream_shutdown_before_run() {
        let dir = TempDir::new("test-event-stream").unwrap();
        let root = dir.path().canonicalize().unwrap();

        let (stream, handle) = EventStream::new(&[&root], Duration::from_millis(50));
        drop(handle);

        // This returns immediately because the handle was already dropped.
        stream.run(|_| true);
    }

    /// Runs the create-then-remove scenario three times: a stream watches a fresh
    /// directory holding ten files, and both the creation of `new-file` and the removal
    /// of `existing-file-5` must be reported. With `delay_before_run` set, the stream
    /// thread sleeps that long before calling `run`.
    fn check_create_and_remove_are_reported(delay_before_run: Option<Duration>) {
        for _ in 0..3 {
            let dir = TempDir::new("test-event-stream").unwrap();
            let root = dir.path().canonicalize().unwrap();
            for i in 0..10 {
                fs::write(root.join(format!("existing-file-{}", i)), "").unwrap();
            }
            flush_historical_events();

            let (tx, rx) = mpsc::channel();
            let (stream, handle) = EventStream::new(&[&root], Duration::from_millis(50));
            thread::spawn(move || {
                if let Some(delay) = delay_before_run {
                    thread::sleep(delay);
                }
                stream.run(move |events| tx.send(events).is_ok())
            });

            fs::write(root.join("new-file"), "").unwrap();
            expect_last_event(&rx, root.join("new-file"), StreamFlags::ITEM_CREATED);

            fs::remove_file(root.join("existing-file-5")).unwrap();
            expect_last_event(&rx, root.join("existing-file-5"), StreamFlags::ITEM_REMOVED);

            drop(handle);
        }
    }

    /// Waits up to two seconds for the next batch and checks the path and flag of its
    /// last event.
    fn expect_last_event(
        rx: &mpsc::Receiver<Vec<Event>>,
        expected_path: PathBuf,
        expected_flag: StreamFlags,
    ) {
        let batch = rx.recv_timeout(Duration::from_secs(2)).unwrap();
        let last = batch.last().unwrap();
        assert_eq!(last.path, expected_path);
        assert!(last.flags.contains(expected_flag));
    }

    /// Sleeps before a stream is created, after the test's setup writes: 2 s when the
    /// `CI` environment variable is set, 500 ms otherwise.
    fn flush_historical_events() {
        let millis = if std::env::var("CI").is_ok() { 2000 } else { 500 };
        thread::sleep(Duration::from_millis(millis));
    }
}