From f3bc4feaf04e235c74c0c53f3bcff64b7e197dbf Mon Sep 17 00:00:00 2001 From: Antonio Scandurra Date: Mon, 23 May 2022 09:06:09 +0200 Subject: [PATCH 1/3] Pass a richer `State` pointer to fsevents trampoline This will be useful to re-instate a new stream when dropping events. --- crates/fsevent/src/fsevent.rs | 59 ++++++++++++++++++++++------------- 1 file changed, 37 insertions(+), 22 deletions(-) diff --git a/crates/fsevent/src/fsevent.rs b/crates/fsevent/src/fsevent.rs index 95171f835c7d4adef52dece7b740cdbfb9e0f4cd..efd498efed64ba6c03d6b4c4f38d65e739f68165 100644 --- a/crates/fsevent/src/fsevent.rs +++ b/crates/fsevent/src/fsevent.rs @@ -22,11 +22,24 @@ pub struct Event { pub struct EventStream { stream: fs::FSEventStreamRef, - state: Arc>, - callback: Box>, + lifecycle: Arc>, + state: Box, } -type RunCallback = Box) -> bool>; +struct State { + latency: Duration, + paths: cf::CFMutableArrayRef, + callback: Option) -> bool>>, + last_valid_event_id: Option, +} + +impl Drop for State { + fn drop(&mut self) { + unsafe { + cf::CFRelease(self.paths); + } + } +} enum Lifecycle { New, @@ -42,15 +55,6 @@ unsafe impl Send for Lifecycle {} impl EventStream { pub fn new(paths: &[&Path], latency: Duration) -> (Self, Handle) { unsafe { - let callback = Box::new(None); - let stream_context = fs::FSEventStreamContext { - version: 0, - info: callback.as_ref() as *const _ as *mut c_void, - retain: None, - release: None, - copy_description: None, - }; - let cf_paths = cf::CFArrayCreateMutable(cf::kCFAllocatorDefault, 0, &cf::kCFTypeArrayCallBacks); assert!(!cf_paths.is_null()); @@ -69,6 +73,19 @@ impl EventStream { cf::CFRelease(cf_url); } + let state = Box::new(State { + latency, + paths: cf_paths, + callback: None, + last_valid_event_id: None, + }); + let stream_context = fs::FSEventStreamContext { + version: 0, + info: state.as_ref() as *const _ as *mut c_void, + retain: None, + release: None, + copy_description: None, + }; let stream = fs::FSEventStreamCreate( cf::kCFAllocatorDefault, Self::trampoline, @@ -80,17 +97,15 @@ impl EventStream { | fs::kFSEventStreamCreateFlagNoDefer | fs::kFSEventStreamCreateFlagWatchRoot, ); - cf::CFRelease(cf_paths); - - let state = Arc::new(Mutex::new(Lifecycle::New)); + let lifecycle = Arc::new(Mutex::new(Lifecycle::New)); ( EventStream { stream, - state: state.clone(), - callback, + lifecycle: lifecycle.clone(), + state, }, - Handle(state), + Handle(lifecycle), ) } } @@ -99,11 +114,11 @@ impl EventStream { where F: FnMut(Vec) -> bool + 'static, { - *self.callback = Some(Box::new(f)); + self.state.callback = Some(Box::new(f)); unsafe { let run_loop = cf::CFRunLoopGetCurrent(); { - let mut state = self.state.lock(); + let mut state = self.lifecycle.lock(); match *state { Lifecycle::New => *state = Lifecycle::Running(run_loop), Lifecycle::Running(_) => unreachable!(), @@ -129,8 +144,8 @@ impl EventStream { let event_paths = event_paths as *const *const ::std::os::raw::c_char; let e_ptr = event_flags as *mut u32; let i_ptr = event_ids as *mut u64; - let callback_ptr = (info as *mut Option).as_mut().unwrap(); - let callback = if let Some(callback) = callback_ptr.as_mut() { + let state = (info as *mut State).as_mut().unwrap(); + let callback = if let Some(callback) = state.callback.as_mut() { callback } else { return; From 663173d2f54a60f322c6dd6cac3e79bf5969d1cc Mon Sep 17 00:00:00 2001 From: Antonio Scandurra Date: Mon, 23 May 2022 09:33:10 +0200 Subject: [PATCH 2/3] Restart FSEventStream at the last seen event when "dropped" is reported --- crates/fsevent/src/fsevent.rs | 112 ++++++++++++++++++++++++++-------- 1 file changed, 86 insertions(+), 26 deletions(-) diff --git a/crates/fsevent/src/fsevent.rs b/crates/fsevent/src/fsevent.rs index efd498efed64ba6c03d6b4c4f38d65e739f68165..4b05b6599c36d6218fe11ed428a05b4c252a6ad0 100644 --- a/crates/fsevent/src/fsevent.rs +++ b/crates/fsevent/src/fsevent.rs @@ -8,7 +8,7 @@ use std::{ ffi::{c_void, CStr, OsStr}, os::unix::ffi::OsStrExt, path::{Path, PathBuf}, - slice, + ptr, slice, sync::Arc, time::Duration, }; @@ -21,7 +21,6 @@ pub struct Event { } pub struct EventStream { - stream: fs::FSEventStreamRef, lifecycle: Arc>, state: Box, } @@ -31,6 +30,7 @@ struct State { paths: cf::CFMutableArrayRef, callback: Option) -> bool>>, last_valid_event_id: Option, + stream: fs::FSEventStreamRef, } impl Drop for State { @@ -73,11 +73,12 @@ impl EventStream { cf::CFRelease(cf_url); } - let state = Box::new(State { + let mut state = Box::new(State { latency, paths: cf_paths, callback: None, last_valid_event_id: None, + stream: ptr::null_mut(), }); let stream_context = fs::FSEventStreamContext { version: 0, @@ -97,11 +98,11 @@ impl EventStream { | fs::kFSEventStreamCreateFlagNoDefer | fs::kFSEventStreamCreateFlagWatchRoot, ); + state.stream = stream; let lifecycle = Arc::new(Mutex::new(Lifecycle::New)); ( EventStream { - stream, lifecycle: lifecycle.clone(), state, }, @@ -125,10 +126,16 @@ impl EventStream { Lifecycle::Stopped => return, } } - fs::FSEventStreamScheduleWithRunLoop(self.stream, run_loop, cf::kCFRunLoopDefaultMode); - fs::FSEventStreamStart(self.stream); + fs::FSEventStreamScheduleWithRunLoop( + self.state.stream, + run_loop, + cf::kCFRunLoopDefaultMode, + ); + fs::FSEventStreamStart(self.state.stream); cf::CFRunLoopRun(); - fs::FSEventStreamRelease(self.stream); + fs::FSEventStreamStop(self.state.stream); + fs::FSEventStreamInvalidate(self.state.stream); + fs::FSEventStreamRelease(self.state.stream); } } @@ -154,30 +161,83 @@ impl EventStream { let paths = slice::from_raw_parts(event_paths, num); let flags = slice::from_raw_parts_mut(e_ptr, num); let ids = slice::from_raw_parts_mut(i_ptr, num); + let mut stream_restarted = false; + + // Sometimes FSEvents reports a "dropped" event, an indication that either the kernel + // or our code couldn't keep up with the sheer volume of file-system events that were + // generated. If we observed a valid event before this happens, we'll try to read the + // file-system journal by stopping the current stream and creating a new one starting at + // such event. Otherwise, we'll let invoke the callback with the dropped event, which + // will likely perform a re-scan of one of the root directories. + if flags + .iter() + .copied() + .filter_map(StreamFlags::from_bits) + .any(|flags| { + flags.contains(StreamFlags::USER_DROPPED) + || flags.contains(StreamFlags::KERNEL_DROPPED) + }) + { + if let Some(last_valid_event_id) = state.last_valid_event_id.take() { + fs::FSEventStreamStop(state.stream); + fs::FSEventStreamInvalidate(state.stream); + fs::FSEventStreamRelease(state.stream); + + let stream_context = fs::FSEventStreamContext { + version: 0, + info, + retain: None, + release: None, + copy_description: None, + }; + let stream = fs::FSEventStreamCreate( + cf::kCFAllocatorDefault, + Self::trampoline, + &stream_context, + state.paths, + last_valid_event_id, + state.latency.as_secs_f64(), + fs::kFSEventStreamCreateFlagFileEvents + | fs::kFSEventStreamCreateFlagNoDefer + | fs::kFSEventStreamCreateFlagWatchRoot, + ); + + state.stream = stream; + fs::FSEventStreamScheduleWithRunLoop( + state.stream, + cf::CFRunLoopGetCurrent(), + cf::kCFRunLoopDefaultMode, + ); + fs::FSEventStreamStart(state.stream); + stream_restarted = true; + } + } - let mut events = Vec::with_capacity(num); - for p in 0..num { - let path_c_str = CStr::from_ptr(paths[p]); - let path = PathBuf::from(OsStr::from_bytes(path_c_str.to_bytes())); - if let Some(flag) = StreamFlags::from_bits(flags[p]) { - if flag.contains(StreamFlags::HISTORY_DONE) { - events.clear(); + if !stream_restarted { + let mut events = Vec::with_capacity(num); + for p in 0..num { + if let Some(flag) = StreamFlags::from_bits(flags[p]) { + if !flag.contains(StreamFlags::HISTORY_DONE) { + let path_c_str = CStr::from_ptr(paths[p]); + let path = PathBuf::from(OsStr::from_bytes(path_c_str.to_bytes())); + let event = Event { + event_id: ids[p], + flags: flag, + path, + }; + state.last_valid_event_id = Some(event.event_id); + events.push(event); + } } else { - events.push(Event { - event_id: ids[p], - flags: flag, - path, - }); + debug_assert!(false, "unknown flag set for fs event: {}", flags[p]); } - } else { - debug_assert!(false, "unknown flag set for fs event: {}", flags[p]); } - } - if !events.is_empty() { - if !callback(events) { - fs::FSEventStreamStop(stream_ref); - cf::CFRunLoopStop(cf::CFRunLoopGetCurrent()); + if !events.is_empty() { + if !callback(events) { + fs::FSEventStreamStop(stream_ref); + cf::CFRunLoopStop(cf::CFRunLoopGetCurrent()); + } } } } From ce46efff2eb4fc10b76b527c5d04685e683130b9 Mon Sep 17 00:00:00 2001 From: Antonio Scandurra Date: Mon, 23 May 2022 09:48:00 +0200 Subject: [PATCH 3/3] :art: --- crates/fsevent/src/fsevent.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/crates/fsevent/src/fsevent.rs b/crates/fsevent/src/fsevent.rs index 4b05b6599c36d6218fe11ed428a05b4c252a6ad0..4b4d3766bd60f77e1c800eccdbb9fbe5aeb8de6b 100644 --- a/crates/fsevent/src/fsevent.rs +++ b/crates/fsevent/src/fsevent.rs @@ -37,6 +37,9 @@ impl Drop for State { fn drop(&mut self) { unsafe { cf::CFRelease(self.paths); + fs::FSEventStreamStop(self.stream); + fs::FSEventStreamInvalidate(self.stream); + fs::FSEventStreamRelease(self.stream); } } } @@ -133,9 +136,6 @@ impl EventStream { ); fs::FSEventStreamStart(self.state.stream); cf::CFRunLoopRun(); - fs::FSEventStreamStop(self.state.stream); - fs::FSEventStreamInvalidate(self.state.stream); - fs::FSEventStreamRelease(self.state.stream); } }