Merge pull request #1033 from zed-industries/historical-events

Antonio Scandurra created

Restart FSEventStream at the last seen event when "dropped" is reported

Change summary

crates/fsevent/src/fsevent.rs | 167 ++++++++++++++++++++++++++----------
1 file changed, 121 insertions(+), 46 deletions(-)

Detailed changes

crates/fsevent/src/fsevent.rs 🔗

@@ -8,7 +8,7 @@ use std::{
     ffi::{c_void, CStr, OsStr},
     os::unix::ffi::OsStrExt,
     path::{Path, PathBuf},
-    slice,
+    ptr, slice,
     sync::Arc,
     time::Duration,
 };
@@ -21,12 +21,28 @@ pub struct Event {
 }
 
 pub struct EventStream {
+    lifecycle: Arc<Mutex<Lifecycle>>,
+    state: Box<State>,
+}
+
+struct State {
+    latency: Duration,
+    paths: cf::CFMutableArrayRef,
+    callback: Option<Box<dyn FnMut(Vec<Event>) -> bool>>,
+    last_valid_event_id: Option<fs::FSEventStreamEventId>,
     stream: fs::FSEventStreamRef,
-    state: Arc<Mutex<Lifecycle>>,
-    callback: Box<Option<RunCallback>>,
 }
 
