Revert "fs: Replace MacWatcher with notify::FsEventWatcher" (#47799)

Anthony Eid created

Reverts zed-industries/zed#47322. I'm going to remerge this tomorrow
after releases so we get a full week in nightly to catch any bugs

Change summary

Cargo.lock                        |  26 +
Cargo.toml                        |   3 
crates/fs/Cargo.toml              |   5 
crates/fs/src/fs.rs               |  60 +++
crates/fs/src/fs_watcher.rs       |  25 -
crates/fs/src/mac_watcher.rs      |  77 ++++
crates/fsevent/Cargo.toml         |  28 +
crates/fsevent/LICENSE-GPL        |   1 
crates/fsevent/examples/events.rs |  23 +
crates/fsevent/src/fsevent.rs     | 515 +++++++++++++++++++++++++++++++++
10 files changed, 739 insertions(+), 24 deletions(-)

Detailed changes

Cargo.lock 🔗

@@ -6518,6 +6518,7 @@ dependencies = [
  "cocoa 0.26.0",
  "collections",
  "fs",
+ "fsevent",
  "futures 0.3.31",
  "git",
  "gpui",
@@ -6576,6 +6577,27 @@ version = "1.3.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "42703706b716c37f96a77aea830392ad231f44c9e9a67872fa5548707e11b11c"
 
+[[package]]
+name = "fsevent"
+version = "0.1.0"
+dependencies = [
+ "bitflags 2.9.4",
+ "core-foundation 0.10.0",
+ "fsevent-sys 3.1.0",
+ "log",
+ "parking_lot",
+ "tempfile",
+]
+
+[[package]]
+name = "fsevent-sys"
+version = "3.1.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "ca6f5e6817058771c10f0eb0f05ddf1e35844266f972004fe8e4b21fda295bd5"
+dependencies = [
+ "libc",
+]
+
 [[package]]
 name = "fsevent-sys"
 version = "4.1.0"
@@ -10570,7 +10592,7 @@ dependencies = [
  "bitflags 2.9.4",
  "crossbeam-channel",
  "filetime",
- "fsevent-sys",
+ "fsevent-sys 4.1.0",
  "inotify 0.9.6",
  "kqueue",
  "libc",
@@ -10586,7 +10608,7 @@ version = "8.2.0"
 source = "git+https://github.com/zed-industries/notify.git?rev=6c550ac3c56cbd143c57ea6390e197af9d790908#6c550ac3c56cbd143c57ea6390e197af9d790908"
 dependencies = [
  "bitflags 2.9.4",
- "fsevent-sys",
+ "fsevent-sys 4.1.0",
  "inotify 0.11.0",
  "kqueue",
  "libc",

Cargo.toml 🔗

@@ -76,6 +76,7 @@ members = [
     "crates/file_icons",
     "crates/fs",
     "crates/fs_benchmarks",
+    "crates/fsevent",
     "crates/fuzzy",
     "crates/git",
     "crates/git_graph",
@@ -312,6 +313,7 @@ feedback = { path = "crates/feedback" }
 file_finder = { path = "crates/file_finder" }
 file_icons = { path = "crates/file_icons" }
 fs = { path = "crates/fs" }
+fsevent = { path = "crates/fsevent" }
 fuzzy = { path = "crates/fuzzy" }
 git = { path = "crates/git" }
 git_graph = { path = "crates/git_graph" }
@@ -832,6 +834,7 @@ command_palette = { codegen-units = 1 }
 command_palette_hooks = { codegen-units = 1 }
 feature_flags = { codegen-units = 1 }
 file_icons = { codegen-units = 1 }
+fsevent = { codegen-units = 1 }
 image_viewer = { codegen-units = 1 }
 edit_prediction_ui = { codegen-units = 1 }
 install_cli = { codegen-units = 1 }

crates/fs/Cargo.toml 🔗

@@ -40,12 +40,15 @@ text.workspace = true
 time.workspace = true
 util.workspace = true
 is_executable = "1.0.5"
-notify = "8.2.0"
 
 [target.'cfg(target_os = "macos")'.dependencies]
+fsevent.workspace = true
 objc.workspace = true
 cocoa = "0.26"
 
+[target.'cfg(not(target_os = "macos"))'.dependencies]
+notify = "8.2.0"
+
 [target.'cfg(target_os = "windows")'.dependencies]
 windows.workspace = true
 

crates/fs/src/fs.rs 🔗

@@ -1,3 +1,7 @@
+#[cfg(target_os = "macos")]
+mod mac_watcher;
+
+#[cfg(not(target_os = "macos"))]
 pub mod fs_watcher;
 
 use parking_lot::Mutex;
@@ -972,6 +976,62 @@ impl Fs for RealFs {
         Ok(Box::pin(result))
     }
 
+    #[cfg(target_os = "macos")]
+    async fn watch(
+        &self,
+        path: &Path,
+        latency: Duration,
+    ) -> (
+        Pin<Box<dyn Send + Stream<Item = Vec<PathEvent>>>>,
+        Arc<dyn Watcher>,
+    ) {
+        use fsevent::StreamFlags;
+
+        let (events_tx, events_rx) = smol::channel::unbounded();
+        let handles = Arc::new(parking_lot::Mutex::new(collections::BTreeMap::default()));
+        let watcher = Arc::new(mac_watcher::MacWatcher::new(
+            events_tx,
+            Arc::downgrade(&handles),
+            latency,
+        ));
+        watcher.add(path).expect("handles can't be dropped");
+
+        (
+            Box::pin(
+                events_rx
+                    .map(|events| {
+                        events
+                            .into_iter()
+                            .map(|event| {
+                                log::trace!("fs path event: {event:?}");
+                                let kind = if event.flags.contains(StreamFlags::ITEM_REMOVED) {
+                                    Some(PathEventKind::Removed)
+                                } else if event.flags.contains(StreamFlags::ITEM_CREATED) {
+                                    Some(PathEventKind::Created)
+                                } else if event.flags.contains(StreamFlags::ITEM_MODIFIED)
+                                    | event.flags.contains(StreamFlags::ITEM_RENAMED)
+                                {
+                                    Some(PathEventKind::Changed)
+                                } else {
+                                    None
+                                };
+                                PathEvent {
+                                    path: event.path,
+                                    kind,
+                                }
+                            })
+                            .collect()
+                    })
+                    .chain(futures::stream::once(async move {
+                        drop(handles);
+                        vec![]
+                    })),
+            ),
+            watcher,
+        )
+    }
+
+    #[cfg(not(target_os = "macos"))]
     async fn watch(
         &self,
         path: &Path,

crates/fs/src/fs_watcher.rs 🔗

@@ -50,7 +50,7 @@ impl Watcher for FsWatcher {
         let tx = self.tx.clone();
         let pending_paths = self.pending_path_events.clone();
 
-        #[cfg(any(target_os = "windows", target_os = "macos"))]
+        #[cfg(target_os = "windows")]
         {
             // Return early if an ancestor of this path was already being watched.
             // saves a huge amount of memory
@@ -81,7 +81,7 @@ impl Watcher for FsWatcher {
         let root_path = SanitizedPath::new_arc(path);
         let path: Arc<std::path::Path> = path.into();
 
-        #[cfg(any(target_os = "windows", target_os = "macos"))]
+        #[cfg(target_os = "windows")]
         let mode = notify::RecursiveMode::Recursive;
         #[cfg(target_os = "linux")]
         let mode = notify::RecursiveMode::NonRecursive;
@@ -166,8 +166,6 @@ pub struct GlobalWatcher {
     watcher: Mutex<notify::KqueueWatcher>,
     #[cfg(target_os = "windows")]
     watcher: Mutex<notify::ReadDirectoryChangesWatcher>,
-    #[cfg(target_os = "macos")]
-    watcher: Mutex<notify::FsEventWatcher>,
 }
 
 impl GlobalWatcher {
@@ -180,24 +178,9 @@ impl GlobalWatcher {
     ) -> anyhow::Result<WatcherRegistrationId> {
         use notify::Watcher;
 
-        let mut state = self.state.lock();
-
-        // Check if this path is already covered by an existing watched ancestor path.
-        // On macOS and Windows, watching is recursive, so we don't need to watch
-        // child paths if an ancestor is already being watched.
-        #[cfg(any(target_os = "windows", target_os = "macos"))]
-        let path_already_covered = state.path_registrations.keys().any(|existing| {
-            path.starts_with(existing.as_ref()) && path.as_ref() != existing.as_ref()
-        });
-
-        #[cfg(not(any(target_os = "windows", target_os = "macos")))]
-        let path_already_covered = false;
+        self.watcher.lock().watch(&path, mode)?;
 
-        if !path_already_covered && !state.path_registrations.contains_key(&path) {
-            drop(state);
-            self.watcher.lock().watch(&path, mode)?;
-            state = self.state.lock();
-        }
+        let mut state = self.state.lock();
 
         let id = state.last_registration;
         state.last_registration = WatcherRegistrationId(id.0 + 1);

crates/fs/src/mac_watcher.rs 🔗

@@ -0,0 +1,77 @@
+use crate::Watcher;
+use anyhow::{Context as _, Result};
+use collections::{BTreeMap, Bound};
+use fsevent::EventStream;
+use parking_lot::Mutex;
+use std::{
+    path::{Path, PathBuf},
+    sync::Weak,
+    thread,
+    time::Duration,
+};
+
+pub struct MacWatcher {
+    events_tx: smol::channel::Sender<Vec<fsevent::Event>>,
+    handles: Weak<Mutex<BTreeMap<PathBuf, fsevent::Handle>>>,
+    latency: Duration,
+}
+
+impl MacWatcher {
+    pub fn new(
+        events_tx: smol::channel::Sender<Vec<fsevent::Event>>,
+        handles: Weak<Mutex<BTreeMap<PathBuf, fsevent::Handle>>>,
+        latency: Duration,
+    ) -> Self {
+        Self {
+            events_tx,
+            handles,
+            latency,
+        }
+    }
+}
+
+impl Watcher for MacWatcher {
+    fn add(&self, path: &Path) -> Result<()> {
+        log::trace!("mac watcher add: {:?}", path);
+        let handles = self
+            .handles
+            .upgrade()
+            .context("unable to watch path, receiver dropped")?;
+        let mut handles = handles.lock();
+
+        // Return early if an ancestor of this path was already being watched.
+        if let Some((watched_path, _)) = handles
+            .range::<Path, _>((Bound::Unbounded, Bound::Included(path)))
+            .next_back()
+            && path.starts_with(watched_path)
+        {
+            log::trace!(
+                "mac watched path starts with existing watched path: {watched_path:?}, {path:?}"
+            );
+            return Ok(());
+        }
+
+        let (stream, handle) = EventStream::new(&[path], self.latency);
+        let tx = self.events_tx.clone();
+        thread::Builder::new()
+            .name("MacWatcher".to_owned())
+            .spawn(move || {
+                stream.run(move |events| smol::block_on(tx.send(events)).is_ok());
+            })
+            .unwrap();
+        handles.insert(path.into(), handle);
+
+        Ok(())
+    }
+
+    fn remove(&self, path: &Path) -> anyhow::Result<()> {
+        let handles = self
+            .handles
+            .upgrade()
+            .context("unable to remove path, receiver dropped")?;
+
+        let mut handles = handles.lock();
+        handles.remove(path);
+        Ok(())
+    }
+}

crates/fsevent/Cargo.toml 🔗

@@ -0,0 +1,28 @@
+[package]
+name = "fsevent"
+version = "0.1.0"
+edition.workspace = true
+publish.workspace = true
+license = "GPL-3.0-or-later"
+
+[lints]
+workspace = true
+
+[lib]
+path = "src/fsevent.rs"
+doctest = false
+
+[dependencies]
+bitflags.workspace = true
+parking_lot.workspace = true
+log.workspace = true
+
+[target.'cfg(target_os = "macos")'.dependencies]
+core-foundation.workspace = true
+fsevent-sys = "3.0.2"
+
+[dev-dependencies]
+tempfile.workspace = true
+
+[package.metadata.docs.rs]
+targets = ["x86_64-apple-darwin"]

crates/fsevent/examples/events.rs 🔗

@@ -0,0 +1,23 @@
+#[cfg(target_os = "macos")]
+fn main() {
+    use fsevent::EventStream;
+    use std::{env::args, path::Path, time::Duration};
+
+    let paths = args().skip(1).collect::<Vec<_>>();
+    let paths = paths.iter().map(Path::new).collect::<Vec<_>>();
+    assert!(!paths.is_empty(), "Must pass 1 or more paths as arguments");
+
+    let (stream, _handle) = EventStream::new(&paths, Duration::from_millis(100));
+    stream.run(|events| {
+        eprintln!("event batch");
+        for event in events {
+            eprintln!("  {:?}", event);
+        }
+        true
+    });
+}
+
+#[cfg(not(target_os = "macos"))]
+fn main() {
+    eprintln!("This example only works on macOS");
+}

crates/fsevent/src/fsevent.rs 🔗

@@ -0,0 +1,515 @@
+#![cfg(target_os = "macos")]
+
+use bitflags::bitflags;
+use fsevent_sys::{self as fs, core_foundation as cf};
+use parking_lot::Mutex;
+use std::{
+    convert::AsRef,
+    ffi::{CStr, OsStr, c_void},
+    os::unix::ffi::OsStrExt,
+    path::{Path, PathBuf},
+    ptr, slice,
+    sync::Arc,
+    time::Duration,
+};
+
+#[derive(Clone, Debug)]
+pub struct Event {
+    pub event_id: u64,
+    pub flags: StreamFlags,
+    pub path: PathBuf,
+}
+
+pub struct EventStream {
+    lifecycle: Arc<Mutex<Lifecycle>>,
+    state: Box<State>,
+}
+
+struct State {
+    latency: Duration,
+    paths: cf::CFMutableArrayRef,
+    callback: Option<Box<dyn FnMut(Vec<Event>) -> bool>>,
+    last_valid_event_id: Option<fs::FSEventStreamEventId>,
+    stream: fs::FSEventStreamRef,
+}
+
+impl Drop for State {
+    fn drop(&mut self) {
+        unsafe {
+            cf::CFRelease(self.paths);
+            fs::FSEventStreamStop(self.stream);
+            fs::FSEventStreamInvalidate(self.stream);
+            fs::FSEventStreamRelease(self.stream);
+        }
+    }
+}
+
+enum Lifecycle {
+    New,
+    Running(cf::CFRunLoopRef),
+    Stopped,
+}
+
+pub struct Handle(Arc<Mutex<Lifecycle>>);
+
+unsafe impl Send for EventStream {}
+unsafe impl Send for Lifecycle {}
+
+impl EventStream {
+    pub fn new(paths: &[&Path], latency: Duration) -> (Self, Handle) {
+        unsafe {
+            let cf_paths =
+                cf::CFArrayCreateMutable(cf::kCFAllocatorDefault, 0, &cf::kCFTypeArrayCallBacks);
+            assert!(!cf_paths.is_null());
+
+            for path in paths {
+                let path_bytes = path.as_os_str().as_bytes();
+                let cf_url = cf::CFURLCreateFromFileSystemRepresentation(
+                    cf::kCFAllocatorDefault,
+                    path_bytes.as_ptr() as *const i8,
+                    path_bytes.len() as cf::CFIndex,
+                    false,
+                );
+                if !cf_url.is_null() {
+                    let cf_path = cf::CFURLCopyFileSystemPath(cf_url, cf::kCFURLPOSIXPathStyle);
+                    cf::CFArrayAppendValue(cf_paths, cf_path);
+                    cf::CFRelease(cf_path);
+                    cf::CFRelease(cf_url);
+                } else {
+                    log::error!("Failed to create CFURL for path: {path:?}");
+                }
+            }
+
+            let mut state = Box::new(State {
+                latency,
+                paths: cf_paths,
+                callback: None,
+                last_valid_event_id: None,
+                stream: ptr::null_mut(),
+            });
+            let stream_context = fs::FSEventStreamContext {
+                version: 0,
+                info: state.as_ref() as *const _ as *mut c_void,
+                retain: None,
+                release: None,
+                copy_description: None,
+            };
+            let stream = fs::FSEventStreamCreate(
+                cf::kCFAllocatorDefault,
+                Self::trampoline,
+                &stream_context,
+                cf_paths,
+                FSEventsGetCurrentEventId(),
+                latency.as_secs_f64(),
+                fs::kFSEventStreamCreateFlagFileEvents
+                    | fs::kFSEventStreamCreateFlagNoDefer
+                    | fs::kFSEventStreamCreateFlagWatchRoot,
+            );
+            state.stream = stream;
+
+            let lifecycle = Arc::new(Mutex::new(Lifecycle::New));
+            (
+                EventStream {
+                    lifecycle: lifecycle.clone(),
+                    state,
+                },
+                Handle(lifecycle),
+            )
+        }
+    }
+
+    pub fn run<F>(mut self, f: F)
+    where
+        F: FnMut(Vec<Event>) -> bool + 'static,
+    {
+        self.state.callback = Some(Box::new(f));
+        unsafe {
+            let run_loop =
+                core_foundation::base::CFRetain(cf::CFRunLoopGetCurrent()) as *mut c_void;
+            {
+                let mut state = self.lifecycle.lock();
+                match *state {
+                    Lifecycle::New => *state = Lifecycle::Running(run_loop),
+                    Lifecycle::Running(_) => unreachable!(),
+                    Lifecycle::Stopped => return,
+                }
+            }
+            fs::FSEventStreamScheduleWithRunLoop(
+                self.state.stream,
+                run_loop,
+                cf::kCFRunLoopDefaultMode,
+            );
+            fs::FSEventStreamStart(self.state.stream);
+            cf::CFRunLoopRun();
+        }
+    }
+
+    extern "C" fn trampoline(
+        stream_ref: fs::FSEventStreamRef,
+        info: *mut ::std::os::raw::c_void,
+        num: usize,                                 // size_t numEvents
+        event_paths: *mut ::std::os::raw::c_void,   // void *eventPaths
+        event_flags: *const ::std::os::raw::c_void, // const FSEventStreamEventFlags eventFlags[]
+        event_ids: *const ::std::os::raw::c_void,   // const FSEventStreamEventId eventIds[]
+    ) {
+        unsafe {
+            let event_paths = event_paths as *const *const ::std::os::raw::c_char;
+            let e_ptr = event_flags as *mut u32;
+            let i_ptr = event_ids as *mut u64;
+            let state = (info as *mut State).as_mut().unwrap();
+            let callback = if let Some(callback) = state.callback.as_mut() {
+                callback
+            } else {
+                return;
+            };
+
+            let paths = slice::from_raw_parts(event_paths, num);
+            let flags = slice::from_raw_parts_mut(e_ptr, num);
+            let ids = slice::from_raw_parts_mut(i_ptr, num);
+            let mut stream_restarted = false;
+
+            // Sometimes FSEvents reports a "dropped" event, an indication that either the kernel
+            // or our code couldn't keep up with the sheer volume of file-system events that were
+            // generated. If we observed a valid event before this happens, we'll try to read the
+            // file-system journal by stopping the current stream and creating a new one starting at
+            // such event. Otherwise, we'll let invoke the callback with the dropped event, which
+            // will likely perform a re-scan of one of the root directories.
+            if flags
+                .iter()
+                .copied()
+                .filter_map(StreamFlags::from_bits)
+                .any(|flags| {
+                    flags.contains(StreamFlags::USER_DROPPED)
+                        || flags.contains(StreamFlags::KERNEL_DROPPED)
+                })
+                && let Some(last_valid_event_id) = state.last_valid_event_id.take()
+            {
+                fs::FSEventStreamStop(state.stream);
+                fs::FSEventStreamInvalidate(state.stream);
+                fs::FSEventStreamRelease(state.stream);
+
+                let stream_context = fs::FSEventStreamContext {
+                    version: 0,
+                    info,
+                    retain: None,
+                    release: None,
+                    copy_description: None,
+                };
+                let stream = fs::FSEventStreamCreate(
+                    cf::kCFAllocatorDefault,
+                    Self::trampoline,
+                    &stream_context,
+                    state.paths,
+                    last_valid_event_id,
+                    state.latency.as_secs_f64(),
+                    fs::kFSEventStreamCreateFlagFileEvents
+                        | fs::kFSEventStreamCreateFlagNoDefer
+                        | fs::kFSEventStreamCreateFlagWatchRoot,
+                );
+
+                state.stream = stream;
+                fs::FSEventStreamScheduleWithRunLoop(
+                    state.stream,
+                    cf::CFRunLoopGetCurrent(),
+                    cf::kCFRunLoopDefaultMode,
+                );
+                fs::FSEventStreamStart(state.stream);
+                stream_restarted = true;
+            }
+
+            if !stream_restarted {
+                let mut events = Vec::with_capacity(num);
+                for p in 0..num {
+                    if let Some(flag) = StreamFlags::from_bits(flags[p]) {
+                        if !flag.contains(StreamFlags::HISTORY_DONE) {
+                            let path_c_str = CStr::from_ptr(paths[p]);
+                            let path = PathBuf::from(OsStr::from_bytes(path_c_str.to_bytes()));
+                            let event = Event {
+                                event_id: ids[p],
+                                flags: flag,
+                                path,
+                            };
+                            state.last_valid_event_id = Some(event.event_id);
+                            events.push(event);
+                        }
+                    } else {
+                        debug_assert!(false, "unknown flag set for fs event: {}", flags[p]);
+                    }
+                }
+
+                if !events.is_empty() && !callback(events) {
+                    fs::FSEventStreamStop(stream_ref);
+                    cf::CFRunLoopStop(cf::CFRunLoopGetCurrent());
+                }
+            }
+        }
+    }
+}
+
+impl Drop for Handle {
+    fn drop(&mut self) {
+        let mut state = self.0.lock();
+        if let Lifecycle::Running(run_loop) = *state {
+            unsafe {
+                cf::CFRunLoopStop(run_loop);
+                cf::CFRelease(run_loop)
+            }
+        }
+        *state = Lifecycle::Stopped;
+    }
+}
+
+// Synchronize with
+// /System/Library/Frameworks/CoreServices.framework/Versions/A/Frameworks/FSEvents.framework/Versions/A/Headers/FSEvents.h
+bitflags! {
+    #[derive(Debug, PartialEq, Eq, Clone, Copy)]
+  #[repr(C)]
+  pub struct StreamFlags: u32 {
+    const NONE = 0x00000000;
+    const MUST_SCAN_SUBDIRS = 0x00000001;
+    const USER_DROPPED = 0x00000002;
+    const KERNEL_DROPPED = 0x00000004;
+    const IDS_WRAPPED = 0x00000008;
+    const HISTORY_DONE = 0x00000010;
+    const ROOT_CHANGED = 0x00000020;
+    const MOUNT = 0x00000040;
+    const UNMOUNT = 0x00000080;
+    const ITEM_CREATED = 0x00000100;
+    const ITEM_REMOVED = 0x00000200;
+    const INODE_META_MOD = 0x00000400;
+    const ITEM_RENAMED = 0x00000800;
+    const ITEM_MODIFIED = 0x00001000;
+    const FINDER_INFO_MOD = 0x00002000;
+    const ITEM_CHANGE_OWNER = 0x00004000;
+    const ITEM_XATTR_MOD = 0x00008000;
+    const IS_FILE = 0x00010000;
+    const IS_DIR = 0x00020000;
+    const IS_SYMLINK = 0x00040000;
+    const OWN_EVENT = 0x00080000;
+    const IS_HARDLINK = 0x00100000;
+    const IS_LAST_HARDLINK = 0x00200000;
+    const ITEM_CLONED = 0x400000;
+  }
+}
+
+impl std::fmt::Display for StreamFlags {
+    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
+        if self.contains(StreamFlags::MUST_SCAN_SUBDIRS) {
+            let _d = write!(f, "MUST_SCAN_SUBDIRS ");
+        }
+        if self.contains(StreamFlags::USER_DROPPED) {
+            let _d = write!(f, "USER_DROPPED ");
+        }
+        if self.contains(StreamFlags::KERNEL_DROPPED) {
+            let _d = write!(f, "KERNEL_DROPPED ");
+        }
+        if self.contains(StreamFlags::IDS_WRAPPED) {
+            let _d = write!(f, "IDS_WRAPPED ");
+        }
+        if self.contains(StreamFlags::HISTORY_DONE) {
+            let _d = write!(f, "HISTORY_DONE ");
+        }
+        if self.contains(StreamFlags::ROOT_CHANGED) {
+            let _d = write!(f, "ROOT_CHANGED ");
+        }
+        if self.contains(StreamFlags::MOUNT) {
+            let _d = write!(f, "MOUNT ");
+        }
+        if self.contains(StreamFlags::UNMOUNT) {
+            let _d = write!(f, "UNMOUNT ");
+        }
+        if self.contains(StreamFlags::ITEM_CREATED) {
+            let _d = write!(f, "ITEM_CREATED ");
+        }
+        if self.contains(StreamFlags::ITEM_REMOVED) {
+            let _d = write!(f, "ITEM_REMOVED ");
+        }
+        if self.contains(StreamFlags::INODE_META_MOD) {
+            let _d = write!(f, "INODE_META_MOD ");
+        }
+        if self.contains(StreamFlags::ITEM_RENAMED) {
+            let _d = write!(f, "ITEM_RENAMED ");
+        }
+        if self.contains(StreamFlags::ITEM_MODIFIED) {
+            let _d = write!(f, "ITEM_MODIFIED ");
+        }
+        if self.contains(StreamFlags::FINDER_INFO_MOD) {
+            let _d = write!(f, "FINDER_INFO_MOD ");
+        }
+        if self.contains(StreamFlags::ITEM_CHANGE_OWNER) {
+            let _d = write!(f, "ITEM_CHANGE_OWNER ");
+        }
+        if self.contains(StreamFlags::ITEM_XATTR_MOD) {
+            let _d = write!(f, "ITEM_XATTR_MOD ");
+        }
+        if self.contains(StreamFlags::IS_FILE) {
+            let _d = write!(f, "IS_FILE ");
+        }
+        if self.contains(StreamFlags::IS_DIR) {
+            let _d = write!(f, "IS_DIR ");
+        }
+        if self.contains(StreamFlags::IS_SYMLINK) {
+            let _d = write!(f, "IS_SYMLINK ");
+        }
+        if self.contains(StreamFlags::OWN_EVENT) {
+            let _d = write!(f, "OWN_EVENT ");
+        }
+        if self.contains(StreamFlags::IS_LAST_HARDLINK) {
+            let _d = write!(f, "IS_LAST_HARDLINK ");
+        }
+        if self.contains(StreamFlags::IS_HARDLINK) {
+            let _d = write!(f, "IS_HARDLINK ");
+        }
+        if self.contains(StreamFlags::ITEM_CLONED) {
+            let _d = write!(f, "ITEM_CLONED ");
+        }
+        write!(f, "")
+    }
+}
+
+#[link(name = "CoreServices", kind = "framework")]
+unsafe extern "C" {
+    pub fn FSEventsGetCurrentEventId() -> u64;
+}
+
+// These tests are disabled by default because they seem to be unresolvably flaky.
+// Feel free to bring them back to help test this code
+#[cfg(false)]
+mod tests {
+    use super::*;
+    use std::{fs, sync::mpsc, thread, time::Duration};
+
+    #[test]
+    fn test_event_stream_simple() {
+        for _ in 0..3 {
+            let dir = tempfile::Builder::new()
+                .prefix("test-event-stream")
+                .tempdir()
+                .unwrap();
+            let path = dir.path().canonicalize().unwrap();
+            for i in 0..10 {
+                fs::write(path.join(format!("existing-file-{}", i)), "").unwrap();
+            }
+            flush_historical_events();
+
+            let (tx, rx) = mpsc::channel();
+            let (stream, handle) = EventStream::new(&[&path], Duration::from_millis(50));
+            thread::spawn(move || stream.run(move |events| tx.send(events.to_vec()).is_ok()));
+
+            fs::write(path.join("new-file"), "").unwrap();
+            let events = rx.recv_timeout(timeout()).unwrap();
+            let event = events.last().unwrap();
+            assert_eq!(event.path, path.join("new-file"));
+            assert!(event.flags.contains(StreamFlags::ITEM_CREATED));
+
+            fs::remove_file(path.join("existing-file-5")).unwrap();
+            let mut events = rx.recv_timeout(timeout()).unwrap();
+            let mut event = events.last().unwrap();
+            // we see this duplicate about 1/100 test runs.
+            if event.path == path.join("new-file")
+                && event.flags.contains(StreamFlags::ITEM_CREATED)
+            {
+                events = rx.recv_timeout(timeout()).unwrap();
+                event = events.last().unwrap();
+            }
+            assert_eq!(event.path, path.join("existing-file-5"));
+            assert!(event.flags.contains(StreamFlags::ITEM_REMOVED));
+            drop(handle);
+        }
+    }
+
+    #[test]
+    fn test_event_stream_delayed_start() {
+        for _ in 0..3 {
+            let dir = tempfile::Builder::new()
+                .prefix("test-event-stream")
+                .tempdir()
+                .unwrap();
+            let path = dir.path().canonicalize().unwrap();
+            for i in 0..10 {
+                fs::write(path.join(format!("existing-file-{}", i)), "").unwrap();
+            }
+            flush_historical_events();
+
+            let (tx, rx) = mpsc::channel();
+            let (stream, handle) = EventStream::new(&[&path], Duration::from_millis(50));
+
+            // Delay the call to `run` in order to make sure we don't miss any events that occur
+            // between creating the `EventStream` and calling `run`.
+            thread::spawn(move || {
+                thread::sleep(Duration::from_millis(100));
+                stream.run(move |events| tx.send(events.to_vec()).is_ok())
+            });
+
+            fs::write(path.join("new-file"), "").unwrap();
+            let events = rx.recv_timeout(timeout()).unwrap();
+            let event = events.last().unwrap();
+            assert_eq!(event.path, path.join("new-file"));
+            assert!(event.flags.contains(StreamFlags::ITEM_CREATED));
+
+            fs::remove_file(path.join("existing-file-5")).unwrap();
+            let events = rx.recv_timeout(timeout()).unwrap();
+            let event = events.last().unwrap();
+            assert_eq!(event.path, path.join("existing-file-5"));
+            assert!(event.flags.contains(StreamFlags::ITEM_REMOVED));
+            drop(handle);
+        }
+    }
+
+    #[test]
+    fn test_event_stream_shutdown_by_dropping_handle() {
+        let dir = tempfile::Builder::new()
+            .prefix("test-event-stream")
+            .tempdir()
+            .unwrap();
+        let path = dir.path().canonicalize().unwrap();
+        flush_historical_events();
+
+        let (tx, rx) = mpsc::channel();
+        let (stream, handle) = EventStream::new(&[&path], Duration::from_millis(50));
+        thread::spawn(move || {
+            stream.run({
+                let tx = tx.clone();
+                move |_| {
+                    tx.send("running").unwrap();
+                    true
+                }
+            });
+            tx.send("stopped").unwrap();
+        });
+
+        fs::write(path.join("new-file"), "").unwrap();
+        assert_eq!(rx.recv_timeout(timeout()).unwrap(), "running");
+
+        // Dropping the handle causes `EventStream::run` to return.
+        drop(handle);
+        assert_eq!(rx.recv_timeout(timeout()).unwrap(), "stopped");
+    }
+
+    #[test]
+    fn test_event_stream_shutdown_before_run() {
+        let dir = tempfile::Builder::new()
+            .prefix("test-event-stream")
+            .tempdir()
+            .unwrap();
+        let path = dir.path().canonicalize().unwrap();
+
+        let (stream, handle) = EventStream::new(&[&path], Duration::from_millis(50));
+        drop(handle);
+
+        // This returns immediately because the handle was already dropped.
+        stream.run(|_| true);
+    }
+
+    fn flush_historical_events() {
+        thread::sleep(timeout());
+    }
+
+    fn timeout() -> Duration {
+        if std::env::var("CI").is_ok() {
+            Duration::from_secs(4)
+        } else {
+            Duration::from_millis(500)
+        }
+    }
+}