diff --git a/crates/fsevent/src/fsevent.rs b/crates/fsevent/src/fsevent.rs index 95171f835c7d4adef52dece7b740cdbfb9e0f4cd..4b4d3766bd60f77e1c800eccdbb9fbe5aeb8de6b 100644 --- a/crates/fsevent/src/fsevent.rs +++ b/crates/fsevent/src/fsevent.rs @@ -8,7 +8,7 @@ use std::{ ffi::{c_void, CStr, OsStr}, os::unix::ffi::OsStrExt, path::{Path, PathBuf}, - slice, + ptr, slice, sync::Arc, time::Duration, }; @@ -21,12 +21,28 @@ pub struct Event { } pub struct EventStream { + lifecycle: Arc>, + state: Box, +} + +struct State { + latency: Duration, + paths: cf::CFMutableArrayRef, + callback: Option) -> bool>>, + last_valid_event_id: Option, stream: fs::FSEventStreamRef, - state: Arc>, - callback: Box>, } -type RunCallback = Box) -> bool>; +impl Drop for State { + fn drop(&mut self) { + unsafe { + cf::CFRelease(self.paths); + fs::FSEventStreamStop(self.stream); + fs::FSEventStreamInvalidate(self.stream); + fs::FSEventStreamRelease(self.stream); + } + } +} enum Lifecycle { New, @@ -42,15 +58,6 @@ unsafe impl Send for Lifecycle {} impl EventStream { pub fn new(paths: &[&Path], latency: Duration) -> (Self, Handle) { unsafe { - let callback = Box::new(None); - let stream_context = fs::FSEventStreamContext { - version: 0, - info: callback.as_ref() as *const _ as *mut c_void, - retain: None, - release: None, - copy_description: None, - }; - let cf_paths = cf::CFArrayCreateMutable(cf::kCFAllocatorDefault, 0, &cf::kCFTypeArrayCallBacks); assert!(!cf_paths.is_null()); @@ -69,6 +76,20 @@ impl EventStream { cf::CFRelease(cf_url); } + let mut state = Box::new(State { + latency, + paths: cf_paths, + callback: None, + last_valid_event_id: None, + stream: ptr::null_mut(), + }); + let stream_context = fs::FSEventStreamContext { + version: 0, + info: state.as_ref() as *const _ as *mut c_void, + retain: None, + release: None, + copy_description: None, + }; let stream = fs::FSEventStreamCreate( cf::kCFAllocatorDefault, Self::trampoline, @@ -80,17 +101,15 @@ impl EventStream { | fs::kFSEventStreamCreateFlagNoDefer | fs::kFSEventStreamCreateFlagWatchRoot, ); - cf::CFRelease(cf_paths); - - let state = Arc::new(Mutex::new(Lifecycle::New)); + state.stream = stream; + let lifecycle = Arc::new(Mutex::new(Lifecycle::New)); ( EventStream { - stream, - state: state.clone(), - callback, + lifecycle: lifecycle.clone(), + state, }, - Handle(state), + Handle(lifecycle), ) } } @@ -99,21 +118,24 @@ impl EventStream { where F: FnMut(Vec) -> bool + 'static, { - *self.callback = Some(Box::new(f)); + self.state.callback = Some(Box::new(f)); unsafe { let run_loop = cf::CFRunLoopGetCurrent(); { - let mut state = self.state.lock(); + let mut state = self.lifecycle.lock(); match *state { Lifecycle::New => *state = Lifecycle::Running(run_loop), Lifecycle::Running(_) => unreachable!(), Lifecycle::Stopped => return, } } - fs::FSEventStreamScheduleWithRunLoop(self.stream, run_loop, cf::kCFRunLoopDefaultMode); - fs::FSEventStreamStart(self.stream); + fs::FSEventStreamScheduleWithRunLoop( + self.state.stream, + run_loop, + cf::kCFRunLoopDefaultMode, + ); + fs::FSEventStreamStart(self.state.stream); cf::CFRunLoopRun(); - fs::FSEventStreamRelease(self.stream); } } @@ -129,8 +151,8 @@ impl EventStream { let event_paths = event_paths as *const *const ::std::os::raw::c_char; let e_ptr = event_flags as *mut u32; let i_ptr = event_ids as *mut u64; - let callback_ptr = (info as *mut Option).as_mut().unwrap(); - let callback = if let Some(callback) = callback_ptr.as_mut() { + let state = (info as *mut State).as_mut().unwrap(); + let callback = if let Some(callback) = state.callback.as_mut() { callback } else { return; @@ -139,30 +161,83 @@ impl EventStream { let paths = slice::from_raw_parts(event_paths, num); let flags = slice::from_raw_parts_mut(e_ptr, num); let ids = slice::from_raw_parts_mut(i_ptr, num); + let mut stream_restarted = false; + + // Sometimes FSEvents reports a "dropped" event, an indication that either the kernel + // or our code couldn't keep up with the sheer volume of file-system events that were + // generated. If we observed a valid event before this happens, we'll try to read the + // file-system journal by stopping the current stream and creating a new one starting at + // such event. Otherwise, we'll let invoke the callback with the dropped event, which + // will likely perform a re-scan of one of the root directories. + if flags + .iter() + .copied() + .filter_map(StreamFlags::from_bits) + .any(|flags| { + flags.contains(StreamFlags::USER_DROPPED) + || flags.contains(StreamFlags::KERNEL_DROPPED) + }) + { + if let Some(last_valid_event_id) = state.last_valid_event_id.take() { + fs::FSEventStreamStop(state.stream); + fs::FSEventStreamInvalidate(state.stream); + fs::FSEventStreamRelease(state.stream); + + let stream_context = fs::FSEventStreamContext { + version: 0, + info, + retain: None, + release: None, + copy_description: None, + }; + let stream = fs::FSEventStreamCreate( + cf::kCFAllocatorDefault, + Self::trampoline, + &stream_context, + state.paths, + last_valid_event_id, + state.latency.as_secs_f64(), + fs::kFSEventStreamCreateFlagFileEvents + | fs::kFSEventStreamCreateFlagNoDefer + | fs::kFSEventStreamCreateFlagWatchRoot, + ); + + state.stream = stream; + fs::FSEventStreamScheduleWithRunLoop( + state.stream, + cf::CFRunLoopGetCurrent(), + cf::kCFRunLoopDefaultMode, + ); + fs::FSEventStreamStart(state.stream); + stream_restarted = true; + } + } - let mut events = Vec::with_capacity(num); - for p in 0..num { - let path_c_str = CStr::from_ptr(paths[p]); - let path = PathBuf::from(OsStr::from_bytes(path_c_str.to_bytes())); - if let Some(flag) = StreamFlags::from_bits(flags[p]) { - if flag.contains(StreamFlags::HISTORY_DONE) { - events.clear(); + if !stream_restarted { + let mut events = Vec::with_capacity(num); + for p in 0..num { + if let Some(flag) = StreamFlags::from_bits(flags[p]) { + if !flag.contains(StreamFlags::HISTORY_DONE) { + let path_c_str = CStr::from_ptr(paths[p]); + let path = PathBuf::from(OsStr::from_bytes(path_c_str.to_bytes())); + let event = Event { + event_id: ids[p], + flags: flag, + path, + }; + state.last_valid_event_id = Some(event.event_id); + events.push(event); + } } else { - events.push(Event { - event_id: ids[p], - flags: flag, - path, - }); + debug_assert!(false, "unknown flag set for fs event: {}", flags[p]); } - } else { - debug_assert!(false, "unknown flag set for fs event: {}", flags[p]); } - } - if !events.is_empty() { - if !callback(events) { - fs::FSEventStreamStop(stream_ref); - cf::CFRunLoopStop(cf::CFRunLoopGetCurrent()); + if !events.is_empty() { + if !callback(events) { + fs::FSEventStreamStop(stream_ref); + cf::CFRunLoopStop(cf::CFRunLoopGetCurrent()); + } } } }