Restart FSEventStream at the last seen event when "dropped" is reported

Antonio Scandurra created

Change summary

crates/fsevent/src/fsevent.rs | 112 ++++++++++++++++++++++++++++--------
1 file changed, 86 insertions(+), 26 deletions(-)

Detailed changes

crates/fsevent/src/fsevent.rs 🔗

@@ -8,7 +8,7 @@ use std::{
     ffi::{c_void, CStr, OsStr},
     os::unix::ffi::OsStrExt,
     path::{Path, PathBuf},
-    slice,
+    ptr, slice,
     sync::Arc,
     time::Duration,
 };
@@ -21,7 +21,6 @@ pub struct Event {
 }
 
 pub struct EventStream {
-    stream: fs::FSEventStreamRef,
     lifecycle: Arc<Mutex<Lifecycle>>,
     state: Box<State>,
 }
@@ -31,6 +30,7 @@ struct State {
     paths: cf::CFMutableArrayRef,
     callback: Option<Box<dyn FnMut(Vec<Event>) -> bool>>,
     last_valid_event_id: Option<fs::FSEventStreamEventId>,
+    stream: fs::FSEventStreamRef,
 }
 
 impl Drop for State {
@@ -73,11 +73,12 @@ impl EventStream {
                 cf::CFRelease(cf_url);
             }
 
-            let state = Box::new(State {
+            let mut state = Box::new(State {
                 latency,
                 paths: cf_paths,
                 callback: None,
                 last_valid_event_id: None,
+                stream: ptr::null_mut(),
             });
             let stream_context = fs::FSEventStreamContext {
                 version: 0,
@@ -97,11 +98,11 @@ impl EventStream {
                     | fs::kFSEventStreamCreateFlagNoDefer
                     | fs::kFSEventStreamCreateFlagWatchRoot,
             );
+            state.stream = stream;
 
             let lifecycle = Arc::new(Mutex::new(Lifecycle::New));
             (
                 EventStream {
-                    stream,
                     lifecycle: lifecycle.clone(),
                     state,
                 },
@@ -125,10 +126,16 @@ impl EventStream {
                     Lifecycle::Stopped => return,
                 }
             }
-            fs::FSEventStreamScheduleWithRunLoop(self.stream, run_loop, cf::kCFRunLoopDefaultMode);
-            fs::FSEventStreamStart(self.stream);
+            fs::FSEventStreamScheduleWithRunLoop(
+                self.state.stream,
+                run_loop,
+                cf::kCFRunLoopDefaultMode,
+            );
+            fs::FSEventStreamStart(self.state.stream);
             cf::CFRunLoopRun();
-            fs::FSEventStreamRelease(self.stream);
+            fs::FSEventStreamStop(self.state.stream);
+            fs::FSEventStreamInvalidate(self.state.stream);
+            fs::FSEventStreamRelease(self.state.stream);
         }
     }
 
@@ -154,30 +161,83 @@ impl EventStream {
             let paths = slice::from_raw_parts(event_paths, num);
             let flags = slice::from_raw_parts_mut(e_ptr, num);
             let ids = slice::from_raw_parts_mut(i_ptr, num);
+            let mut stream_restarted = false;
+
+            // Sometimes FSEvents reports a "dropped" event, an indication that either the kernel
+            // or our code couldn't keep up with the sheer volume of file-system events that were
+            // generated. If we observed a valid event before this happens, we'll try to read the
+            // file-system journal by stopping the current stream and creating a new one starting at
+            // such event. Otherwise, we'll let invoke the callback with the dropped event, which
+            // will likely perform a re-scan of one of the root directories.
+            if flags
+                .iter()
+                .copied()
+                .filter_map(StreamFlags::from_bits)
+                .any(|flags| {
+                    flags.contains(StreamFlags::USER_DROPPED)
+                        || flags.contains(StreamFlags::KERNEL_DROPPED)
+                })
+            {
+                if let Some(last_valid_event_id) = state.last_valid_event_id.take() {
+                    fs::FSEventStreamStop(state.stream);
+                    fs::FSEventStreamInvalidate(state.stream);
+                    fs::FSEventStreamRelease(state.stream);
+
+                    let stream_context = fs::FSEventStreamContext {
+                        version: 0,
+                        info,
+                        retain: None,
+                        release: None,
+                        copy_description: None,
+                    };
+                    let stream = fs::FSEventStreamCreate(
+                        cf::kCFAllocatorDefault,
+                        Self::trampoline,
+                        &stream_context,
+                        state.paths,
+                        last_valid_event_id,
+                        state.latency.as_secs_f64(),
+                        fs::kFSEventStreamCreateFlagFileEvents
+                            | fs::kFSEventStreamCreateFlagNoDefer
+                            | fs::kFSEventStreamCreateFlagWatchRoot,
+                    );
+
+                    state.stream = stream;
+                    fs::FSEventStreamScheduleWithRunLoop(
+                        state.stream,
+                        cf::CFRunLoopGetCurrent(),
+                        cf::kCFRunLoopDefaultMode,
+                    );
+                    fs::FSEventStreamStart(state.stream);
+                    stream_restarted = true;
+                }
+            }
 
-            let mut events = Vec::with_capacity(num);
-            for p in 0..num {
-                let path_c_str = CStr::from_ptr(paths[p]);
-                let path = PathBuf::from(OsStr::from_bytes(path_c_str.to_bytes()));
-                if let Some(flag) = StreamFlags::from_bits(flags[p]) {
-                    if flag.contains(StreamFlags::HISTORY_DONE) {
-                        events.clear();
+            if !stream_restarted {
+                let mut events = Vec::with_capacity(num);
+                for p in 0..num {
+                    if let Some(flag) = StreamFlags::from_bits(flags[p]) {
+                        if !flag.contains(StreamFlags::HISTORY_DONE) {
+                            let path_c_str = CStr::from_ptr(paths[p]);
+                            let path = PathBuf::from(OsStr::from_bytes(path_c_str.to_bytes()));
+                            let event = Event {
+                                event_id: ids[p],
+                                flags: flag,
+                                path,
+                            };
+                            state.last_valid_event_id = Some(event.event_id);
+                            events.push(event);
+                        }
                     } else {
-                        events.push(Event {
-                            event_id: ids[p],
-                            flags: flag,
-                            path,
-                        });
+                        debug_assert!(false, "unknown flag set for fs event: {}", flags[p]);
                     }
-                } else {
-                    debug_assert!(false, "unknown flag set for fs event: {}", flags[p]);
                 }
-            }
 
-            if !events.is_empty() {
-                if !callback(events) {
-                    fs::FSEventStreamStop(stream_ref);
-                    cf::CFRunLoopStop(cf::CFRunLoopGetCurrent());
+                if !events.is_empty() {
+                    if !callback(events) {
+                        fs::FSEventStreamStop(stream_ref);
+                        cf::CFRunLoopStop(cf::CFRunLoopGetCurrent());
+                    }
                 }
             }
         }