1#![cfg(target_os = "macos")]
2
3use bitflags::bitflags;
4use fsevent_sys::{self as fs, core_foundation as cf};
5use std::{
6 convert::AsRef,
7 ffi::{c_void, CStr, OsStr},
8 os::unix::ffi::OsStrExt,
9 path::{Path, PathBuf},
10 slice,
11 time::Duration,
12};
13
14#[derive(Clone, Debug)]
15pub struct Event {
16 pub event_id: u64,
17 pub flags: StreamFlags,
18 pub path: PathBuf,
19}
20
21pub struct EventStream<F> {
22 stream: fs::FSEventStreamRef,
23 _callback: Box<F>,
24}
25
26unsafe impl<F> Send for EventStream<F> {}
27
28impl<F> EventStream<F>
29where
30 F: FnMut(Vec<Event>) -> bool,
31{
32 pub fn new(paths: &[&Path], latency: Duration, callback: F) -> Self {
33 unsafe {
34 let callback = Box::new(callback);
35 let stream_context = fs::FSEventStreamContext {
36 version: 0,
37 info: callback.as_ref() as *const _ as *mut c_void,
38 retain: None,
39 release: None,
40 copy_description: None,
41 };
42
43 let cf_paths =
44 cf::CFArrayCreateMutable(cf::kCFAllocatorDefault, 0, &cf::kCFTypeArrayCallBacks);
45 assert!(!cf_paths.is_null());
46
47 for path in paths {
48 let path_bytes = path.as_os_str().as_bytes();
49 let cf_url = cf::CFURLCreateFromFileSystemRepresentation(
50 cf::kCFAllocatorDefault,
51 path_bytes.as_ptr() as *const i8,
52 path_bytes.len() as cf::CFIndex,
53 false,
54 );
55 let cf_path = cf::CFURLCopyFileSystemPath(cf_url, cf::kCFURLPOSIXPathStyle);
56 cf::CFArrayAppendValue(cf_paths, cf_path);
57 cf::CFRelease(cf_path);
58 cf::CFRelease(cf_url);
59 }
60
61 let stream = fs::FSEventStreamCreate(
62 cf::kCFAllocatorDefault,
63 Self::trampoline,
64 &stream_context,
65 cf_paths,
66 fs::kFSEventStreamEventIdSinceNow,
67 latency.as_secs_f64(),
68 fs::kFSEventStreamCreateFlagFileEvents
69 | fs::kFSEventStreamCreateFlagNoDefer
70 | fs::kFSEventStreamCreateFlagWatchRoot,
71 );
72 cf::CFRelease(cf_paths);
73
74 EventStream {
75 stream,
76 _callback: callback,
77 }
78 }
79 }
80
81 pub fn run(self) {
82 unsafe {
83 fs::FSEventStreamScheduleWithRunLoop(
84 self.stream,
85 cf::CFRunLoopGetCurrent(),
86 cf::kCFRunLoopDefaultMode,
87 );
88
89 fs::FSEventStreamStart(self.stream);
90 cf::CFRunLoopRun();
91
92 fs::FSEventStreamFlushSync(self.stream);
93 fs::FSEventStreamStop(self.stream);
94 fs::FSEventStreamRelease(self.stream);
95 }
96 }
97
98 extern "C" fn trampoline(
99 stream_ref: fs::FSEventStreamRef,
100 info: *mut ::std::os::raw::c_void,
101 num: usize, // size_t numEvents
102 event_paths: *mut ::std::os::raw::c_void, // void *eventPaths
103 event_flags: *const ::std::os::raw::c_void, // const FSEventStreamEventFlags eventFlags[]
104 event_ids: *const ::std::os::raw::c_void, // const FSEventStreamEventId eventIds[]
105 ) {
106 unsafe {
107 let event_paths = event_paths as *const *const ::std::os::raw::c_char;
108 let e_ptr = event_flags as *mut u32;
109 let i_ptr = event_ids as *mut u64;
110 let callback = (info as *mut F).as_mut().unwrap();
111
112 let paths = slice::from_raw_parts(event_paths, num);
113 let flags = slice::from_raw_parts_mut(e_ptr, num);
114 let ids = slice::from_raw_parts_mut(i_ptr, num);
115
116 let mut events = Vec::with_capacity(num);
117 for p in 0..num {
118 let path_c_str = CStr::from_ptr(paths[p]);
119 let path = PathBuf::from(OsStr::from_bytes(path_c_str.to_bytes()));
120 if let Some(flag) = StreamFlags::from_bits(flags[p]) {
121 events.push(Event {
122 event_id: ids[p],
123 flags: flag,
124 path,
125 });
126 } else {
127 debug_assert!(false, "unknown flag set for fs event: {}", flags[p]);
128 }
129 }
130
131 if !callback(events) {
132 fs::FSEventStreamStop(stream_ref);
133 cf::CFRunLoopStop(cf::CFRunLoopGetCurrent());
134 }
135 }
136 }
137}
138
139// Synchronize with
140// /System/Library/Frameworks/CoreServices.framework/Versions/A/Frameworks/FSEvents.framework/Versions/A/Headers/FSEvents.h
141bitflags! {
142 #[repr(C)]
143 pub struct StreamFlags: u32 {
144 const NONE = 0x00000000;
145 const MUST_SCAN_SUBDIRS = 0x00000001;
146 const USER_DROPPED = 0x00000002;
147 const KERNEL_DROPPED = 0x00000004;
148 const IDS_WRAPPED = 0x00000008;
149 const HISTORY_DONE = 0x00000010;
150 const ROOT_CHANGED = 0x00000020;
151 const MOUNT = 0x00000040;
152 const UNMOUNT = 0x00000080;
153 const ITEM_CREATED = 0x00000100;
154 const ITEM_REMOVED = 0x00000200;
155 const INODE_META_MOD = 0x00000400;
156 const ITEM_RENAMED = 0x00000800;
157 const ITEM_MODIFIED = 0x00001000;
158 const FINDER_INFO_MOD = 0x00002000;
159 const ITEM_CHANGE_OWNER = 0x00004000;
160 const ITEM_XATTR_MOD = 0x00008000;
161 const IS_FILE = 0x00010000;
162 const IS_DIR = 0x00020000;
163 const IS_SYMLINK = 0x00040000;
164 const OWN_EVENT = 0x00080000;
165 const IS_HARDLINK = 0x00100000;
166 const IS_LAST_HARDLINK = 0x00200000;
167 const ITEM_CLONED = 0x400000;
168 }
169}
170
171impl std::fmt::Display for StreamFlags {
172 fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
173 if self.contains(StreamFlags::MUST_SCAN_SUBDIRS) {
174 let _d = write!(f, "MUST_SCAN_SUBDIRS ");
175 }
176 if self.contains(StreamFlags::USER_DROPPED) {
177 let _d = write!(f, "USER_DROPPED ");
178 }
179 if self.contains(StreamFlags::KERNEL_DROPPED) {
180 let _d = write!(f, "KERNEL_DROPPED ");
181 }
182 if self.contains(StreamFlags::IDS_WRAPPED) {
183 let _d = write!(f, "IDS_WRAPPED ");
184 }
185 if self.contains(StreamFlags::HISTORY_DONE) {
186 let _d = write!(f, "HISTORY_DONE ");
187 }
188 if self.contains(StreamFlags::ROOT_CHANGED) {
189 let _d = write!(f, "ROOT_CHANGED ");
190 }
191 if self.contains(StreamFlags::MOUNT) {
192 let _d = write!(f, "MOUNT ");
193 }
194 if self.contains(StreamFlags::UNMOUNT) {
195 let _d = write!(f, "UNMOUNT ");
196 }
197 if self.contains(StreamFlags::ITEM_CREATED) {
198 let _d = write!(f, "ITEM_CREATED ");
199 }
200 if self.contains(StreamFlags::ITEM_REMOVED) {
201 let _d = write!(f, "ITEM_REMOVED ");
202 }
203 if self.contains(StreamFlags::INODE_META_MOD) {
204 let _d = write!(f, "INODE_META_MOD ");
205 }
206 if self.contains(StreamFlags::ITEM_RENAMED) {
207 let _d = write!(f, "ITEM_RENAMED ");
208 }
209 if self.contains(StreamFlags::ITEM_MODIFIED) {
210 let _d = write!(f, "ITEM_MODIFIED ");
211 }
212 if self.contains(StreamFlags::FINDER_INFO_MOD) {
213 let _d = write!(f, "FINDER_INFO_MOD ");
214 }
215 if self.contains(StreamFlags::ITEM_CHANGE_OWNER) {
216 let _d = write!(f, "ITEM_CHANGE_OWNER ");
217 }
218 if self.contains(StreamFlags::ITEM_XATTR_MOD) {
219 let _d = write!(f, "ITEM_XATTR_MOD ");
220 }
221 if self.contains(StreamFlags::IS_FILE) {
222 let _d = write!(f, "IS_FILE ");
223 }
224 if self.contains(StreamFlags::IS_DIR) {
225 let _d = write!(f, "IS_DIR ");
226 }
227 if self.contains(StreamFlags::IS_SYMLINK) {
228 let _d = write!(f, "IS_SYMLINK ");
229 }
230 if self.contains(StreamFlags::OWN_EVENT) {
231 let _d = write!(f, "OWN_EVENT ");
232 }
233 if self.contains(StreamFlags::IS_LAST_HARDLINK) {
234 let _d = write!(f, "IS_LAST_HARDLINK ");
235 }
236 if self.contains(StreamFlags::IS_HARDLINK) {
237 let _d = write!(f, "IS_HARDLINK ");
238 }
239 if self.contains(StreamFlags::ITEM_CLONED) {
240 let _d = write!(f, "ITEM_CLONED ");
241 }
242 write!(f, "")
243 }
244}
245
/// Watches a temporary directory and checks that creating one file and
/// removing another are each reported with the expected path and flag.
#[test]
fn test_event_stream() {
    use std::{fs, sync::mpsc, time::Duration};
    use tempdir::TempDir;

    let dir = TempDir::new("test_observe").unwrap();
    // Canonicalize so the paths compared below match what the stream reports.
    let root = dir.path().canonicalize().unwrap();
    fs::write(root.join("a"), "a contents").unwrap();

    let (tx, rx) = mpsc::channel();
    let stream = EventStream::new(&[&root], Duration::from_millis(50), move |events| {
        // The batch is already an owned `Vec`; forward it without cloning.
        // A closed channel makes the callback return `false`, stopping the stream.
        tx.send(events).is_ok()
    });
    std::thread::spawn(move || stream.run());

    // Waits for the next batch and checks its last event.
    let expect_last = |file_name: &str, flag: StreamFlags| {
        let batch = rx.recv_timeout(Duration::from_millis(500)).unwrap();
        let event = batch.last().unwrap();
        assert_eq!(event.path, root.join(file_name));
        assert!(event.flags.contains(flag));
    };

    fs::write(root.join("b"), "b contents").unwrap();
    expect_last("b", StreamFlags::ITEM_CREATED);

    fs::remove_file(root.join("a")).unwrap();
    expect_last("a", StreamFlags::ITEM_REMOVED);
}