1#![cfg(target_os = "macos")]
2
3use bitflags::bitflags;
4use fsevent_sys::{self as fs, core_foundation as cf};
5use parking_lot::Mutex;
6use std::{
7 convert::AsRef,
8 ffi::{c_void, CStr, OsStr},
9 os::unix::ffi::OsStrExt,
10 path::{Path, PathBuf},
11 slice,
12 sync::Arc,
13 time::Duration,
14};
15
/// A single file-system event delivered to the callback passed to
/// `EventStream::run`.
#[derive(Clone, Debug)]
pub struct Event {
    /// The event id reported by the stream for this event.
    pub event_id: u64,
    /// The flags reported for this event, decoded into `StreamFlags`.
    pub flags: StreamFlags,
    /// The path the event refers to, as reported by the stream.
    pub path: PathBuf,
}
22
/// A file-system event stream created by `EventStream::new` and driven by
/// `EventStream::run`.
pub struct EventStream {
    /// The underlying stream handle returned by `FSEventStreamCreate`.
    stream: fs::FSEventStreamRef,
    /// Lifecycle state shared with the `Handle` returned from `new`.
    lifecycle: Arc<Mutex<Lifecycle>>,
    /// Callback state. It is boxed because its heap address is handed to the
    /// stream as the callback context, so it must not change when the
    /// `EventStream` value itself is moved.
    state: Box<State>,
}
28
/// State reachable from the stream callback through the context pointer.
struct State {
    /// The latency the stream was created with.
    /// NOTE(review): stored but not read anywhere in the visible source.
    latency: Duration,
    /// The array of watched paths; released when the `State` is dropped.
    paths: cf::CFMutableArrayRef,
    /// The user callback. `None` until `EventStream::run` installs it.
    callback: Option<Box<dyn FnMut(Vec<Event>) -> bool>>,
    /// NOTE(review): initialised to `None` and never updated in the visible
    /// source — presumably reserved for resuming a stream; confirm or remove.
    last_valid_event_id: Option<fs::FSEventStreamEventId>,
}
35
36impl Drop for State {
37 fn drop(&mut self) {
38 unsafe {
39 cf::CFRelease(self.paths);
40 }
41 }
42}
43
/// Where an `EventStream` is in its life, shared between the stream and its
/// `Handle`.
enum Lifecycle {
    /// The stream has been created but `run` has not started it yet.
    New,
    /// `run` is in progress on the thread that owns the given run loop.
    Running(cf::CFRunLoopRef),
    /// The stream has been shut down; `run` returns immediately if it observes
    /// this state before starting.
    Stopped,
}
49
/// Shutdown handle for an `EventStream`. Dropping it stops the run loop the
/// stream is running on (if any) and marks the stream as stopped.
pub struct Handle(Arc<Mutex<Lifecycle>>);
51
// `EventStream` holds raw pointers and a boxed callback that is not required
// to be `Send`. The callback is only installed by `run`, which consumes the
// stream, so a stream that is moved to another thread has no callback yet.
// NOTE(review): this also assumes the stream handle and the path array may be
// used from a thread other than the one that created them — confirm.
unsafe impl Send for EventStream {}
// `Lifecycle` carries a run-loop reference so that `Handle` can stop the loop.
// NOTE(review): assumes `CFRunLoopStop` may be called from any thread —
// confirm against the Core Foundation documentation.
unsafe impl Send for Lifecycle {}
54
55impl EventStream {
56 pub fn new(paths: &[&Path], latency: Duration) -> (Self, Handle) {
57 unsafe {
58 let cf_paths =
59 cf::CFArrayCreateMutable(cf::kCFAllocatorDefault, 0, &cf::kCFTypeArrayCallBacks);
60 assert!(!cf_paths.is_null());
61
62 for path in paths {
63 let path_bytes = path.as_os_str().as_bytes();
64 let cf_url = cf::CFURLCreateFromFileSystemRepresentation(
65 cf::kCFAllocatorDefault,
66 path_bytes.as_ptr() as *const i8,
67 path_bytes.len() as cf::CFIndex,
68 false,
69 );
70 let cf_path = cf::CFURLCopyFileSystemPath(cf_url, cf::kCFURLPOSIXPathStyle);
71 cf::CFArrayAppendValue(cf_paths, cf_path);
72 cf::CFRelease(cf_path);
73 cf::CFRelease(cf_url);
74 }
75
76 let state = Box::new(State {
77 latency,
78 paths: cf_paths,
79 callback: None,
80 last_valid_event_id: None,
81 });
82 let stream_context = fs::FSEventStreamContext {
83 version: 0,
84 info: state.as_ref() as *const _ as *mut c_void,
85 retain: None,
86 release: None,
87 copy_description: None,
88 };
89 let stream = fs::FSEventStreamCreate(
90 cf::kCFAllocatorDefault,
91 Self::trampoline,
92 &stream_context,
93 cf_paths,
94 FSEventsGetCurrentEventId(),
95 latency.as_secs_f64(),
96 fs::kFSEventStreamCreateFlagFileEvents
97 | fs::kFSEventStreamCreateFlagNoDefer
98 | fs::kFSEventStreamCreateFlagWatchRoot,
99 );
100
101 let lifecycle = Arc::new(Mutex::new(Lifecycle::New));
102 (
103 EventStream {
104 stream,
105 lifecycle: lifecycle.clone(),
106 state,
107 },
108 Handle(lifecycle),
109 )
110 }
111 }
112
113 pub fn run<F>(mut self, f: F)
114 where
115 F: FnMut(Vec<Event>) -> bool + 'static,
116 {
117 self.state.callback = Some(Box::new(f));
118 unsafe {
119 let run_loop = cf::CFRunLoopGetCurrent();
120 {
121 let mut state = self.lifecycle.lock();
122 match *state {
123 Lifecycle::New => *state = Lifecycle::Running(run_loop),
124 Lifecycle::Running(_) => unreachable!(),
125 Lifecycle::Stopped => return,
126 }
127 }
128 fs::FSEventStreamScheduleWithRunLoop(self.stream, run_loop, cf::kCFRunLoopDefaultMode);
129 fs::FSEventStreamStart(self.stream);
130 cf::CFRunLoopRun();
131 fs::FSEventStreamRelease(self.stream);
132 }
133 }
134
135 extern "C" fn trampoline(
136 stream_ref: fs::FSEventStreamRef,
137 info: *mut ::std::os::raw::c_void,
138 num: usize, // size_t numEvents
139 event_paths: *mut ::std::os::raw::c_void, // void *eventPaths
140 event_flags: *const ::std::os::raw::c_void, // const FSEventStreamEventFlags eventFlags[]
141 event_ids: *const ::std::os::raw::c_void, // const FSEventStreamEventId eventIds[]
142 ) {
143 unsafe {
144 let event_paths = event_paths as *const *const ::std::os::raw::c_char;
145 let e_ptr = event_flags as *mut u32;
146 let i_ptr = event_ids as *mut u64;
147 let state = (info as *mut State).as_mut().unwrap();
148 let callback = if let Some(callback) = state.callback.as_mut() {
149 callback
150 } else {
151 return;
152 };
153
154 let paths = slice::from_raw_parts(event_paths, num);
155 let flags = slice::from_raw_parts_mut(e_ptr, num);
156 let ids = slice::from_raw_parts_mut(i_ptr, num);
157
158 let mut events = Vec::with_capacity(num);
159 for p in 0..num {
160 let path_c_str = CStr::from_ptr(paths[p]);
161 let path = PathBuf::from(OsStr::from_bytes(path_c_str.to_bytes()));
162 if let Some(flag) = StreamFlags::from_bits(flags[p]) {
163 if flag.contains(StreamFlags::HISTORY_DONE) {
164 events.clear();
165 } else {
166 events.push(Event {
167 event_id: ids[p],
168 flags: flag,
169 path,
170 });
171 }
172 } else {
173 debug_assert!(false, "unknown flag set for fs event: {}", flags[p]);
174 }
175 }
176
177 if !events.is_empty() {
178 if !callback(events) {
179 fs::FSEventStreamStop(stream_ref);
180 cf::CFRunLoopStop(cf::CFRunLoopGetCurrent());
181 }
182 }
183 }
184 }
185}
186
187impl Drop for Handle {
188 fn drop(&mut self) {
189 let mut state = self.0.lock();
190 if let Lifecycle::Running(run_loop) = *state {
191 unsafe {
192 cf::CFRunLoopStop(run_loop);
193 }
194 }
195 *state = Lifecycle::Stopped;
196 }
197}
198
199// Synchronize with
200// /System/Library/Frameworks/CoreServices.framework/Versions/A/Frameworks/FSEvents.framework/Versions/A/Headers/FSEvents.h
201bitflags! {
202 #[repr(C)]
203 pub struct StreamFlags: u32 {
204 const NONE = 0x00000000;
205 const MUST_SCAN_SUBDIRS = 0x00000001;
206 const USER_DROPPED = 0x00000002;
207 const KERNEL_DROPPED = 0x00000004;
208 const IDS_WRAPPED = 0x00000008;
209 const HISTORY_DONE = 0x00000010;
210 const ROOT_CHANGED = 0x00000020;
211 const MOUNT = 0x00000040;
212 const UNMOUNT = 0x00000080;
213 const ITEM_CREATED = 0x00000100;
214 const ITEM_REMOVED = 0x00000200;
215 const INODE_META_MOD = 0x00000400;
216 const ITEM_RENAMED = 0x00000800;
217 const ITEM_MODIFIED = 0x00001000;
218 const FINDER_INFO_MOD = 0x00002000;
219 const ITEM_CHANGE_OWNER = 0x00004000;
220 const ITEM_XATTR_MOD = 0x00008000;
221 const IS_FILE = 0x00010000;
222 const IS_DIR = 0x00020000;
223 const IS_SYMLINK = 0x00040000;
224 const OWN_EVENT = 0x00080000;
225 const IS_HARDLINK = 0x00100000;
226 const IS_LAST_HARDLINK = 0x00200000;
227 const ITEM_CLONED = 0x400000;
228 }
229}
230
231impl std::fmt::Display for StreamFlags {
232 fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
233 if self.contains(StreamFlags::MUST_SCAN_SUBDIRS) {
234 let _d = write!(f, "MUST_SCAN_SUBDIRS ");
235 }
236 if self.contains(StreamFlags::USER_DROPPED) {
237 let _d = write!(f, "USER_DROPPED ");
238 }
239 if self.contains(StreamFlags::KERNEL_DROPPED) {
240 let _d = write!(f, "KERNEL_DROPPED ");
241 }
242 if self.contains(StreamFlags::IDS_WRAPPED) {
243 let _d = write!(f, "IDS_WRAPPED ");
244 }
245 if self.contains(StreamFlags::HISTORY_DONE) {
246 let _d = write!(f, "HISTORY_DONE ");
247 }
248 if self.contains(StreamFlags::ROOT_CHANGED) {
249 let _d = write!(f, "ROOT_CHANGED ");
250 }
251 if self.contains(StreamFlags::MOUNT) {
252 let _d = write!(f, "MOUNT ");
253 }
254 if self.contains(StreamFlags::UNMOUNT) {
255 let _d = write!(f, "UNMOUNT ");
256 }
257 if self.contains(StreamFlags::ITEM_CREATED) {
258 let _d = write!(f, "ITEM_CREATED ");
259 }
260 if self.contains(StreamFlags::ITEM_REMOVED) {
261 let _d = write!(f, "ITEM_REMOVED ");
262 }
263 if self.contains(StreamFlags::INODE_META_MOD) {
264 let _d = write!(f, "INODE_META_MOD ");
265 }
266 if self.contains(StreamFlags::ITEM_RENAMED) {
267 let _d = write!(f, "ITEM_RENAMED ");
268 }
269 if self.contains(StreamFlags::ITEM_MODIFIED) {
270 let _d = write!(f, "ITEM_MODIFIED ");
271 }
272 if self.contains(StreamFlags::FINDER_INFO_MOD) {
273 let _d = write!(f, "FINDER_INFO_MOD ");
274 }
275 if self.contains(StreamFlags::ITEM_CHANGE_OWNER) {
276 let _d = write!(f, "ITEM_CHANGE_OWNER ");
277 }
278 if self.contains(StreamFlags::ITEM_XATTR_MOD) {
279 let _d = write!(f, "ITEM_XATTR_MOD ");
280 }
281 if self.contains(StreamFlags::IS_FILE) {
282 let _d = write!(f, "IS_FILE ");
283 }
284 if self.contains(StreamFlags::IS_DIR) {
285 let _d = write!(f, "IS_DIR ");
286 }
287 if self.contains(StreamFlags::IS_SYMLINK) {
288 let _d = write!(f, "IS_SYMLINK ");
289 }
290 if self.contains(StreamFlags::OWN_EVENT) {
291 let _d = write!(f, "OWN_EVENT ");
292 }
293 if self.contains(StreamFlags::IS_LAST_HARDLINK) {
294 let _d = write!(f, "IS_LAST_HARDLINK ");
295 }
296 if self.contains(StreamFlags::IS_HARDLINK) {
297 let _d = write!(f, "IS_HARDLINK ");
298 }
299 if self.contains(StreamFlags::ITEM_CLONED) {
300 let _d = write!(f, "ITEM_CLONED ");
301 }
302 write!(f, "")
303 }
304}
305
// Declared here directly and linked against the CoreServices framework.
#[link(name = "CoreServices", kind = "framework")]
extern "C" {
    /// Returns an event id as a `u64`; `EventStream::new` passes the result to
    /// `FSEventStreamCreate` as the id the new stream starts from.
    pub fn FSEventsGetCurrentEventId() -> u64;
}
310
#[cfg(test)]
mod tests {
    use super::*;
    use std::{fs, sync::mpsc, thread, time::Duration};
    use tempdir::TempDir;

