1use fsevent_sys::{self as fs, core_foundation as cf};
2use parking_lot::Mutex;
3use std::{
4 convert::AsRef,
5 ffi::{c_void, CStr, OsStr},
6 os::unix::ffi::OsStrExt,
7 path::{Path, PathBuf},
8 ptr, slice,
9 sync::Arc,
10 time::Duration,
11};
12
13use crate::{Event, StreamFlags};
14
15pub struct EventStream {
16 lifecycle: Arc<Mutex<Lifecycle>>,
17 state: Box<State>,
18}
19
20struct State {
21 latency: Duration,
22 paths: cf::CFMutableArrayRef,
23 callback: Option<Box<dyn FnMut(Vec<Event>) -> bool>>,
24 last_valid_event_id: Option<fs::FSEventStreamEventId>,
25 stream: fs::FSEventStreamRef,
26}
27
28impl Drop for State {
29 fn drop(&mut self) {
30 unsafe {
31 cf::CFRelease(self.paths);
32 fs::FSEventStreamStop(self.stream);
33 fs::FSEventStreamInvalidate(self.stream);
34 fs::FSEventStreamRelease(self.stream);
35 }
36 }
37}
38
39enum Lifecycle {
40 New,
41 Running(cf::CFRunLoopRef),
42 Stopped,
43}
44
45pub struct Handle(Arc<Mutex<Lifecycle>>);
46
47unsafe impl Send for EventStream {}
48unsafe impl Send for Lifecycle {}
49
50impl EventStream {
51 pub fn new(paths: &[&Path], latency: Duration) -> (Self, Handle) {
52 unsafe {
53 let cf_paths =
54 cf::CFArrayCreateMutable(cf::kCFAllocatorDefault, 0, &cf::kCFTypeArrayCallBacks);
55 assert!(!cf_paths.is_null());
56
57 for path in paths {
58 let path_bytes = path.as_os_str().as_bytes();
59 let cf_url = cf::CFURLCreateFromFileSystemRepresentation(
60 cf::kCFAllocatorDefault,
61 path_bytes.as_ptr() as *const i8,
62 path_bytes.len() as cf::CFIndex,
63 false,
64 );
65 let cf_path = cf::CFURLCopyFileSystemPath(cf_url, cf::kCFURLPOSIXPathStyle);
66 cf::CFArrayAppendValue(cf_paths, cf_path);
67 cf::CFRelease(cf_path);
68 cf::CFRelease(cf_url);
69 }
70
71 let mut state = Box::new(State {
72 latency,
73 paths: cf_paths,
74 callback: None,
75 last_valid_event_id: None,
76 stream: ptr::null_mut(),
77 });
78 let stream_context = fs::FSEventStreamContext {
79 version: 0,
80 info: state.as_ref() as *const _ as *mut c_void,
81 retain: None,
82 release: None,
83 copy_description: None,
84 };
85 let stream = fs::FSEventStreamCreate(
86 cf::kCFAllocatorDefault,
87 Self::trampoline,
88 &stream_context,
89 cf_paths,
90 FSEventsGetCurrentEventId(),
91 latency.as_secs_f64(),
92 fs::kFSEventStreamCreateFlagFileEvents
93 | fs::kFSEventStreamCreateFlagNoDefer
94 | fs::kFSEventStreamCreateFlagWatchRoot,
95 );
96 state.stream = stream;
97
98 let lifecycle = Arc::new(Mutex::new(Lifecycle::New));
99 (
100 EventStream {
101 lifecycle: lifecycle.clone(),
102 state,
103 },
104 Handle(lifecycle),
105 )
106 }
107 }
108
109 pub fn run<F>(mut self, f: F)
110 where
111 F: FnMut(Vec<Event>) -> bool + 'static,
112 {
113 self.state.callback = Some(Box::new(f));
114 unsafe {
115 let run_loop = cf::CFRunLoopGetCurrent();
116 {
117 let mut state = self.lifecycle.lock();
118 match *state {
119 Lifecycle::New => *state = Lifecycle::Running(run_loop),
120 Lifecycle::Running(_) => unreachable!(),
121 Lifecycle::Stopped => return,
122 }
123 }
124 fs::FSEventStreamScheduleWithRunLoop(
125 self.state.stream,
126 run_loop,
127 cf::kCFRunLoopDefaultMode,
128 );
129 fs::FSEventStreamStart(self.state.stream);
130 cf::CFRunLoopRun();
131 }
132 }
133
134 extern "C" fn trampoline(
135 stream_ref: fs::FSEventStreamRef,
136 info: *mut ::std::os::raw::c_void,
137 num: usize, // size_t numEvents
138 event_paths: *mut ::std::os::raw::c_void, // void *eventPaths
139 event_flags: *const ::std::os::raw::c_void, // const FSEventStreamEventFlags eventFlags[]
140 event_ids: *const ::std::os::raw::c_void, // const FSEventStreamEventId eventIds[]
141 ) {
142 unsafe {
143 let event_paths = event_paths as *const *const ::std::os::raw::c_char;
144 let e_ptr = event_flags as *mut u32;
145 let i_ptr = event_ids as *mut u64;
146 let state = (info as *mut State).as_mut().unwrap();
147 let callback = if let Some(callback) = state.callback.as_mut() {
148 callback
149 } else {
150 return;
151 };
152
153 let paths = slice::from_raw_parts(event_paths, num);
154 let flags = slice::from_raw_parts_mut(e_ptr, num);
155 let ids = slice::from_raw_parts_mut(i_ptr, num);
156 let mut stream_restarted = false;
157
158 // Sometimes FSEvents reports a "dropped" event, an indication that either the kernel
159 // or our code couldn't keep up with the sheer volume of file-system events that were
160 // generated. If we observed a valid event before this happens, we'll try to read the
161 // file-system journal by stopping the current stream and creating a new one starting at
162 // such event. Otherwise, we'll let invoke the callback with the dropped event, which
163 // will likely perform a re-scan of one of the root directories.
164 if flags
165 .iter()
166 .copied()
167 .filter_map(StreamFlags::from_bits)
168 .any(|flags| {
169 flags.contains(StreamFlags::USER_DROPPED)
170 || flags.contains(StreamFlags::KERNEL_DROPPED)
171 })
172 {
173 if let Some(last_valid_event_id) = state.last_valid_event_id.take() {
174 fs::FSEventStreamStop(state.stream);
175 fs::FSEventStreamInvalidate(state.stream);
176 fs::FSEventStreamRelease(state.stream);
177
178 let stream_context = fs::FSEventStreamContext {
179 version: 0,
180 info,
181 retain: None,
182 release: None,
183 copy_description: None,
184 };
185 let stream = fs::FSEventStreamCreate(
186 cf::kCFAllocatorDefault,
187 Self::trampoline,
188 &stream_context,
189 state.paths,
190 last_valid_event_id,
191 state.latency.as_secs_f64(),
192 fs::kFSEventStreamCreateFlagFileEvents
193 | fs::kFSEventStreamCreateFlagNoDefer
194 | fs::kFSEventStreamCreateFlagWatchRoot,
195 );
196
197 state.stream = stream;
198 fs::FSEventStreamScheduleWithRunLoop(
199 state.stream,
200 cf::CFRunLoopGetCurrent(),
201 cf::kCFRunLoopDefaultMode,
202 );
203 fs::FSEventStreamStart(state.stream);
204 stream_restarted = true;
205 }
206 }
207
208 if !stream_restarted {
209 let mut events = Vec::with_capacity(num);
210 for p in 0..num {
211 if let Some(flag) = StreamFlags::from_bits(flags[p]) {
212 if !flag.contains(StreamFlags::HISTORY_DONE) {
213 let path_c_str = CStr::from_ptr(paths[p]);
214 let path = PathBuf::from(OsStr::from_bytes(path_c_str.to_bytes()));
215 let event = Event {
216 event_id: ids[p],
217 flags: flag,
218 path,
219 };
220 state.last_valid_event_id = Some(event.event_id);
221 events.push(event);
222 }
223 } else {
224 debug_assert!(false, "unknown flag set for fs event: {}", flags[p]);
225 }
226 }
227
228 if !events.is_empty() && !callback(events) {
229 fs::FSEventStreamStop(stream_ref);
230 cf::CFRunLoopStop(cf::CFRunLoopGetCurrent());
231 }
232 }
233 }
234 }
235}
236
237impl Drop for Handle {
238 fn drop(&mut self) {
239 let mut state = self.0.lock();
240 if let Lifecycle::Running(run_loop) = *state {
241 unsafe {
242 cf::CFRunLoopStop(run_loop);
243 }
244 }
245 *state = Lifecycle::Stopped;
246 }
247}
248
249#[link(name = "CoreServices", kind = "framework")]
250extern "C" {
251 pub fn FSEventsGetCurrentEventId() -> u64;
252}
253
254#[cfg(test)]
255mod tests {
256 use super::*;
257 use std::{fs, sync::mpsc, thread, time::Duration};
258
259 #[test]
260 fn test_event_stream_simple() {
261 for _ in 0..3 {
262 let dir = tempfile::Builder::new()
263 .prefix("test-event-stream")
264 .tempdir()
265 .unwrap();
266 let path = dir.path().canonicalize().unwrap();
267 for i in 0..10 {
268 fs::write(path.join(format!("existing-file-{}", i)), "").unwrap();
269 }
270 flush_historical_events();
271
272 let (tx, rx) = mpsc::channel();
273 let (stream, handle) = EventStream::new(&[&path], Duration::from_millis(50));
274 thread::spawn(move || stream.run(move |events| tx.send(events.to_vec()).is_ok()));
275
276 fs::write(path.join("new-file"), "").unwrap();
277 let events = rx.recv_timeout(Duration::from_secs(2)).unwrap();
278 let event = events.last().unwrap();
279 assert_eq!(event.path, path.join("new-file"));
280 assert!(event.flags.contains(StreamFlags::ITEM_CREATED));
281
282 fs::remove_file(path.join("existing-file-5")).unwrap();
283 let events = rx.recv_timeout(Duration::from_secs(2)).unwrap();
284 let event = events.last().unwrap();
285 assert_eq!(event.path, path.join("existing-file-5"));
286 assert!(event.flags.contains(StreamFlags::ITEM_REMOVED));
287 drop(handle);
288 }
289 }
290
291 #[test]
292 fn test_event_stream_delayed_start() {
293 for _ in 0..3 {
294 let dir = tempfile::Builder::new()
295 .prefix("test-event-stream")
296 .tempdir()
297 .unwrap();
298 let path = dir.path().canonicalize().unwrap();
299 for i in 0..10 {
300 fs::write(path.join(format!("existing-file-{}", i)), "").unwrap();
301 }
302 flush_historical_events();
303
304 let (tx, rx) = mpsc::channel();
305 let (stream, handle) = EventStream::new(&[&path], Duration::from_millis(50));
306
307 // Delay the call to `run` in order to make sure we don't miss any events that occur
308 // between creating the `EventStream` and calling `run`.
309 thread::spawn(move || {
310 thread::sleep(Duration::from_millis(100));
311 stream.run(move |events| tx.send(events.to_vec()).is_ok())
312 });
313
314 fs::write(path.join("new-file"), "").unwrap();
315 let events = rx.recv_timeout(Duration::from_secs(2)).unwrap();
316 let event = events.last().unwrap();
317 assert_eq!(event.path, path.join("new-file"));
318 assert!(event.flags.contains(StreamFlags::ITEM_CREATED));
319
320 fs::remove_file(path.join("existing-file-5")).unwrap();
321 let events = rx.recv_timeout(Duration::from_secs(2)).unwrap();
322 let event = events.last().unwrap();
323 assert_eq!(event.path, path.join("existing-file-5"));
324 assert!(event.flags.contains(StreamFlags::ITEM_REMOVED));
325 drop(handle);
326 }
327 }
328
329 #[test]
330 fn test_event_stream_shutdown_by_dropping_handle() {
331 let dir = tempfile::Builder::new()
332 .prefix("test-event-stream")
333 .tempdir()
334 .unwrap();
335 let path = dir.path().canonicalize().unwrap();
336 flush_historical_events();
337
338 let (tx, rx) = mpsc::channel();
339 let (stream, handle) = EventStream::new(&[&path], Duration::from_millis(50));
340 thread::spawn(move || {
341 stream.run({
342 let tx = tx.clone();
343 move |_| {
344 tx.send("running").unwrap();
345 true
346 }
347 });
348 tx.send("stopped").unwrap();
349 });
350
351 fs::write(path.join("new-file"), "").unwrap();
352 assert_eq!(rx.recv_timeout(Duration::from_secs(2)).unwrap(), "running");
353
354 // Dropping the handle causes `EventStream::run` to return.
355 drop(handle);
356 assert_eq!(rx.recv_timeout(Duration::from_secs(2)).unwrap(), "stopped");
357 }
358
359 #[test]
360 fn test_event_stream_shutdown_before_run() {
361 let dir = tempfile::Builder::new()
362 .prefix("test-event-stream")
363 .tempdir()
364 .unwrap();
365 let path = dir.path().canonicalize().unwrap();
366
367 let (stream, handle) = EventStream::new(&[&path], Duration::from_millis(50));
368 drop(handle);
369
370 // This returns immediately because the handle was already dropped.
371 stream.run(|_| true);
372 }
373
374 fn flush_historical_events() {
375 let duration = if std::env::var("CI").is_ok() {
376 Duration::from_secs(2)
377 } else {
378 Duration::from_millis(500)
379 };
380 thread::sleep(duration);
381 }
382}