// mac_impl.rs

  1use fsevent_sys::{self as fs, core_foundation as cf};
  2use parking_lot::Mutex;
  3use std::{
  4    convert::AsRef,
  5    ffi::{c_void, CStr, OsStr},
  6    os::unix::ffi::OsStrExt,
  7    path::{Path, PathBuf},
  8    ptr, slice,
  9    sync::Arc,
 10    time::Duration,
 11};
 12
 13use crate::{Event, StreamFlags};
 14
 15pub struct EventStream {
 16    lifecycle: Arc<Mutex<Lifecycle>>,
 17    state: Box<State>,
 18}
 19
 20struct State {
 21    latency: Duration,
 22    paths: cf::CFMutableArrayRef,
 23    callback: Option<Box<dyn FnMut(Vec<Event>) -> bool>>,
 24    last_valid_event_id: Option<fs::FSEventStreamEventId>,
 25    stream: fs::FSEventStreamRef,
 26}
 27
 28impl Drop for State {
 29    fn drop(&mut self) {
 30        unsafe {
 31            cf::CFRelease(self.paths);
 32            fs::FSEventStreamStop(self.stream);
 33            fs::FSEventStreamInvalidate(self.stream);
 34            fs::FSEventStreamRelease(self.stream);
 35        }
 36    }
 37}
 38
 39enum Lifecycle {
 40    New,
 41    Running(cf::CFRunLoopRef),
 42    Stopped,
 43}
 44
 45pub struct Handle(Arc<Mutex<Lifecycle>>);
 46
 47unsafe impl Send for EventStream {}
 48unsafe impl Send for Lifecycle {}
 49
 50impl EventStream {
 51    pub fn new(paths: &[&Path], latency: Duration) -> (Self, Handle) {
 52        unsafe {
 53            let cf_paths =
 54                cf::CFArrayCreateMutable(cf::kCFAllocatorDefault, 0, &cf::kCFTypeArrayCallBacks);
 55            assert!(!cf_paths.is_null());
 56
 57            for path in paths {
 58                let path_bytes = path.as_os_str().as_bytes();
 59                let cf_url = cf::CFURLCreateFromFileSystemRepresentation(
 60                    cf::kCFAllocatorDefault,
 61                    path_bytes.as_ptr() as *const i8,
 62                    path_bytes.len() as cf::CFIndex,
 63                    false,
 64                );
 65                let cf_path = cf::CFURLCopyFileSystemPath(cf_url, cf::kCFURLPOSIXPathStyle);
 66                cf::CFArrayAppendValue(cf_paths, cf_path);
 67                cf::CFRelease(cf_path);
 68                cf::CFRelease(cf_url);
 69            }
 70
 71            let mut state = Box::new(State {
 72                latency,
 73                paths: cf_paths,
 74                callback: None,
 75                last_valid_event_id: None,
 76                stream: ptr::null_mut(),
 77            });
 78            let stream_context = fs::FSEventStreamContext {
 79                version: 0,
 80                info: state.as_ref() as *const _ as *mut c_void,
 81                retain: None,
 82                release: None,
 83                copy_description: None,
 84            };
 85            let stream = fs::FSEventStreamCreate(
 86                cf::kCFAllocatorDefault,
 87                Self::trampoline,
 88                &stream_context,
 89                cf_paths,
 90                FSEventsGetCurrentEventId(),
 91                latency.as_secs_f64(),
 92                fs::kFSEventStreamCreateFlagFileEvents
 93                    | fs::kFSEventStreamCreateFlagNoDefer
 94                    | fs::kFSEventStreamCreateFlagWatchRoot,
 95            );
 96            state.stream = stream;
 97
 98            let lifecycle = Arc::new(Mutex::new(Lifecycle::New));
 99            (
100                EventStream {
101                    lifecycle: lifecycle.clone(),
102                    state,
103                },
104                Handle(lifecycle),
105            )
106        }
107    }
108
109    pub fn run<F>(mut self, f: F)
110    where
111        F: FnMut(Vec<Event>) -> bool + 'static,
112    {
113        self.state.callback = Some(Box::new(f));
114        unsafe {
115            let run_loop = cf::CFRunLoopGetCurrent();
116            {
117                let mut state = self.lifecycle.lock();
118                match *state {
119                    Lifecycle::New => *state = Lifecycle::Running(run_loop),
120                    Lifecycle::Running(_) => unreachable!(),
121                    Lifecycle::Stopped => return,
122                }
123            }
124            fs::FSEventStreamScheduleWithRunLoop(
125                self.state.stream,
126                run_loop,
127                cf::kCFRunLoopDefaultMode,
128            );
129            fs::FSEventStreamStart(self.state.stream);
130            cf::CFRunLoopRun();
131        }
132    }
133
134    extern "C" fn trampoline(
135        stream_ref: fs::FSEventStreamRef,
136        info: *mut ::std::os::raw::c_void,
137        num: usize,                                 // size_t numEvents
138        event_paths: *mut ::std::os::raw::c_void,   // void *eventPaths
139        event_flags: *const ::std::os::raw::c_void, // const FSEventStreamEventFlags eventFlags[]
140        event_ids: *const ::std::os::raw::c_void,   // const FSEventStreamEventId eventIds[]
141    ) {
142        unsafe {
143            let event_paths = event_paths as *const *const ::std::os::raw::c_char;
144            let e_ptr = event_flags as *mut u32;
145            let i_ptr = event_ids as *mut u64;
146            let state = (info as *mut State).as_mut().unwrap();
147            let callback = if let Some(callback) = state.callback.as_mut() {
148                callback
149            } else {
150                return;
151            };
152
153            let paths = slice::from_raw_parts(event_paths, num);
154            let flags = slice::from_raw_parts_mut(e_ptr, num);
155            let ids = slice::from_raw_parts_mut(i_ptr, num);
156            let mut stream_restarted = false;
157
158            // Sometimes FSEvents reports a "dropped" event, an indication that either the kernel
159            // or our code couldn't keep up with the sheer volume of file-system events that were
160            // generated. If we observed a valid event before this happens, we'll try to read the
161            // file-system journal by stopping the current stream and creating a new one starting at
162            // such event. Otherwise, we'll let invoke the callback with the dropped event, which
163            // will likely perform a re-scan of one of the root directories.
164            if flags
165                .iter()
166                .copied()
167                .filter_map(StreamFlags::from_bits)
168                .any(|flags| {
169                    flags.contains(StreamFlags::USER_DROPPED)
170                        || flags.contains(StreamFlags::KERNEL_DROPPED)
171                })
172            {
173                if let Some(last_valid_event_id) = state.last_valid_event_id.take() {
174                    fs::FSEventStreamStop(state.stream);
175                    fs::FSEventStreamInvalidate(state.stream);
176                    fs::FSEventStreamRelease(state.stream);
177
178                    let stream_context = fs::FSEventStreamContext {
179                        version: 0,
180                        info,
181                        retain: None,
182                        release: None,
183                        copy_description: None,
184                    };
185                    let stream = fs::FSEventStreamCreate(
186                        cf::kCFAllocatorDefault,
187                        Self::trampoline,
188                        &stream_context,
189                        state.paths,
190                        last_valid_event_id,
191                        state.latency.as_secs_f64(),
192                        fs::kFSEventStreamCreateFlagFileEvents
193                            | fs::kFSEventStreamCreateFlagNoDefer
194                            | fs::kFSEventStreamCreateFlagWatchRoot,
195                    );
196
197                    state.stream = stream;
198                    fs::FSEventStreamScheduleWithRunLoop(
199                        state.stream,
200                        cf::CFRunLoopGetCurrent(),
201                        cf::kCFRunLoopDefaultMode,
202                    );
203                    fs::FSEventStreamStart(state.stream);
204                    stream_restarted = true;
205                }
206            }
207
208            if !stream_restarted {
209                let mut events = Vec::with_capacity(num);
210                for p in 0..num {
211                    if let Some(flag) = StreamFlags::from_bits(flags[p]) {
212                        if !flag.contains(StreamFlags::HISTORY_DONE) {
213                            let path_c_str = CStr::from_ptr(paths[p]);
214                            let path = PathBuf::from(OsStr::from_bytes(path_c_str.to_bytes()));
215                            let event = Event {
216                                event_id: ids[p],
217                                flags: flag,
218                                path,
219                            };
220                            state.last_valid_event_id = Some(event.event_id);
221                            events.push(event);
222                        }
223                    } else {
224                        debug_assert!(false, "unknown flag set for fs event: {}", flags[p]);
225                    }
226                }
227
228                if !events.is_empty() && !callback(events) {
229                    fs::FSEventStreamStop(stream_ref);
230                    cf::CFRunLoopStop(cf::CFRunLoopGetCurrent());
231                }
232            }
233        }
234    }
235}
236
237impl Drop for Handle {
238    fn drop(&mut self) {
239        let mut state = self.0.lock();
240        if let Lifecycle::Running(run_loop) = *state {
241            unsafe {
242                cf::CFRunLoopStop(run_loop);
243            }
244        }
245        *state = Lifecycle::Stopped;
246    }
247}
248
249#[link(name = "CoreServices", kind = "framework")]
250extern "C" {
251    pub fn FSEventsGetCurrentEventId() -> u64;
252}
253
#[cfg(test)]
mod tests {
    use super::*;
    use std::{fs, sync::mpsc, thread, time::Duration};

