Add back "fs: Replace MacWatcher with notify::FsEventWatcher"" (#47952)

Anthony Eid created

Reverts zed-industries/zed#47799

Now that we have a week to let this be tested in Nightly we can merge it
again

Release Notes:
 
- Fix to many file descriptors error in MacOS

Change summary

Cargo.lock                        |  26 -
Cargo.toml                        |   3 
crates/fs/Cargo.toml              |   5 
crates/fs/src/fs.rs               |  60 ---
crates/fs/src/fs_watcher.rs       |  25 +
crates/fs/src/mac_watcher.rs      |  77 ----
crates/fsevent/Cargo.toml         |  28 -
crates/fsevent/LICENSE-GPL        |   1 
crates/fsevent/examples/events.rs |  23 -
crates/fsevent/src/fsevent.rs     | 515 ---------------------------------
10 files changed, 24 insertions(+), 739 deletions(-)

Detailed changes

Cargo.lock 🔗

@@ -6561,7 +6561,6 @@ dependencies = [
  "cocoa 0.26.0",
  "collections",
  "fs",
- "fsevent",
  "futures 0.3.31",
  "git",
  "gpui",
@@ -6620,27 +6619,6 @@ version = "1.3.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "42703706b716c37f96a77aea830392ad231f44c9e9a67872fa5548707e11b11c"
 
-[[package]]
-name = "fsevent"
-version = "0.1.0"
-dependencies = [
- "bitflags 2.9.4",
- "core-foundation 0.10.0",
- "fsevent-sys 3.1.0",
- "log",
- "parking_lot",
- "tempfile",
-]
-
-[[package]]
-name = "fsevent-sys"
-version = "3.1.0"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "ca6f5e6817058771c10f0eb0f05ddf1e35844266f972004fe8e4b21fda295bd5"
-dependencies = [
- "libc",
-]
-
 [[package]]
 name = "fsevent-sys"
 version = "4.1.0"
@@ -10644,7 +10622,7 @@ dependencies = [
  "bitflags 2.9.4",
  "crossbeam-channel",
  "filetime",
- "fsevent-sys 4.1.0",
+ "fsevent-sys",
  "inotify 0.9.6",
  "kqueue",
  "libc",
@@ -10660,7 +10638,7 @@ version = "8.2.0"
 source = "git+https://github.com/zed-industries/notify.git?rev=6c550ac3c56cbd143c57ea6390e197af9d790908#6c550ac3c56cbd143c57ea6390e197af9d790908"
 dependencies = [
  "bitflags 2.9.4",
- "fsevent-sys 4.1.0",
+ "fsevent-sys",
  "inotify 0.11.0",
  "kqueue",
  "libc",

Cargo.toml 🔗

@@ -76,7 +76,6 @@ members = [
     "crates/file_icons",
     "crates/fs",
     "crates/fs_benchmarks",
-    "crates/fsevent",
     "crates/fuzzy",
     "crates/git",
     "crates/git_graph",
@@ -313,7 +312,6 @@ feedback = { path = "crates/feedback" }
 file_finder = { path = "crates/file_finder" }
 file_icons = { path = "crates/file_icons" }
 fs = { path = "crates/fs" }
-fsevent = { path = "crates/fsevent" }
 fuzzy = { path = "crates/fuzzy" }
 git = { path = "crates/git" }
 git_graph = { path = "crates/git_graph" }
@@ -835,7 +833,6 @@ command_palette = { codegen-units = 1 }
 command_palette_hooks = { codegen-units = 1 }
 feature_flags = { codegen-units = 1 }
 file_icons = { codegen-units = 1 }
-fsevent = { codegen-units = 1 }
 image_viewer = { codegen-units = 1 }
 edit_prediction_ui = { codegen-units = 1 }
 install_cli = { codegen-units = 1 }

crates/fs/Cargo.toml 🔗

@@ -40,15 +40,12 @@ text.workspace = true
 time.workspace = true
 util.workspace = true
 is_executable = "1.0.5"
+notify = "8.2.0"
 
 [target.'cfg(target_os = "macos")'.dependencies]
-fsevent.workspace = true
 objc.workspace = true
 cocoa = "0.26"
 
-[target.'cfg(not(target_os = "macos"))'.dependencies]
-notify = "8.2.0"
-
 [target.'cfg(target_os = "windows")'.dependencies]
 windows.workspace = true
 

crates/fs/src/fs.rs 🔗

@@ -1,7 +1,3 @@
-#[cfg(target_os = "macos")]
-mod mac_watcher;
-
-#[cfg(not(target_os = "macos"))]
 pub mod fs_watcher;
 
 use parking_lot::Mutex;
@@ -976,62 +972,6 @@ impl Fs for RealFs {
         Ok(Box::pin(result))
     }
 
-    #[cfg(target_os = "macos")]
-    async fn watch(
-        &self,
-        path: &Path,
-        latency: Duration,
-    ) -> (
-        Pin<Box<dyn Send + Stream<Item = Vec<PathEvent>>>>,
-        Arc<dyn Watcher>,
-    ) {
-        use fsevent::StreamFlags;
-
-        let (events_tx, events_rx) = smol::channel::unbounded();
-        let handles = Arc::new(parking_lot::Mutex::new(collections::BTreeMap::default()));
-        let watcher = Arc::new(mac_watcher::MacWatcher::new(
-            events_tx,
-            Arc::downgrade(&handles),
-            latency,
-        ));
-        watcher.add(path).expect("handles can't be dropped");
-
-        (
-            Box::pin(
-                events_rx
-                    .map(|events| {
-                        events
-                            .into_iter()
-                            .map(|event| {
-                                log::trace!("fs path event: {event:?}");
-                                let kind = if event.flags.contains(StreamFlags::ITEM_REMOVED) {
-                                    Some(PathEventKind::Removed)
-                                } else if event.flags.contains(StreamFlags::ITEM_CREATED) {
-                                    Some(PathEventKind::Created)
-                                } else if event.flags.contains(StreamFlags::ITEM_MODIFIED)
-                                    | event.flags.contains(StreamFlags::ITEM_RENAMED)
-                                {
-                                    Some(PathEventKind::Changed)
-                                } else {
-                                    None
-                                };
-                                PathEvent {
-                                    path: event.path,
-                                    kind,
-                                }
-                            })
-                            .collect()
-                    })
-                    .chain(futures::stream::once(async move {
-                        drop(handles);
-                        vec![]
-                    })),
-            ),
-            watcher,
-        )
-    }
-
-    #[cfg(not(target_os = "macos"))]
     async fn watch(
         &self,
         path: &Path,

crates/fs/src/fs_watcher.rs 🔗

@@ -50,7 +50,7 @@ impl Watcher for FsWatcher {
         let tx = self.tx.clone();
         let pending_paths = self.pending_path_events.clone();
 
-        #[cfg(target_os = "windows")]
+        #[cfg(any(target_os = "windows", target_os = "macos"))]
         {
             // Return early if an ancestor of this path was already being watched.
             // saves a huge amount of memory
@@ -81,7 +81,7 @@ impl Watcher for FsWatcher {
         let root_path = SanitizedPath::new_arc(path);
         let path: Arc<std::path::Path> = path.into();
 
-        #[cfg(target_os = "windows")]
+        #[cfg(any(target_os = "windows", target_os = "macos"))]
         let mode = notify::RecursiveMode::Recursive;
         #[cfg(target_os = "linux")]
         let mode = notify::RecursiveMode::NonRecursive;
@@ -166,6 +166,8 @@ pub struct GlobalWatcher {
     watcher: Mutex<notify::KqueueWatcher>,
     #[cfg(target_os = "windows")]
     watcher: Mutex<notify::ReadDirectoryChangesWatcher>,
+    #[cfg(target_os = "macos")]
+    watcher: Mutex<notify::FsEventWatcher>,
 }
 
 impl GlobalWatcher {
@@ -178,10 +180,25 @@ impl GlobalWatcher {
     ) -> anyhow::Result<WatcherRegistrationId> {
         use notify::Watcher;
 
-        self.watcher.lock().watch(&path, mode)?;
-
         let mut state = self.state.lock();
 
+        // Check if this path is already covered by an existing watched ancestor path.
+        // On macOS and Windows, watching is recursive, so we don't need to watch
+        // child paths if an ancestor is already being watched.
+        #[cfg(any(target_os = "windows", target_os = "macos"))]
+        let path_already_covered = state.path_registrations.keys().any(|existing| {
+            path.starts_with(existing.as_ref()) && path.as_ref() != existing.as_ref()
+        });
+
+        #[cfg(not(any(target_os = "windows", target_os = "macos")))]
+        let path_already_covered = false;
+
+        if !path_already_covered && !state.path_registrations.contains_key(&path) {
+            drop(state);
+            self.watcher.lock().watch(&path, mode)?;
+            state = self.state.lock();
+        }
+
         let id = state.last_registration;
         state.last_registration = WatcherRegistrationId(id.0 + 1);
 

crates/fs/src/mac_watcher.rs 🔗

@@ -1,77 +0,0 @@
-use crate::Watcher;
-use anyhow::{Context as _, Result};
-use collections::{BTreeMap, Bound};
-use fsevent::EventStream;
-use parking_lot::Mutex;
-use std::{
-    path::{Path, PathBuf},
-    sync::Weak,
-    thread,
-    time::Duration,
-};
-
-pub struct MacWatcher {
-    events_tx: smol::channel::Sender<Vec<fsevent::Event>>,
-    handles: Weak<Mutex<BTreeMap<PathBuf, fsevent::Handle>>>,
-    latency: Duration,
-}
-
-impl MacWatcher {
-    pub fn new(
-        events_tx: smol::channel::Sender<Vec<fsevent::Event>>,
-        handles: Weak<Mutex<BTreeMap<PathBuf, fsevent::Handle>>>,
-        latency: Duration,
-    ) -> Self {
-        Self {
-            events_tx,
-            handles,
-            latency,
-        }
-    }
-}
-
-impl Watcher for MacWatcher {
-    fn add(&self, path: &Path) -> Result<()> {
-        log::trace!("mac watcher add: {:?}", path);
-        let handles = self
-            .handles
-            .upgrade()
-            .context("unable to watch path, receiver dropped")?;
-        let mut handles = handles.lock();
-
-        // Return early if an ancestor of this path was already being watched.
-        if let Some((watched_path, _)) = handles
-            .range::<Path, _>((Bound::Unbounded, Bound::Included(path)))
-            .next_back()
-            && path.starts_with(watched_path)
-        {
-            log::trace!(
-                "mac watched path starts with existing watched path: {watched_path:?}, {path:?}"
-            );
-            return Ok(());
-        }
-
-        let (stream, handle) = EventStream::new(&[path], self.latency);
-        let tx = self.events_tx.clone();
-        thread::Builder::new()
-            .name("MacWatcher".to_owned())
-            .spawn(move || {
-                stream.run(move |events| smol::block_on(tx.send(events)).is_ok());
-            })
-            .unwrap();
-        handles.insert(path.into(), handle);
-
-        Ok(())
-    }
-
-    fn remove(&self, path: &Path) -> anyhow::Result<()> {
-        let handles = self
-            .handles
-            .upgrade()
-            .context("unable to remove path, receiver dropped")?;
-
-        let mut handles = handles.lock();
-        handles.remove(path);
-        Ok(())
-    }
-}

crates/fsevent/Cargo.toml 🔗

@@ -1,28 +0,0 @@
-[package]
-name = "fsevent"
-version = "0.1.0"
-edition.workspace = true
-publish.workspace = true
-license = "GPL-3.0-or-later"
-
-[lints]
-workspace = true
-
-[lib]
-path = "src/fsevent.rs"
-doctest = false
-
-[dependencies]
-bitflags.workspace = true
-parking_lot.workspace = true
-log.workspace = true
-
-[target.'cfg(target_os = "macos")'.dependencies]
-core-foundation.workspace = true
-fsevent-sys = "3.0.2"
-
-[dev-dependencies]
-tempfile.workspace = true
-
-[package.metadata.docs.rs]
-targets = ["x86_64-apple-darwin"]

crates/fsevent/examples/events.rs 🔗

@@ -1,23 +0,0 @@
-#[cfg(target_os = "macos")]
-fn main() {
-    use fsevent::EventStream;
-    use std::{env::args, path::Path, time::Duration};
-
-    let paths = args().skip(1).collect::<Vec<_>>();
-    let paths = paths.iter().map(Path::new).collect::<Vec<_>>();
-    assert!(!paths.is_empty(), "Must pass 1 or more paths as arguments");
-
-    let (stream, _handle) = EventStream::new(&paths, Duration::from_millis(100));
-    stream.run(|events| {
-        eprintln!("event batch");
-        for event in events {
-            eprintln!("  {:?}", event);
-        }
-        true
-    });
-}
-
-#[cfg(not(target_os = "macos"))]
-fn main() {
-    eprintln!("This example only works on macOS");
-}

crates/fsevent/src/fsevent.rs 🔗

@@ -1,515 +0,0 @@
-#![cfg(target_os = "macos")]
-
-use bitflags::bitflags;
-use fsevent_sys::{self as fs, core_foundation as cf};
-use parking_lot::Mutex;
-use std::{
-    convert::AsRef,
-    ffi::{CStr, OsStr, c_void},
-    os::unix::ffi::OsStrExt,
-    path::{Path, PathBuf},
-    ptr, slice,
-    sync::Arc,
-    time::Duration,
-};
-
-#[derive(Clone, Debug)]
-pub struct Event {
-    pub event_id: u64,
-    pub flags: StreamFlags,
-    pub path: PathBuf,
-}
-
-pub struct EventStream {
-    lifecycle: Arc<Mutex<Lifecycle>>,
-    state: Box<State>,
-}
-
-struct State {
-    latency: Duration,
-    paths: cf::CFMutableArrayRef,
-    callback: Option<Box<dyn FnMut(Vec<Event>) -> bool>>,
-    last_valid_event_id: Option<fs::FSEventStreamEventId>,
-    stream: fs::FSEventStreamRef,
-}
-
-impl Drop for State {
-    fn drop(&mut self) {
-        unsafe {
-            cf::CFRelease(self.paths);
-            fs::FSEventStreamStop(self.stream);
-            fs::FSEventStreamInvalidate(self.stream);
-            fs::FSEventStreamRelease(self.stream);
-        }
-    }
-}
-
-enum Lifecycle {
-    New,
-    Running(cf::CFRunLoopRef),
-    Stopped,
-}
-
-pub struct Handle(Arc<Mutex<Lifecycle>>);
-
-unsafe impl Send for EventStream {}
-unsafe impl Send for Lifecycle {}
-
-impl EventStream {
-    pub fn new(paths: &[&Path], latency: Duration) -> (Self, Handle) {
-        unsafe {
-            let cf_paths =
-                cf::CFArrayCreateMutable(cf::kCFAllocatorDefault, 0, &cf::kCFTypeArrayCallBacks);
-            assert!(!cf_paths.is_null());
-
-            for path in paths {
-                let path_bytes = path.as_os_str().as_bytes();
-                let cf_url = cf::CFURLCreateFromFileSystemRepresentation(
-                    cf::kCFAllocatorDefault,
-                    path_bytes.as_ptr() as *const i8,
-                    path_bytes.len() as cf::CFIndex,
-                    false,
-                );
-                if !cf_url.is_null() {
-                    let cf_path = cf::CFURLCopyFileSystemPath(cf_url, cf::kCFURLPOSIXPathStyle);
-                    cf::CFArrayAppendValue(cf_paths, cf_path);
-                    cf::CFRelease(cf_path);
-                    cf::CFRelease(cf_url);
-                } else {
-                    log::error!("Failed to create CFURL for path: {path:?}");
-                }
-            }
-
-            let mut state = Box::new(State {
-                latency,
-                paths: cf_paths,
-                callback: None,
-                last_valid_event_id: None,
-                stream: ptr::null_mut(),
-            });
-            let stream_context = fs::FSEventStreamContext {
-                version: 0,
-                info: state.as_ref() as *const _ as *mut c_void,
-                retain: None,
-                release: None,
-                copy_description: None,
-            };
-            let stream = fs::FSEventStreamCreate(
-                cf::kCFAllocatorDefault,
-                Self::trampoline,
-                &stream_context,
-                cf_paths,
-                FSEventsGetCurrentEventId(),
-                latency.as_secs_f64(),
-                fs::kFSEventStreamCreateFlagFileEvents
-                    | fs::kFSEventStreamCreateFlagNoDefer
-                    | fs::kFSEventStreamCreateFlagWatchRoot,
-            );
-            state.stream = stream;
-
-            let lifecycle = Arc::new(Mutex::new(Lifecycle::New));
-            (
-                EventStream {
-                    lifecycle: lifecycle.clone(),
-                    state,
-                },
-                Handle(lifecycle),
-            )
-        }
-    }
-
-    pub fn run<F>(mut self, f: F)
-    where
-        F: FnMut(Vec<Event>) -> bool + 'static,
-    {
-        self.state.callback = Some(Box::new(f));
-        unsafe {
-            let run_loop =
-                core_foundation::base::CFRetain(cf::CFRunLoopGetCurrent()) as *mut c_void;
-            {
-                let mut state = self.lifecycle.lock();
-                match *state {
-                    Lifecycle::New => *state = Lifecycle::Running(run_loop),
-                    Lifecycle::Running(_) => unreachable!(),
-                    Lifecycle::Stopped => return,
-                }
-            }
-            fs::FSEventStreamScheduleWithRunLoop(
-                self.state.stream,
-                run_loop,
-                cf::kCFRunLoopDefaultMode,
-            );
-            fs::FSEventStreamStart(self.state.stream);
-            cf::CFRunLoopRun();
-        }
-    }
-
-    extern "C" fn trampoline(
-        stream_ref: fs::FSEventStreamRef,
-        info: *mut ::std::os::raw::c_void,
-        num: usize,                                 // size_t numEvents
-        event_paths: *mut ::std::os::raw::c_void,   // void *eventPaths
-        event_flags: *const ::std::os::raw::c_void, // const FSEventStreamEventFlags eventFlags[]
-        event_ids: *const ::std::os::raw::c_void,   // const FSEventStreamEventId eventIds[]
-    ) {
-        unsafe {
-            let event_paths = event_paths as *const *const ::std::os::raw::c_char;
-            let e_ptr = event_flags as *mut u32;
-            let i_ptr = event_ids as *mut u64;
-            let state = (info as *mut State).as_mut().unwrap();
-            let callback = if let Some(callback) = state.callback.as_mut() {
-                callback
-            } else {
-                return;
-            };
-
-            let paths = slice::from_raw_parts(event_paths, num);
-            let flags = slice::from_raw_parts_mut(e_ptr, num);
-            let ids = slice::from_raw_parts_mut(i_ptr, num);
-            let mut stream_restarted = false;
-
-            // Sometimes FSEvents reports a "dropped" event, an indication that either the kernel
-            // or our code couldn't keep up with the sheer volume of file-system events that were
-            // generated. If we observed a valid event before this happens, we'll try to read the
-            // file-system journal by stopping the current stream and creating a new one starting at
-            // such event. Otherwise, we'll let invoke the callback with the dropped event, which
-            // will likely perform a re-scan of one of the root directories.
-            if flags
-                .iter()
-                .copied()
-                .filter_map(StreamFlags::from_bits)
-                .any(|flags| {
-                    flags.contains(StreamFlags::USER_DROPPED)
-                        || flags.contains(StreamFlags::KERNEL_DROPPED)
-                })
-                && let Some(last_valid_event_id) = state.last_valid_event_id.take()
-            {
-                fs::FSEventStreamStop(state.stream);
-                fs::FSEventStreamInvalidate(state.stream);
-                fs::FSEventStreamRelease(state.stream);
-
-                let stream_context = fs::FSEventStreamContext {
-                    version: 0,
-                    info,
-                    retain: None,
-                    release: None,
-                    copy_description: None,
-                };
-                let stream = fs::FSEventStreamCreate(
-                    cf::kCFAllocatorDefault,
-                    Self::trampoline,
-                    &stream_context,
-                    state.paths,
-                    last_valid_event_id,
-                    state.latency.as_secs_f64(),
-                    fs::kFSEventStreamCreateFlagFileEvents
-                        | fs::kFSEventStreamCreateFlagNoDefer
-                        | fs::kFSEventStreamCreateFlagWatchRoot,
-                );
-
-                state.stream = stream;
-                fs::FSEventStreamScheduleWithRunLoop(
-                    state.stream,
-                    cf::CFRunLoopGetCurrent(),
-                    cf::kCFRunLoopDefaultMode,
-                );
-                fs::FSEventStreamStart(state.stream);
-                stream_restarted = true;
-            }
-
-            if !stream_restarted {
-                let mut events = Vec::with_capacity(num);
-                for p in 0..num {
-                    if let Some(flag) = StreamFlags::from_bits(flags[p]) {
-                        if !flag.contains(StreamFlags::HISTORY_DONE) {
-                            let path_c_str = CStr::from_ptr(paths[p]);
-                            let path = PathBuf::from(OsStr::from_bytes(path_c_str.to_bytes()));
-                            let event = Event {
-                                event_id: ids[p],
-                                flags: flag,
-                                path,
-                            };
-                            state.last_valid_event_id = Some(event.event_id);
-                            events.push(event);
-                        }
-                    } else {
-                        debug_assert!(false, "unknown flag set for fs event: {}", flags[p]);
-                    }
-                }
-
-                if !events.is_empty() && !callback(events) {
-                    fs::FSEventStreamStop(stream_ref);
-                    cf::CFRunLoopStop(cf::CFRunLoopGetCurrent());
-                }
-            }
-        }
-    }
-}
-
-impl Drop for Handle {
-    fn drop(&mut self) {
-        let mut state = self.0.lock();
-        if let Lifecycle::Running(run_loop) = *state {
-            unsafe {
-                cf::CFRunLoopStop(run_loop);
-                cf::CFRelease(run_loop)
-            }
-        }
-        *state = Lifecycle::Stopped;
-    }
-}
-
-// Synchronize with
-// /System/Library/Frameworks/CoreServices.framework/Versions/A/Frameworks/FSEvents.framework/Versions/A/Headers/FSEvents.h
-bitflags! {
-    #[derive(Debug, PartialEq, Eq, Clone, Copy)]
-  #[repr(C)]
-  pub struct StreamFlags: u32 {
-    const NONE = 0x00000000;
-    const MUST_SCAN_SUBDIRS = 0x00000001;
-    const USER_DROPPED = 0x00000002;
-    const KERNEL_DROPPED = 0x00000004;
-    const IDS_WRAPPED = 0x00000008;
-    const HISTORY_DONE = 0x00000010;
-    const ROOT_CHANGED = 0x00000020;
-    const MOUNT = 0x00000040;
-    const UNMOUNT = 0x00000080;
-    const ITEM_CREATED = 0x00000100;
-    const ITEM_REMOVED = 0x00000200;
-    const INODE_META_MOD = 0x00000400;
-    const ITEM_RENAMED = 0x00000800;
-    const ITEM_MODIFIED = 0x00001000;
-    const FINDER_INFO_MOD = 0x00002000;
-    const ITEM_CHANGE_OWNER = 0x00004000;
-    const ITEM_XATTR_MOD = 0x00008000;
-    const IS_FILE = 0x00010000;
-    const IS_DIR = 0x00020000;
-    const IS_SYMLINK = 0x00040000;
-    const OWN_EVENT = 0x00080000;
-    const IS_HARDLINK = 0x00100000;
-    const IS_LAST_HARDLINK = 0x00200000;
-    const ITEM_CLONED = 0x400000;
-  }
-}
-
-impl std::fmt::Display for StreamFlags {
-    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
-        if self.contains(StreamFlags::MUST_SCAN_SUBDIRS) {
-            let _d = write!(f, "MUST_SCAN_SUBDIRS ");
-        }
-        if self.contains(StreamFlags::USER_DROPPED) {
-            let _d = write!(f, "USER_DROPPED ");
-        }
-        if self.contains(StreamFlags::KERNEL_DROPPED) {
-            let _d = write!(f, "KERNEL_DROPPED ");
-        }
-        if self.contains(StreamFlags::IDS_WRAPPED) {
-            let _d = write!(f, "IDS_WRAPPED ");
-        }
-        if self.contains(StreamFlags::HISTORY_DONE) {
-            let _d = write!(f, "HISTORY_DONE ");
-        }
-        if self.contains(StreamFlags::ROOT_CHANGED) {
-            let _d = write!(f, "ROOT_CHANGED ");
-        }
-        if self.contains(StreamFlags::MOUNT) {
-            let _d = write!(f, "MOUNT ");
-        }
-        if self.contains(StreamFlags::UNMOUNT) {
-            let _d = write!(f, "UNMOUNT ");
-        }
-        if self.contains(StreamFlags::ITEM_CREATED) {
-            let _d = write!(f, "ITEM_CREATED ");
-        }
-        if self.contains(StreamFlags::ITEM_REMOVED) {
-            let _d = write!(f, "ITEM_REMOVED ");
-        }
-        if self.contains(StreamFlags::INODE_META_MOD) {
-            let _d = write!(f, "INODE_META_MOD ");
-        }
-        if self.contains(StreamFlags::ITEM_RENAMED) {
-            let _d = write!(f, "ITEM_RENAMED ");
-        }
-        if self.contains(StreamFlags::ITEM_MODIFIED) {
-            let _d = write!(f, "ITEM_MODIFIED ");
-        }
-        if self.contains(StreamFlags::FINDER_INFO_MOD) {
-            let _d = write!(f, "FINDER_INFO_MOD ");
-        }
-        if self.contains(StreamFlags::ITEM_CHANGE_OWNER) {
-            let _d = write!(f, "ITEM_CHANGE_OWNER ");
-        }
-        if self.contains(StreamFlags::ITEM_XATTR_MOD) {
-            let _d = write!(f, "ITEM_XATTR_MOD ");
-        }
-        if self.contains(StreamFlags::IS_FILE) {
-            let _d = write!(f, "IS_FILE ");
-        }
-        if self.contains(StreamFlags::IS_DIR) {
-            let _d = write!(f, "IS_DIR ");
-        }
-        if self.contains(StreamFlags::IS_SYMLINK) {
-            let _d = write!(f, "IS_SYMLINK ");
-        }
-        if self.contains(StreamFlags::OWN_EVENT) {
-            let _d = write!(f, "OWN_EVENT ");
-        }
-        if self.contains(StreamFlags::IS_LAST_HARDLINK) {
-            let _d = write!(f, "IS_LAST_HARDLINK ");
-        }
-        if self.contains(StreamFlags::IS_HARDLINK) {
-            let _d = write!(f, "IS_HARDLINK ");
-        }
-        if self.contains(StreamFlags::ITEM_CLONED) {
-            let _d = write!(f, "ITEM_CLONED ");
-        }
-        write!(f, "")
-    }
-}
-
-#[link(name = "CoreServices", kind = "framework")]
-unsafe extern "C" {
-    pub fn FSEventsGetCurrentEventId() -> u64;
-}
-
-// These tests are disabled by default because they seem to be unresolvably flaky.
-// Feel free to bring them back to help test this code
-#[cfg(false)]
-mod tests {
-    use super::*;
-    use std::{fs, sync::mpsc, thread, time::Duration};
-
-    #[test]
-    fn test_event_stream_simple() {
-        for _ in 0..3 {
-            let dir = tempfile::Builder::new()
-                .prefix("test-event-stream")
-                .tempdir()
-                .unwrap();
-            let path = dir.path().canonicalize().unwrap();
-            for i in 0..10 {
-                fs::write(path.join(format!("existing-file-{}", i)), "").unwrap();
-            }
-            flush_historical_events();
-
-            let (tx, rx) = mpsc::channel();
-            let (stream, handle) = EventStream::new(&[&path], Duration::from_millis(50));
-            thread::spawn(move || stream.run(move |events| tx.send(events.to_vec()).is_ok()));
-
-            fs::write(path.join("new-file"), "").unwrap();
-            let events = rx.recv_timeout(timeout()).unwrap();
-            let event = events.last().unwrap();
-            assert_eq!(event.path, path.join("new-file"));
-            assert!(event.flags.contains(StreamFlags::ITEM_CREATED));
-
-            fs::remove_file(path.join("existing-file-5")).unwrap();
-            let mut events = rx.recv_timeout(timeout()).unwrap();
-            let mut event = events.last().unwrap();
-            // we see this duplicate about 1/100 test runs.
-            if event.path == path.join("new-file")
-                && event.flags.contains(StreamFlags::ITEM_CREATED)
-            {
-                events = rx.recv_timeout(timeout()).unwrap();
-                event = events.last().unwrap();
-            }
-            assert_eq!(event.path, path.join("existing-file-5"));
-            assert!(event.flags.contains(StreamFlags::ITEM_REMOVED));
-            drop(handle);
-        }
-    }
-
-    #[test]
-    fn test_event_stream_delayed_start() {
-        for _ in 0..3 {
-            let dir = tempfile::Builder::new()
-                .prefix("test-event-stream")
-                .tempdir()
-                .unwrap();
-            let path = dir.path().canonicalize().unwrap();
-            for i in 0..10 {
-                fs::write(path.join(format!("existing-file-{}", i)), "").unwrap();
-            }
-            flush_historical_events();
-
-            let (tx, rx) = mpsc::channel();
-            let (stream, handle) = EventStream::new(&[&path], Duration::from_millis(50));
-
-            // Delay the call to `run` in order to make sure we don't miss any events that occur
-            // between creating the `EventStream` and calling `run`.
-            thread::spawn(move || {
-                thread::sleep(Duration::from_millis(100));
-                stream.run(move |events| tx.send(events.to_vec()).is_ok())
-            });
-
-            fs::write(path.join("new-file"), "").unwrap();
-            let events = rx.recv_timeout(timeout()).unwrap();
-            let event = events.last().unwrap();
-            assert_eq!(event.path, path.join("new-file"));
-            assert!(event.flags.contains(StreamFlags::ITEM_CREATED));
-
-            fs::remove_file(path.join("existing-file-5")).unwrap();
-            let events = rx.recv_timeout(timeout()).unwrap();
-            let event = events.last().unwrap();
-            assert_eq!(event.path, path.join("existing-file-5"));
-            assert!(event.flags.contains(StreamFlags::ITEM_REMOVED));
-            drop(handle);
-        }
-    }
-
-    #[test]
-    fn test_event_stream_shutdown_by_dropping_handle() {
-        let dir = tempfile::Builder::new()
-            .prefix("test-event-stream")
-            .tempdir()
-            .unwrap();
-        let path = dir.path().canonicalize().unwrap();
-        flush_historical_events();
-
-        let (tx, rx) = mpsc::channel();
-        let (stream, handle) = EventStream::new(&[&path], Duration::from_millis(50));
-        thread::spawn(move || {
-            stream.run({
-                let tx = tx.clone();
-                move |_| {
-                    tx.send("running").unwrap();
-                    true
-                }
-            });
-            tx.send("stopped").unwrap();
-        });
-
-        fs::write(path.join("new-file"), "").unwrap();
-        assert_eq!(rx.recv_timeout(timeout()).unwrap(), "running");
-
-        // Dropping the handle causes `EventStream::run` to return.
-        drop(handle);
-        assert_eq!(rx.recv_timeout(timeout()).unwrap(), "stopped");
-    }
-
-    #[test]
-    fn test_event_stream_shutdown_before_run() {
-        let dir = tempfile::Builder::new()
-            .prefix("test-event-stream")
-            .tempdir()
-            .unwrap();
-        let path = dir.path().canonicalize().unwrap();
-
-        let (stream, handle) = EventStream::new(&[&path], Duration::from_millis(50));
-        drop(handle);
-
-        // This returns immediately because the handle was already dropped.
-        stream.run(|_| true);
-    }
-
-    fn flush_historical_events() {
-        thread::sleep(timeout());
-    }
-
-    fn timeout() -> Duration {
-        if std::env::var("CI").is_ok() {
-            Duration::from_secs(4)
-        } else {
-            Duration::from_millis(500)
-        }
-    }
-}