-type RunCallback = Box<dyn FnMut(Vec<Event>) -> bool>;
+impl Drop for State {
+    fn drop(&mut self) {
+        unsafe {
+            cf::CFRelease(self.paths);
+            fs::FSEventStreamStop(self.stream);
+            fs::FSEventStreamInvalidate(self.stream);
+            fs::FSEventStreamRelease(self.stream);
+        }
+    }
+}
 
 enum Lifecycle {
     New,
@@ -42,15 +58,6 @@ unsafe impl Send for Lifecycle {}
 impl EventStream {
     pub fn new(paths: &[&Path], latency: Duration) -> (Self, Handle) {
         unsafe {
-            let callback = Box::new(None);
-            let stream_context = fs::FSEventStreamContext {
-                version: 0,
-                info: callback.as_ref() as *const _ as *mut c_void,
-                retain: None,
-                release: None,
-                copy_description: None,
-            };
-
             let cf_paths =
                 cf::CFArrayCreateMutable(cf::kCFAllocatorDefault, 0, &cf::kCFTypeArrayCallBacks);
             assert!(!cf_paths.is_null());
@@ -69,6 +76,20 @@ impl EventStream {
                 cf::CFRelease(cf_url);
             }
 
+            let mut state = Box::new(State {
+                latency,
+                paths: cf_paths,
+                callback: None,
+                last_valid_event_id: None,
+                stream: ptr::null_mut(),
+            });
+            let stream_context = fs::FSEventStreamContext {
+                version: 0,
+                info: state.as_ref() as *const _ as *mut c_void,
+                retain: None,
+                release: None,
+                copy_description: None,
+            };
             let stream = fs::FSEventStreamCreate(
                 cf::kCFAllocatorDefault,
                 Self::trampoline,
@@ -80,17 +101,15 @@ impl EventStream {
                     | fs::kFSEventStreamCreateFlagNoDefer
                     | fs::kFSEventStreamCreateFlagWatchRoot,
             );
-            cf::CFRelease(cf_paths);
-
-            let state = Arc::new(Mutex::new(Lifecycle::New));
+            state.stream = stream;
 
+            let lifecycle = Arc::new(Mutex::new(Lifecycle::New));
             (
                 EventStream {
-                    stream,
-                    state: state.clone(),
-                    callback,
+                    lifecycle: lifecycle.clone(),
+                    state,
                 },
-                Handle(state),
+                Handle(lifecycle),
             )
         }
     }
@@ -99,21 +118,24 @@ impl EventStream {
     where
         F: FnMut(Vec<Event>) -> bool + 'static,
     {
-        *self.callback = Some(Box::new(f));
+        self.state.callback = Some(Box::new(f));
         unsafe {
             let run_loop = cf::CFRunLoopGetCurrent();
             {
-                let mut state = self.state.lock();
+                let mut state = self.lifecycle.lock();
                 match *state {
                     Lifecycle::New => *state = Lifecycle::Running(run_loop),
                     Lifecycle::Running(_) => unreachable!(),
                     Lifecycle::Stopped => return,
                 }
             }
-            fs::FSEventStreamScheduleWithRunLoop(self.stream, run_loop, cf::kCFRunLoopDefaultMode);
-            fs::FSEventStreamStart(self.stream);
+            fs::FSEventStreamScheduleWithRunLoop(
+                self.state.stream,
+                run_loop,
+                cf::kCFRunLoopDefaultMode,
+            );
+            fs::FSEventStreamStart(self.state.stream);
             cf::CFRunLoopRun();
-            fs::FSEventStreamRelease(self.stream);
         }
     }
 
@@ -129,8 +151,8 @@ impl EventStream {
             let event_paths = event_paths as *const *const ::std::os::raw::c_char;
             let e_ptr = event_flags as *mut u32;
             let i_ptr = event_ids as *mut u64;
-            let callback_ptr = (info as *mut Option<RunCallback>).as_mut().unwrap();
-            let callback = if let Some(callback) = callback_ptr.as_mut() {
+            let state = (info as *mut State).as_mut().unwrap();
+            let callback = if let Some(callback) = state.callback.as_mut() {
                 callback
             } else {
                 return;
@@ -139,30 +161,83 @@ impl EventStream {
             let paths = slice::from_raw_parts(event_paths, num);
             let flags = slice::from_raw_parts_mut(e_ptr, num);
             let ids = slice::from_raw_parts_mut(i_ptr, num);
+            let mut stream_restarted = false;
+
+            // Sometimes FSEvents reports a "dropped" event, an indication that either the kernel
+            // or our code couldn't keep up with the sheer volume of file-system events that were
+            // generated. If we observed a valid event before this happens, we'll try to read the
+            // file-system journal by stopping the current stream and creating a new one starting at
+            // that event. Otherwise, we'll invoke the callback with the dropped event, which
+            // will likely perform a re-scan of one of the root directories.
+            if flags
+                .iter()
+                .copied()
+                .filter_map(StreamFlags::from_bits)
+                .any(|flags| {
+                    flags.contains(StreamFlags::USER_DROPPED)
+                        || flags.contains(StreamFlags::KERNEL_DROPPED)
+                })
+            {
+                if let Some(last_valid_event_id) = state.last_valid_event_id.take() {
+                    fs::FSEventStreamStop(state.stream);
+                    fs::FSEventStreamInvalidate(state.stream);
+                    fs::FSEventStreamRelease(state.stream);
+
+                    let stream_context = fs::FSEventStreamContext {
+                        version: 0,
+                        info,
+                        retain: None,
+                        release: None,
+                        copy_description: None,
+                    };
+                    let stream = fs::FSEventStreamCreate(
+                        cf::kCFAllocatorDefault,
+                        Self::trampoline,
+                        &stream_context,
+                        state.paths,
+                        last_valid_event_id,
+                        state.latency.as_secs_f64(),
+                        fs::kFSEventStreamCreateFlagFileEvents
+                            | fs::kFSEventStreamCreateFlagNoDefer
+                            | fs::kFSEventStreamCreateFlagWatchRoot,
+                    );
+
+                    state.stream = stream;
+                    fs::FSEventStreamScheduleWithRunLoop(
+                        state.stream,
+                        cf::CFRunLoopGetCurrent(),
+                        cf::kCFRunLoopDefaultMode,
+                    );
+                    fs::FSEventStreamStart(state.stream);
+                    stream_restarted = true;
+                }
+            }
 
-            let mut events = Vec::with_capacity(num);
-            for p in 0..num {
-                let path_c_str = CStr::from_ptr(paths[p]);
-                let path = PathBuf::from(OsStr::from_bytes(path_c_str.to_bytes()));
-                if let Some(flag) = StreamFlags::from_bits(flags[p]) {
-                    if flag.contains(StreamFlags::HISTORY_DONE) {
-                        events.clear();
+            if !stream_restarted {
+                let mut events = Vec::with_capacity(num);
+                for p in 0..num {
+                    if let Some(flag) = StreamFlags::from_bits(flags[p]) {
+                        if !flag.contains(StreamFlags::HISTORY_DONE) {
+                            let path_c_str = CStr::from_ptr(paths[p]);
+                            let path = PathBuf::from(OsStr::from_bytes(path_c_str.to_bytes()));
+                            let event = Event {
+                                event_id: ids[p],
+                                flags: flag,
+                                path,
+                            };
+                            state.last_valid_event_id = Some(event.event_id);
+                            events.push(event);
+                        }
                     } else {
-                        events.push(Event {
-                            event_id: ids[p],
-                            flags: flag,
-                            path,
-                        });
+                        debug_assert!(false, "unknown flag set for fs event: {}", flags[p]);
                     }
-                } else {
-                    debug_assert!(false, "unknown flag set for fs event: {}", flags[p]);
                 }
-            }
 
-            if !events.is_empty() {
-                if !callback(events) {
-                    fs::FSEventStreamStop(stream_ref);
-                    cf::CFRunLoopStop(cf::CFRunLoopGetCurrent());
+                if !events.is_empty() {
+                    if !callback(events) {
+                        fs::FSEventStreamStop(stream_ref);
+                        cf::CFRunLoopStop(cf::CFRunLoopGetCurrent());
+                    }
                 }
             }
         }