    /// Events are reported for a file created and a file removed after the stream started.
    #[test]
    fn test_event_stream_simple() {
        for _ in 0..3 {
            let (_dir, path) = temp_dir_with_files(10);
            flush_historical_events();

            let (tx, rx) = mpsc::channel();
            let (stream, handle) = EventStream::new(&[&path], Duration::from_millis(50));
            // The callback owns the batch, so it can be sent as-is without cloning.
            thread::spawn(move || stream.run(move |events| tx.send(events).is_ok()));

            assert_create_and_remove_are_reported(&path, &rx);
            drop(handle);
        }
    }

    /// Events that happen between `EventStream::new` and `run` are still reported.
    #[test]
    fn test_event_stream_delayed_start() {
        for _ in 0..3 {
            let (_dir, path) = temp_dir_with_files(10);
            flush_historical_events();

            let (tx, rx) = mpsc::channel();
            let (stream, handle) = EventStream::new(&[&path], Duration::from_millis(50));

            // Delay the call to `run` in order to make sure we don't miss any events that occur
            // between creating the `EventStream` and calling `run`.
            thread::spawn(move || {
                thread::sleep(Duration::from_millis(100));
                stream.run(move |events| tx.send(events).is_ok())
            });

            assert_create_and_remove_are_reported(&path, &rx);
            drop(handle);
        }
    }