    #[test]
    fn test_event_stream_simple() {
        for _ in 0..3 {
            assert_create_and_remove_are_reported(None);
        }
    }

    #[test]
    fn test_event_stream_delayed_start() {
        for _ in 0..3 {
            // Delay the call to `run` in order to make sure we don't miss any events that occur
            // between creating the `EventStream` and calling `run`.
            assert_create_and_remove_are_reported(Some(Duration::from_millis(100)));
        }
    }

    #[test]
    fn test_event_stream_shutdown_by_dropping_handle() {
        let dir = TempDir::new("test-event-stream").unwrap();
        let path = dir.path().canonicalize().unwrap();
        flush_historical_events();

        let (tx, rx) = mpsc::channel();
        let (stream, handle) = EventStream::new(&[&path], Duration::from_millis(50));
        thread::spawn(move || {
            stream.run({
                let tx = tx.clone();
                move |_| {
                    tx.send("running").unwrap();
                    true
                }
            });
            tx.send("stopped").unwrap();
        });

        fs::write(path.join("new-file"), "").unwrap();
        assert_eq!(rx.recv_timeout(Duration::from_secs(2)).unwrap(), "running");

        // Dropping the handle causes `EventStream::run` to return.
        drop(handle);
        assert_eq!(rx.recv_timeout(Duration::from_secs(2)).unwrap(), "stopped");
    }

