Flush redundant fs events in worktree test

Max Brunsfeld and Antonio Scandurra created

Co-Authored-By: Antonio Scandurra <me@as-cii.com>

Change summary

fsevent/src/lib.rs  | 169 ++++++++++++++++++++++++++++------------------
gpui/src/app.rs     |  15 +++
zed/src/worktree.rs |  32 ++++++++
3 files changed, 145 insertions(+), 71 deletions(-)

Detailed changes

fsevent/src/lib.rs 🔗

@@ -82,15 +82,6 @@ impl EventStream {
             );
             cf::CFRelease(cf_paths);
 
-            fs::FSEventStreamScheduleWithRunLoop(
-                stream,
-                cf::CFRunLoopGetCurrent(),
-                cf::kCFRunLoopDefaultMode,
-            );
-            fs::FSEventStreamStart(stream);
-            fs::FSEventStreamFlushSync(stream);
-            fs::FSEventStreamStop(stream);
-
             let state = Arc::new(Mutex::new(Lifecycle::New));
 
             (
@@ -302,70 +293,118 @@ extern "C" {
     pub fn FSEventsGetCurrentEventId() -> u64;
 }
 
-#[test]
-fn test_event_stream() {
-    use std::{fs, sync::mpsc, time::Duration};
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use std::{fs, sync::mpsc, thread, time::Duration};
     use tempdir::TempDir;
 
-    let dir = TempDir::new("test_observe").unwrap();
-    let path = dir.path().canonicalize().unwrap();
-    fs::write(path.join("a"), "a contents").unwrap();
-
-    let (tx, rx) = mpsc::channel();
-    let (stream, handle) = EventStream::new(&[&path], Duration::from_millis(50));
-    std::thread::spawn(move || stream.run(move |events| tx.send(events.to_vec()).is_ok()));
-
-    fs::write(path.join("b"), "b contents").unwrap();
-    let events = rx.recv_timeout(Duration::from_millis(500)).unwrap();
-    let event = events.last().unwrap();
-    assert_eq!(event.path, path.join("b"));
-    assert!(event.flags.contains(StreamFlags::ITEM_CREATED));
-
-    fs::remove_file(path.join("a")).unwrap();
-    let events = rx.recv_timeout(Duration::from_millis(500)).unwrap();
-    let event = events.last().unwrap();
-    assert_eq!(event.path, path.join("a"));
-    assert!(event.flags.contains(StreamFlags::ITEM_REMOVED));
-    drop(handle);
-}
+    #[test]
+    fn test_event_stream_simple() {
+        for _ in 0..3 {
+            let dir = TempDir::new("test-event-stream").unwrap();
+            let path = dir.path().canonicalize().unwrap();
+            for i in 0..10 {
+                fs::write(path.join(format!("existing-file-{}", i)), "").unwrap();
+            }
 
-#[test]
-fn test_event_stream_shutdown() {
-    use std::{fs, sync::mpsc, time::Duration};
-    use tempdir::TempDir;
+            let (tx, rx) = mpsc::channel();
+            let (stream, handle) = EventStream::new(&[&path], Duration::from_millis(50));
+            thread::spawn(move || stream.run(move |events| tx.send(events.to_vec()).is_ok()));
+
+            // Flush any historical events.
+            rx.recv_timeout(Duration::from_millis(500)).ok();
+
+            fs::write(path.join("new-file"), "").unwrap();
+            let events = rx.recv_timeout(Duration::from_millis(500)).unwrap();
+            let event = events.last().unwrap();
+            assert_eq!(event.path, path.join("new-file"));
+            assert!(event.flags.contains(StreamFlags::ITEM_CREATED));
+
+            fs::remove_file(path.join("existing-file-5")).unwrap();
+            let events = rx.recv_timeout(Duration::from_millis(500)).unwrap();
+            let event = events.last().unwrap();
+            assert_eq!(event.path, path.join("existing-file-5"));
+            assert!(event.flags.contains(StreamFlags::ITEM_REMOVED));
+            drop(handle);
+        }
+    }
 
-    let dir = TempDir::new("test_observe").unwrap();
-    let path = dir.path().canonicalize().unwrap();
-
-    let (tx, rx) = mpsc::channel();
-    let (stream, handle) = EventStream::new(&[&path], Duration::from_millis(50));
-    std::thread::spawn(move || {
-        stream.run({
-            let tx = tx.clone();
-            move |_| {
-                tx.send(()).unwrap();
-                true
+    #[test]
+    fn test_event_stream_delayed_start() {
+        for _ in 0..3 {
+            let dir = TempDir::new("test-event-stream").unwrap();
+            let path = dir.path().canonicalize().unwrap();
+            for i in 0..10 {
+                fs::write(path.join(format!("existing-file-{}", i)), "").unwrap();
             }
-        });
-        tx.send(()).unwrap();
-    });
 
-    fs::write(path.join("b"), "b contents").unwrap();
-    rx.recv_timeout(Duration::from_millis(500)).unwrap();
+            let (tx, rx) = mpsc::channel();
+            let (stream, handle) = EventStream::new(&[&path], Duration::from_millis(50));
+
+            // Delay the call to `run` in order to make sure we don't miss any events that occur
+            // between creating the `EventStream` and calling `run`.
+            thread::spawn(move || {
+                thread::sleep(Duration::from_millis(250));
+                stream.run(move |events| tx.send(events.to_vec()).is_ok())
+            });
+
+            fs::write(path.join("new-file"), "").unwrap();
+            let events = rx.recv_timeout(Duration::from_millis(500)).unwrap();
+            let event = events.last().unwrap();
+            assert_eq!(event.path, path.join("new-file"));
+            assert!(event.flags.contains(StreamFlags::ITEM_CREATED));
+
+            fs::remove_file(path.join("existing-file-5")).unwrap();
+            let events = rx.recv_timeout(Duration::from_millis(500)).unwrap();
+            let event = events.last().unwrap();
+            assert_eq!(event.path, path.join("existing-file-5"));
+            assert!(event.flags.contains(StreamFlags::ITEM_REMOVED));
+            drop(handle);
+        }
+    }
 
-    drop(handle);
-    rx.recv_timeout(Duration::from_millis(500)).unwrap();
-}
+    #[test]
+    fn test_event_stream_shutdown_by_dropping_handle() {
+        let dir = TempDir::new("test-event-stream").unwrap();
+        let path = dir.path().canonicalize().unwrap();
+
+        let (tx, rx) = mpsc::channel();
+        let (stream, handle) = EventStream::new(&[&path], Duration::from_millis(50));
+        thread::spawn(move || {
+            stream.run({
+                let tx = tx.clone();
+                move |_| {
+                    tx.send("running").unwrap();
+                    true
+                }
+            });
+            tx.send("stopped").unwrap();
+        });
 
-#[test]
-fn test_event_stream_shutdown_before_run() {
-    use std::time::Duration;
-    use tempdir::TempDir;
+        fs::write(path.join("new-file"), "").unwrap();
+        assert_eq!(
+            rx.recv_timeout(Duration::from_millis(500)).unwrap(),
+            "running"
+        );
+
+        // Dropping the handle causes `EventStream::run` to return.
+        drop(handle);
+        assert_eq!(
+            rx.recv_timeout(Duration::from_millis(500)).unwrap(),
+            "stopped"
+        );
+    }
 
-    let dir = TempDir::new("test_observe").unwrap();
-    let path = dir.path().canonicalize().unwrap();
+    #[test]
+    fn test_event_stream_shutdown_before_run() {
+        let dir = TempDir::new("test-event-stream").unwrap();
+        let path = dir.path().canonicalize().unwrap();
 
-    let (stream, handle) = EventStream::new(&[&path], Duration::from_millis(50));
-    drop(handle);
-    stream.run(|_| true);
+        let (stream, handle) = EventStream::new(&[&path], Duration::from_millis(50));
+        drop(handle);
+
+        // This returns immediately because the handle was already dropped.
+        stream.run(|_| true);
+    }
 }

gpui/src/app.rs 🔗

@@ -2003,8 +2003,17 @@ impl<T: Entity> ModelHandle<T> {
     pub fn condition(
         &self,
         ctx: &TestAppContext,
-        mut predicate: impl 'static + FnMut(&T, &AppContext) -> bool,
-    ) -> impl 'static + Future<Output = ()> {
+        predicate: impl FnMut(&T, &AppContext) -> bool,
+    ) -> impl Future<Output = ()> {
+        self.condition_with_duration(Duration::from_millis(100), ctx, predicate)
+    }
+
+    pub fn condition_with_duration(
+        &self,
+        duration: Duration,
+        ctx: &TestAppContext,
+        mut predicate: impl FnMut(&T, &AppContext) -> bool,
+    ) -> impl Future<Output = ()> {
         let mut ctx = ctx.0.borrow_mut();
         let tx = ctx
             .async_observations
@@ -2015,7 +2024,7 @@ impl<T: Entity> ModelHandle<T> {
         let handle = self.downgrade();
 
         async move {
-            timeout(Duration::from_millis(200), async move {
+            timeout(duration, async move {
                 loop {
                     {
                         let ctx = ctx.borrow();

zed/src/worktree.rs 🔗

@@ -1235,7 +1235,7 @@ mod tests {
     use crate::editor::Buffer;
     use crate::test::*;
     use anyhow::Result;
-    use gpui::App;
+    use gpui::{App, TestAppContext};
     use rand::prelude::*;
     use serde_json::json;
     use std::env;
@@ -1345,7 +1345,7 @@ mod tests {
 
             let tree = app.add_model(|ctx| Worktree::new(dir.path(), ctx));
             app.read(|ctx| tree.read(ctx).scan_complete()).await;
-            app.read(|ctx| assert_eq!(tree.read(ctx).file_count(), 5));
+            flush_fs_events(&tree, &app).await;
 
             let (file2, file3, file4, file5) = app.read(|ctx| {
                 (
@@ -1358,8 +1358,8 @@ mod tests {
 
             std::fs::rename(dir.path().join("a/file3"), dir.path().join("b/c/file3")).unwrap();
             std::fs::remove_file(dir.path().join("b/c/file5")).unwrap();
-            std::fs::rename(dir.path().join("a/file2"), dir.path().join("a/file2.new")).unwrap();
             std::fs::rename(dir.path().join("b/c"), dir.path().join("d")).unwrap();
+            std::fs::rename(dir.path().join("a/file2"), dir.path().join("a/file2.new")).unwrap();
             app.read(|ctx| tree.read(ctx).next_scan_complete()).await;
 
             app.read(|ctx| {
@@ -1411,6 +1411,7 @@ mod tests {
 
             let tree = app.add_model(|ctx| Worktree::new(dir.path(), ctx));
             app.read(|ctx| tree.read(ctx).scan_complete()).await;
+            flush_fs_events(&tree, &app).await;
             app.read(|ctx| {
                 let tree = tree.read(ctx);
                 let tracked = tree.entry_for_path("tracked-dir/tracked-file1").unwrap();
@@ -1717,4 +1718,29 @@ mod tests {
             paths
         }
     }
+
+    // When the worktree's FS event stream sometimes delivers "redundant" events for FS changes that
+    // occurred before the worktree was constructed. These events can cause the worktree to perfrom
+    // extra directory scans, and emit extra scan-state notifications.
+    //
+    // This function mutates the worktree's directory and waits for those mutations to be picked up,
+    // to ensure that all redundant FS events have already been processed.
+    async fn flush_fs_events(tree: &ModelHandle<Worktree>, app: &TestAppContext) {
+        let filename = "fs-event-sentinel";
+        let root_path = app.read(|ctx| tree.read(ctx).abs_path.clone());
+
+        fs::write(root_path.join(filename), "").unwrap();
+        tree.condition_with_duration(Duration::from_secs(5), &app, |tree, _| {
+            tree.entry_for_path(filename).is_some()
+        })
+        .await;
+
+        fs::remove_file(root_path.join(filename)).unwrap();
+        tree.condition_with_duration(Duration::from_secs(5), &app, |tree, _| {
+            tree.entry_for_path(filename).is_none()
+        })
+        .await;
+
+        app.read(|ctx| tree.read(ctx).scan_complete()).await;
+    }
 }