    /// Dropping the handle while the stream is running makes `run` return.
    #[test]
    fn test_event_stream_shutdown_by_dropping_handle() {
        let (_dir, path) = temp_dir_with_files(0);
        flush_historical_events();

        let (tx, rx) = mpsc::channel();
        let (stream, handle) = EventStream::new(&[&path], Duration::from_millis(50));
        thread::spawn(move || {
            stream.run({
                let tx = tx.clone();
                move |_| {
                    tx.send("running").unwrap();
                    true
                }
            });
            tx.send("stopped").unwrap();
        });

        fs::write(path.join("new-file"), "").unwrap();
        assert_eq!(rx.recv_timeout(Duration::from_secs(2)).unwrap(), "running");

        // Dropping the handle causes `EventStream::run` to return. Further batches may have
        // been delivered before the run loop stopped, so skip any extra "running" messages.
        drop(handle);
        loop {
            match rx.recv_timeout(Duration::from_secs(2)).unwrap() {
                "running" => continue,
                message => {
                    assert_eq!(message, "stopped");
                    break;
                }
            }
        }
    }

    /// Dropping the handle before `run` is called makes `run` a no-op.
    #[test]
    fn test_event_stream_shutdown_before_run() {
        let (_dir, path) = temp_dir_with_files(0);

        let (stream, handle) = EventStream::new(&[&path], Duration::from_millis(50));
        drop(handle);

        // This returns immediately because the handle was already dropped.
        stream.run(|_| true);
    }

    /// Creates a temporary directory containing `existing_files` empty files named
    /// `existing-file-<i>` and returns it together with its canonical path. The returned
    /// `TempDir` must be kept alive for as long as the directory is needed.
    fn temp_dir_with_files(existing_files: usize) -> (tempfile::TempDir, std::path::PathBuf) {
        let dir = tempfile::Builder::new()
            .prefix("test-event-stream")
            .tempdir()
            .unwrap();
        let path = dir.path().canonicalize().unwrap();
        for i in 0..existing_files {
            fs::write(path.join(format!("existing-file-{}", i)), "").unwrap();
        }
        (dir, path)
    }

    /// Creates `new-file` and removes `existing-file-5` under `path`, asserting after each
    /// step that the last event of the next received batch describes that change.
    fn assert_create_and_remove_are_reported(
        path: &std::path::Path,
        rx: &mpsc::Receiver<Vec<Event>>,
    ) {
        fs::write(path.join("new-file"), "").unwrap();
        let events = rx.recv_timeout(Duration::from_secs(2)).unwrap();
        let event = events.last().unwrap();
        assert_eq!(event.path, path.join("new-file"));
        assert!(event.flags.contains(StreamFlags::ITEM_CREATED));

        fs::remove_file(path.join("existing-file-5")).unwrap();
        let events = rx.recv_timeout(Duration::from_secs(2)).unwrap();
        let event = events.last().unwrap();
        assert_eq!(event.path, path.join("existing-file-5"));
        assert!(event.flags.contains(StreamFlags::ITEM_REMOVED));
    }

    /// Sleeps long enough for events caused by the test setup to be flushed before a stream
    /// is created (longer when the `CI` environment variable is set).
    fn flush_historical_events() {
        let duration = if std::env::var("CI").is_ok() {
            Duration::from_secs(2)
        } else {
            Duration::from_millis(500)
        };
        thread::sleep(duration);
    }
}
382}