    #[test]
    fn test_event_stream_shutdown_before_run() {
        let dir = TempDir::new("test-event-stream").unwrap();
        let path = dir.path().canonicalize().unwrap();

        let (stream, handle) = EventStream::new(&[&path], Duration::from_millis(50));
        drop(handle);

        // This returns immediately because the handle was already dropped.
        stream.run(|_| true);
    }

    /// Watches a fresh temporary directory and checks that creating one file
    /// and removing another are each reported as the last event of the next
    /// batch. When `start_delay` is set, the watcher thread sleeps that long
    /// before calling `run`.
    fn assert_create_and_remove_are_reported(start_delay: Option<Duration>) {
        let dir = TempDir::new("test-event-stream").unwrap();
        let path = dir.path().canonicalize().unwrap();
        for i in 0..10 {
            fs::write(path.join(format!("existing-file-{}", i)), "").unwrap();
        }
        flush_historical_events();

        let (tx, rx) = mpsc::channel();
        let (stream, handle) = EventStream::new(&[&path], Duration::from_millis(50));
        thread::spawn(move || {
            if let Some(delay) = start_delay {
                thread::sleep(delay);
            }
            // The callback already owns the batch, so forward it as-is.
            stream.run(move |events| tx.send(events).is_ok())
        });

        fs::write(path.join("new-file"), "").unwrap();
        let events = rx.recv_timeout(Duration::from_secs(2)).unwrap();
        let event = events.last().unwrap();
        assert_eq!(event.path, path.join("new-file"));
        assert!(event.flags.contains(StreamFlags::ITEM_CREATED));

        fs::remove_file(path.join("existing-file-5")).unwrap();
        let events = rx.recv_timeout(Duration::from_secs(2)).unwrap();
        let event = events.last().unwrap();
        assert_eq!(event.path, path.join("existing-file-5"));
        assert!(event.flags.contains(StreamFlags::ITEM_REMOVED));
        drop(handle);
    }

    /// Sleeps long enough for events caused by test setup to be flushed before
    /// a stream is created: two seconds when the `CI` environment variable is
    /// set, half a second otherwise.
    fn flush_historical_events() {
        let duration = if std::env::var("CI").is_ok() {
            Duration::from_secs(2)
        } else {
            Duration::from_millis(500)
        };
        thread::sleep(duration);
    }
}