From 87d41c1d97009fb35b0fc433851abb66427e7a1a Mon Sep 17 00:00:00 2001 From: Anthony Eid <56899983+Anthony-Eid@users.noreply.github.com> Date: Tue, 27 Jan 2026 15:27:53 -0500 Subject: [PATCH] Revert "fs: Replace MacWatcher with notify::FsEventWatcher" (#47799) Reverts zed-industries/zed#47322. I'm going to remerge this tomorrow after releases so we get a full week in nightly to catch any bugs --- Cargo.lock | 26 +- Cargo.toml | 3 + crates/fs/Cargo.toml | 5 +- crates/fs/src/fs.rs | 60 ++++ crates/fs/src/fs_watcher.rs | 25 +- crates/fs/src/mac_watcher.rs | 77 +++++ crates/fsevent/Cargo.toml | 28 ++ crates/fsevent/LICENSE-GPL | 1 + crates/fsevent/examples/events.rs | 23 ++ crates/fsevent/src/fsevent.rs | 515 ++++++++++++++++++++++++++++++ 10 files changed, 739 insertions(+), 24 deletions(-) create mode 100644 crates/fs/src/mac_watcher.rs create mode 100644 crates/fsevent/Cargo.toml create mode 120000 crates/fsevent/LICENSE-GPL create mode 100644 crates/fsevent/examples/events.rs create mode 100644 crates/fsevent/src/fsevent.rs diff --git a/Cargo.lock b/Cargo.lock index c355df36f8cab19b9750b04a817b0bc379cdff1f..31b1a06c4372d8af4ef0a103b24f83d40afac63b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6518,6 +6518,7 @@ dependencies = [ "cocoa 0.26.0", "collections", "fs", + "fsevent", "futures 0.3.31", "git", "gpui", @@ -6576,6 +6577,27 @@ version = "1.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "42703706b716c37f96a77aea830392ad231f44c9e9a67872fa5548707e11b11c" +[[package]] +name = "fsevent" +version = "0.1.0" +dependencies = [ + "bitflags 2.9.4", + "core-foundation 0.10.0", + "fsevent-sys 3.1.0", + "log", + "parking_lot", + "tempfile", +] + +[[package]] +name = "fsevent-sys" +version = "3.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ca6f5e6817058771c10f0eb0f05ddf1e35844266f972004fe8e4b21fda295bd5" +dependencies = [ + "libc", +] + [[package]] name = "fsevent-sys" version = "4.1.0" @@ -10570,7 +10592,7 @@ dependencies = [ "bitflags 2.9.4", "crossbeam-channel", "filetime", - "fsevent-sys", + "fsevent-sys 4.1.0", "inotify 0.9.6", "kqueue", "libc", @@ -10586,7 +10608,7 @@ version = "8.2.0" source = "git+https://github.com/zed-industries/notify.git?rev=6c550ac3c56cbd143c57ea6390e197af9d790908#6c550ac3c56cbd143c57ea6390e197af9d790908" dependencies = [ "bitflags 2.9.4", - "fsevent-sys", + "fsevent-sys 4.1.0", "inotify 0.11.0", "kqueue", "libc", diff --git a/Cargo.toml b/Cargo.toml index d34319f5ed1b38738dd071258c90328916fe4f43..8e7ea3d734b25b00821c60d90ea165294ac8f2af 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -76,6 +76,7 @@ members = [ "crates/file_icons", "crates/fs", "crates/fs_benchmarks", + "crates/fsevent", "crates/fuzzy", "crates/git", "crates/git_graph", @@ -312,6 +313,7 @@ feedback = { path = "crates/feedback" } file_finder = { path = "crates/file_finder" } file_icons = { path = "crates/file_icons" } fs = { path = "crates/fs" } +fsevent = { path = "crates/fsevent" } fuzzy = { path = "crates/fuzzy" } git = { path = "crates/git" } git_graph = { path = "crates/git_graph" } @@ -832,6 +834,7 @@ command_palette = { codegen-units = 1 } command_palette_hooks = { codegen-units = 1 } feature_flags = { codegen-units = 1 } file_icons = { codegen-units = 1 } +fsevent = { codegen-units = 1 } image_viewer = { codegen-units = 1 } edit_prediction_ui = { codegen-units = 1 } install_cli = { codegen-units = 1 } diff --git a/crates/fs/Cargo.toml b/crates/fs/Cargo.toml index 6355524e4f328df0ca7fcf24c1df0557676ba6a6..c38a4584109edf422d754115399566f987cb8ea6 100644 --- a/crates/fs/Cargo.toml +++ b/crates/fs/Cargo.toml @@ -40,12 +40,15 @@ text.workspace = true time.workspace = true util.workspace = true is_executable = "1.0.5" -notify = "8.2.0" [target.'cfg(target_os = "macos")'.dependencies] +fsevent.workspace = true objc.workspace = true cocoa = "0.26" +[target.'cfg(not(target_os = "macos"))'.dependencies] +notify = "8.2.0" + [target.'cfg(target_os = "windows")'.dependencies] windows.workspace = true diff --git a/crates/fs/src/fs.rs b/crates/fs/src/fs.rs index 312cf28c21161c60289c8f41d0db0904e869455f..f8eed421b582d462f0a69436aba1533e749a9cf0 100644 --- a/crates/fs/src/fs.rs +++ b/crates/fs/src/fs.rs @@ -1,3 +1,7 @@ +#[cfg(target_os = "macos")] +mod mac_watcher; + +#[cfg(not(target_os = "macos"))] pub mod fs_watcher; use parking_lot::Mutex; @@ -972,6 +976,62 @@ impl Fs for RealFs { Ok(Box::pin(result)) } + #[cfg(target_os = "macos")] + async fn watch( + &self, + path: &Path, + latency: Duration, + ) -> ( + Pin>>>, + Arc, + ) { + use fsevent::StreamFlags; + + let (events_tx, events_rx) = smol::channel::unbounded(); + let handles = Arc::new(parking_lot::Mutex::new(collections::BTreeMap::default())); + let watcher = Arc::new(mac_watcher::MacWatcher::new( + events_tx, + Arc::downgrade(&handles), + latency, + )); + watcher.add(path).expect("handles can't be dropped"); + + ( + Box::pin( + events_rx + .map(|events| { + events + .into_iter() + .map(|event| { + log::trace!("fs path event: {event:?}"); + let kind = if event.flags.contains(StreamFlags::ITEM_REMOVED) { + Some(PathEventKind::Removed) + } else if event.flags.contains(StreamFlags::ITEM_CREATED) { + Some(PathEventKind::Created) + } else if event.flags.contains(StreamFlags::ITEM_MODIFIED) + | event.flags.contains(StreamFlags::ITEM_RENAMED) + { + Some(PathEventKind::Changed) + } else { + None + }; + PathEvent { + path: event.path, + kind, + } + }) + .collect() + }) + .chain(futures::stream::once(async move { + drop(handles); + vec![] + })), + ), + watcher, + ) + } + + #[cfg(not(target_os = "macos"))] async fn watch( &self, path: &Path, diff --git a/crates/fs/src/fs_watcher.rs b/crates/fs/src/fs_watcher.rs index efb381c9a5480df598d774dd17e9c49f8ef82f92..18d5dbeeb9e82948aaa503e7268d39c5d1852a2b 100644 --- a/crates/fs/src/fs_watcher.rs +++ b/crates/fs/src/fs_watcher.rs @@ -50,7 +50,7 @@ impl Watcher for FsWatcher { let tx = self.tx.clone(); let pending_paths = self.pending_path_events.clone(); - #[cfg(any(target_os = "windows", target_os = "macos"))] + #[cfg(target_os = "windows")] { // Return early if an ancestor of this path was already being watched. // saves a huge amount of memory @@ -81,7 +81,7 @@ impl Watcher for FsWatcher { let root_path = SanitizedPath::new_arc(path); let path: Arc = path.into(); - #[cfg(any(target_os = "windows", target_os = "macos"))] + #[cfg(target_os = "windows")] let mode = notify::RecursiveMode::Recursive; #[cfg(target_os = "linux")] let mode = notify::RecursiveMode::NonRecursive; @@ -166,8 +166,6 @@ pub struct GlobalWatcher { watcher: Mutex, #[cfg(target_os = "windows")] watcher: Mutex, - #[cfg(target_os = "macos")] - watcher: Mutex, } impl GlobalWatcher { @@ -180,24 +178,9 @@ impl GlobalWatcher { ) -> anyhow::Result { use notify::Watcher; - let mut state = self.state.lock(); - - // Check if this path is already covered by an existing watched ancestor path. - // On macOS and Windows, watching is recursive, so we don't need to watch - // child paths if an ancestor is already being watched. - #[cfg(any(target_os = "windows", target_os = "macos"))] - let path_already_covered = state.path_registrations.keys().any(|existing| { - path.starts_with(existing.as_ref()) && path.as_ref() != existing.as_ref() - }); - - #[cfg(not(any(target_os = "windows", target_os = "macos")))] - let path_already_covered = false; + self.watcher.lock().watch(&path, mode)?; - if !path_already_covered && !state.path_registrations.contains_key(&path) { - drop(state); - self.watcher.lock().watch(&path, mode)?; - state = self.state.lock(); - } + let mut state = self.state.lock(); let id = state.last_registration; state.last_registration = WatcherRegistrationId(id.0 + 1); diff --git a/crates/fs/src/mac_watcher.rs b/crates/fs/src/mac_watcher.rs new file mode 100644 index 0000000000000000000000000000000000000000..b781a231ba2bc33a895480ea278a7ccfe3364fe7 --- /dev/null +++ b/crates/fs/src/mac_watcher.rs @@ -0,0 +1,77 @@ +use crate::Watcher; +use anyhow::{Context as _, Result}; +use collections::{BTreeMap, Bound}; +use fsevent::EventStream; +use parking_lot::Mutex; +use std::{ + path::{Path, PathBuf}, + sync::Weak, + thread, + time::Duration, +}; + +pub struct MacWatcher { + events_tx: smol::channel::Sender>, + handles: Weak>>, + latency: Duration, +} + +impl MacWatcher { + pub fn new( + events_tx: smol::channel::Sender>, + handles: Weak>>, + latency: Duration, + ) -> Self { + Self { + events_tx, + handles, + latency, + } + } +} + +impl Watcher for MacWatcher { + fn add(&self, path: &Path) -> Result<()> { + log::trace!("mac watcher add: {:?}", path); + let handles = self + .handles + .upgrade() + .context("unable to watch path, receiver dropped")?; + let mut handles = handles.lock(); + + // Return early if an ancestor of this path was already being watched. + if let Some((watched_path, _)) = handles + .range::((Bound::Unbounded, Bound::Included(path))) + .next_back() + && path.starts_with(watched_path) + { + log::trace!( + "mac watched path starts with existing watched path: {watched_path:?}, {path:?}" + ); + return Ok(()); + } + + let (stream, handle) = EventStream::new(&[path], self.latency); + let tx = self.events_tx.clone(); + thread::Builder::new() + .name("MacWatcher".to_owned()) + .spawn(move || { + stream.run(move |events| smol::block_on(tx.send(events)).is_ok()); + }) + .unwrap(); + handles.insert(path.into(), handle); + + Ok(()) + } + + fn remove(&self, path: &Path) -> anyhow::Result<()> { + let handles = self + .handles + .upgrade() + .context("unable to remove path, receiver dropped")?; + + let mut handles = handles.lock(); + handles.remove(path); + Ok(()) + } +} diff --git a/crates/fsevent/Cargo.toml b/crates/fsevent/Cargo.toml new file mode 100644 index 0000000000000000000000000000000000000000..635b36ebe14ee6823f8773bb38ff085516e320b9 --- /dev/null +++ b/crates/fsevent/Cargo.toml @@ -0,0 +1,28 @@ +[package] +name = "fsevent" +version = "0.1.0" +edition.workspace = true +publish.workspace = true +license = "GPL-3.0-or-later" + +[lints] +workspace = true + +[lib] +path = "src/fsevent.rs" +doctest = false + +[dependencies] +bitflags.workspace = true +parking_lot.workspace = true +log.workspace = true + +[target.'cfg(target_os = "macos")'.dependencies] +core-foundation.workspace = true +fsevent-sys = "3.0.2" + +[dev-dependencies] +tempfile.workspace = true + +[package.metadata.docs.rs] +targets = ["x86_64-apple-darwin"] diff --git a/crates/fsevent/LICENSE-GPL b/crates/fsevent/LICENSE-GPL new file mode 120000 index 0000000000000000000000000000000000000000..89e542f750cd3860a0598eff0dc34b56d7336dc4 --- /dev/null +++ b/crates/fsevent/LICENSE-GPL @@ -0,0 +1 @@ +../../LICENSE-GPL \ No newline at end of file diff --git a/crates/fsevent/examples/events.rs b/crates/fsevent/examples/events.rs new file mode 100644 index 0000000000000000000000000000000000000000..bae9e734259992e5cf3e553dda2622aeec5eaa33 --- /dev/null +++ b/crates/fsevent/examples/events.rs @@ -0,0 +1,23 @@ +#[cfg(target_os = "macos")] +fn main() { + use fsevent::EventStream; + use std::{env::args, path::Path, time::Duration}; + + let paths = args().skip(1).collect::>(); + let paths = paths.iter().map(Path::new).collect::>(); + assert!(!paths.is_empty(), "Must pass 1 or more paths as arguments"); + + let (stream, _handle) = EventStream::new(&paths, Duration::from_millis(100)); + stream.run(|events| { + eprintln!("event batch"); + for event in events { + eprintln!(" {:?}", event); + } + true + }); +} + +#[cfg(not(target_os = "macos"))] +fn main() { + eprintln!("This example only works on macOS"); +} diff --git a/crates/fsevent/src/fsevent.rs b/crates/fsevent/src/fsevent.rs new file mode 100644 index 0000000000000000000000000000000000000000..0fe3592010fa0e7e12b68c8dfc919807e91c8e64 --- /dev/null +++ b/crates/fsevent/src/fsevent.rs @@ -0,0 +1,515 @@ +#![cfg(target_os = "macos")] + +use bitflags::bitflags; +use fsevent_sys::{self as fs, core_foundation as cf}; +use parking_lot::Mutex; +use std::{ + convert::AsRef, + ffi::{CStr, OsStr, c_void}, + os::unix::ffi::OsStrExt, + path::{Path, PathBuf}, + ptr, slice, + sync::Arc, + time::Duration, +}; + +#[derive(Clone, Debug)] +pub struct Event { + pub event_id: u64, + pub flags: StreamFlags, + pub path: PathBuf, +} + +pub struct EventStream { + lifecycle: Arc>, + state: Box, +} + +struct State { + latency: Duration, + paths: cf::CFMutableArrayRef, + callback: Option) -> bool>>, + last_valid_event_id: Option, + stream: fs::FSEventStreamRef, +} + +impl Drop for State { + fn drop(&mut self) { + unsafe { + cf::CFRelease(self.paths); + fs::FSEventStreamStop(self.stream); + fs::FSEventStreamInvalidate(self.stream); + fs::FSEventStreamRelease(self.stream); + } + } +} + +enum Lifecycle { + New, + Running(cf::CFRunLoopRef), + Stopped, +} + +pub struct Handle(Arc>); + +unsafe impl Send for EventStream {} +unsafe impl Send for Lifecycle {} + +impl EventStream { + pub fn new(paths: &[&Path], latency: Duration) -> (Self, Handle) { + unsafe { + let cf_paths = + cf::CFArrayCreateMutable(cf::kCFAllocatorDefault, 0, &cf::kCFTypeArrayCallBacks); + assert!(!cf_paths.is_null()); + + for path in paths { + let path_bytes = path.as_os_str().as_bytes(); + let cf_url = cf::CFURLCreateFromFileSystemRepresentation( + cf::kCFAllocatorDefault, + path_bytes.as_ptr() as *const i8, + path_bytes.len() as cf::CFIndex, + false, + ); + if !cf_url.is_null() { + let cf_path = cf::CFURLCopyFileSystemPath(cf_url, cf::kCFURLPOSIXPathStyle); + cf::CFArrayAppendValue(cf_paths, cf_path); + cf::CFRelease(cf_path); + cf::CFRelease(cf_url); + } else { + log::error!("Failed to create CFURL for path: {path:?}"); + } + } + + let mut state = Box::new(State { + latency, + paths: cf_paths, + callback: None, + last_valid_event_id: None, + stream: ptr::null_mut(), + }); + let stream_context = fs::FSEventStreamContext { + version: 0, + info: state.as_ref() as *const _ as *mut c_void, + retain: None, + release: None, + copy_description: None, + }; + let stream = fs::FSEventStreamCreate( + cf::kCFAllocatorDefault, + Self::trampoline, + &stream_context, + cf_paths, + FSEventsGetCurrentEventId(), + latency.as_secs_f64(), + fs::kFSEventStreamCreateFlagFileEvents + | fs::kFSEventStreamCreateFlagNoDefer + | fs::kFSEventStreamCreateFlagWatchRoot, + ); + state.stream = stream; + + let lifecycle = Arc::new(Mutex::new(Lifecycle::New)); + ( + EventStream { + lifecycle: lifecycle.clone(), + state, + }, + Handle(lifecycle), + ) + } + } + + pub fn run(mut self, f: F) + where + F: FnMut(Vec) -> bool + 'static, + { + self.state.callback = Some(Box::new(f)); + unsafe { + let run_loop = + core_foundation::base::CFRetain(cf::CFRunLoopGetCurrent()) as *mut c_void; + { + let mut state = self.lifecycle.lock(); + match *state { + Lifecycle::New => *state = Lifecycle::Running(run_loop), + Lifecycle::Running(_) => unreachable!(), + Lifecycle::Stopped => return, + } + } + fs::FSEventStreamScheduleWithRunLoop( + self.state.stream, + run_loop, + cf::kCFRunLoopDefaultMode, + ); + fs::FSEventStreamStart(self.state.stream); + cf::CFRunLoopRun(); + } + } + + extern "C" fn trampoline( + stream_ref: fs::FSEventStreamRef, + info: *mut ::std::os::raw::c_void, + num: usize, // size_t numEvents + event_paths: *mut ::std::os::raw::c_void, // void *eventPaths + event_flags: *const ::std::os::raw::c_void, // const FSEventStreamEventFlags eventFlags[] + event_ids: *const ::std::os::raw::c_void, // const FSEventStreamEventId eventIds[] + ) { + unsafe { + let event_paths = event_paths as *const *const ::std::os::raw::c_char; + let e_ptr = event_flags as *mut u32; + let i_ptr = event_ids as *mut u64; + let state = (info as *mut State).as_mut().unwrap(); + let callback = if let Some(callback) = state.callback.as_mut() { + callback + } else { + return; + }; + + let paths = slice::from_raw_parts(event_paths, num); + let flags = slice::from_raw_parts_mut(e_ptr, num); + let ids = slice::from_raw_parts_mut(i_ptr, num); + let mut stream_restarted = false; + + // Sometimes FSEvents reports a "dropped" event, an indication that either the kernel + // or our code couldn't keep up with the sheer volume of file-system events that were + // generated. If we observed a valid event before this happens, we'll try to read the + // file-system journal by stopping the current stream and creating a new one starting at + // such event. Otherwise, we'll let invoke the callback with the dropped event, which + // will likely perform a re-scan of one of the root directories. + if flags + .iter() + .copied() + .filter_map(StreamFlags::from_bits) + .any(|flags| { + flags.contains(StreamFlags::USER_DROPPED) + || flags.contains(StreamFlags::KERNEL_DROPPED) + }) + && let Some(last_valid_event_id) = state.last_valid_event_id.take() + { + fs::FSEventStreamStop(state.stream); + fs::FSEventStreamInvalidate(state.stream); + fs::FSEventStreamRelease(state.stream); + + let stream_context = fs::FSEventStreamContext { + version: 0, + info, + retain: None, + release: None, + copy_description: None, + }; + let stream = fs::FSEventStreamCreate( + cf::kCFAllocatorDefault, + Self::trampoline, + &stream_context, + state.paths, + last_valid_event_id, + state.latency.as_secs_f64(), + fs::kFSEventStreamCreateFlagFileEvents + | fs::kFSEventStreamCreateFlagNoDefer + | fs::kFSEventStreamCreateFlagWatchRoot, + ); + + state.stream = stream; + fs::FSEventStreamScheduleWithRunLoop( + state.stream, + cf::CFRunLoopGetCurrent(), + cf::kCFRunLoopDefaultMode, + ); + fs::FSEventStreamStart(state.stream); + stream_restarted = true; + } + + if !stream_restarted { + let mut events = Vec::with_capacity(num); + for p in 0..num { + if let Some(flag) = StreamFlags::from_bits(flags[p]) { + if !flag.contains(StreamFlags::HISTORY_DONE) { + let path_c_str = CStr::from_ptr(paths[p]); + let path = PathBuf::from(OsStr::from_bytes(path_c_str.to_bytes())); + let event = Event { + event_id: ids[p], + flags: flag, + path, + }; + state.last_valid_event_id = Some(event.event_id); + events.push(event); + } + } else { + debug_assert!(false, "unknown flag set for fs event: {}", flags[p]); + } + } + + if !events.is_empty() && !callback(events) { + fs::FSEventStreamStop(stream_ref); + cf::CFRunLoopStop(cf::CFRunLoopGetCurrent()); + } + } + } + } +} + +impl Drop for Handle { + fn drop(&mut self) { + let mut state = self.0.lock(); + if let Lifecycle::Running(run_loop) = *state { + unsafe { + cf::CFRunLoopStop(run_loop); + cf::CFRelease(run_loop) + } + } + *state = Lifecycle::Stopped; + } +} + +// Synchronize with +// /System/Library/Frameworks/CoreServices.framework/Versions/A/Frameworks/FSEvents.framework/Versions/A/Headers/FSEvents.h +bitflags! { + #[derive(Debug, PartialEq, Eq, Clone, Copy)] + #[repr(C)] + pub struct StreamFlags: u32 { + const NONE = 0x00000000; + const MUST_SCAN_SUBDIRS = 0x00000001; + const USER_DROPPED = 0x00000002; + const KERNEL_DROPPED = 0x00000004; + const IDS_WRAPPED = 0x00000008; + const HISTORY_DONE = 0x00000010; + const ROOT_CHANGED = 0x00000020; + const MOUNT = 0x00000040; + const UNMOUNT = 0x00000080; + const ITEM_CREATED = 0x00000100; + const ITEM_REMOVED = 0x00000200; + const INODE_META_MOD = 0x00000400; + const ITEM_RENAMED = 0x00000800; + const ITEM_MODIFIED = 0x00001000; + const FINDER_INFO_MOD = 0x00002000; + const ITEM_CHANGE_OWNER = 0x00004000; + const ITEM_XATTR_MOD = 0x00008000; + const IS_FILE = 0x00010000; + const IS_DIR = 0x00020000; + const IS_SYMLINK = 0x00040000; + const OWN_EVENT = 0x00080000; + const IS_HARDLINK = 0x00100000; + const IS_LAST_HARDLINK = 0x00200000; + const ITEM_CLONED = 0x400000; + } +} + +impl std::fmt::Display for StreamFlags { + fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { + if self.contains(StreamFlags::MUST_SCAN_SUBDIRS) { + let _d = write!(f, "MUST_SCAN_SUBDIRS "); + } + if self.contains(StreamFlags::USER_DROPPED) { + let _d = write!(f, "USER_DROPPED "); + } + if self.contains(StreamFlags::KERNEL_DROPPED) { + let _d = write!(f, "KERNEL_DROPPED "); + } + if self.contains(StreamFlags::IDS_WRAPPED) { + let _d = write!(f, "IDS_WRAPPED "); + } + if self.contains(StreamFlags::HISTORY_DONE) { + let _d = write!(f, "HISTORY_DONE "); + } + if self.contains(StreamFlags::ROOT_CHANGED) { + let _d = write!(f, "ROOT_CHANGED "); + } + if self.contains(StreamFlags::MOUNT) { + let _d = write!(f, "MOUNT "); + } + if self.contains(StreamFlags::UNMOUNT) { + let _d = write!(f, "UNMOUNT "); + } + if self.contains(StreamFlags::ITEM_CREATED) { + let _d = write!(f, "ITEM_CREATED "); + } + if self.contains(StreamFlags::ITEM_REMOVED) { + let _d = write!(f, "ITEM_REMOVED "); + } + if self.contains(StreamFlags::INODE_META_MOD) { + let _d = write!(f, "INODE_META_MOD "); + } + if self.contains(StreamFlags::ITEM_RENAMED) { + let _d = write!(f, "ITEM_RENAMED "); + } + if self.contains(StreamFlags::ITEM_MODIFIED) { + let _d = write!(f, "ITEM_MODIFIED "); + } + if self.contains(StreamFlags::FINDER_INFO_MOD) { + let _d = write!(f, "FINDER_INFO_MOD "); + } + if self.contains(StreamFlags::ITEM_CHANGE_OWNER) { + let _d = write!(f, "ITEM_CHANGE_OWNER "); + } + if self.contains(StreamFlags::ITEM_XATTR_MOD) { + let _d = write!(f, "ITEM_XATTR_MOD "); + } + if self.contains(StreamFlags::IS_FILE) { + let _d = write!(f, "IS_FILE "); + } + if self.contains(StreamFlags::IS_DIR) { + let _d = write!(f, "IS_DIR "); + } + if self.contains(StreamFlags::IS_SYMLINK) { + let _d = write!(f, "IS_SYMLINK "); + } + if self.contains(StreamFlags::OWN_EVENT) { + let _d = write!(f, "OWN_EVENT "); + } + if self.contains(StreamFlags::IS_LAST_HARDLINK) { + let _d = write!(f, "IS_LAST_HARDLINK "); + } + if self.contains(StreamFlags::IS_HARDLINK) { + let _d = write!(f, "IS_HARDLINK "); + } + if self.contains(StreamFlags::ITEM_CLONED) { + let _d = write!(f, "ITEM_CLONED "); + } + write!(f, "") + } +} + +#[link(name = "CoreServices", kind = "framework")] +unsafe extern "C" { + pub fn FSEventsGetCurrentEventId() -> u64; +} + +// These tests are disabled by default because they seem to be unresolvably flaky. +// Feel free to bring them back to help test this code +#[cfg(false)] +mod tests { + use super::*; + use std::{fs, sync::mpsc, thread, time::Duration}; + + #[test] + fn test_event_stream_simple() { + for _ in 0..3 { + let dir = tempfile::Builder::new() + .prefix("test-event-stream") + .tempdir() + .unwrap(); + let path = dir.path().canonicalize().unwrap(); + for i in 0..10 { + fs::write(path.join(format!("existing-file-{}", i)), "").unwrap(); + } + flush_historical_events(); + + let (tx, rx) = mpsc::channel(); + let (stream, handle) = EventStream::new(&[&path], Duration::from_millis(50)); + thread::spawn(move || stream.run(move |events| tx.send(events.to_vec()).is_ok())); + + fs::write(path.join("new-file"), "").unwrap(); + let events = rx.recv_timeout(timeout()).unwrap(); + let event = events.last().unwrap(); + assert_eq!(event.path, path.join("new-file")); + assert!(event.flags.contains(StreamFlags::ITEM_CREATED)); + + fs::remove_file(path.join("existing-file-5")).unwrap(); + let mut events = rx.recv_timeout(timeout()).unwrap(); + let mut event = events.last().unwrap(); + // we see this duplicate about 1/100 test runs. + if event.path == path.join("new-file") + && event.flags.contains(StreamFlags::ITEM_CREATED) + { + events = rx.recv_timeout(timeout()).unwrap(); + event = events.last().unwrap(); + } + assert_eq!(event.path, path.join("existing-file-5")); + assert!(event.flags.contains(StreamFlags::ITEM_REMOVED)); + drop(handle); + } + } + + #[test] + fn test_event_stream_delayed_start() { + for _ in 0..3 { + let dir = tempfile::Builder::new() + .prefix("test-event-stream") + .tempdir() + .unwrap(); + let path = dir.path().canonicalize().unwrap(); + for i in 0..10 { + fs::write(path.join(format!("existing-file-{}", i)), "").unwrap(); + } + flush_historical_events(); + + let (tx, rx) = mpsc::channel(); + let (stream, handle) = EventStream::new(&[&path], Duration::from_millis(50)); + + // Delay the call to `run` in order to make sure we don't miss any events that occur + // between creating the `EventStream` and calling `run`. + thread::spawn(move || { + thread::sleep(Duration::from_millis(100)); + stream.run(move |events| tx.send(events.to_vec()).is_ok()) + }); + + fs::write(path.join("new-file"), "").unwrap(); + let events = rx.recv_timeout(timeout()).unwrap(); + let event = events.last().unwrap(); + assert_eq!(event.path, path.join("new-file")); + assert!(event.flags.contains(StreamFlags::ITEM_CREATED)); + + fs::remove_file(path.join("existing-file-5")).unwrap(); + let events = rx.recv_timeout(timeout()).unwrap(); + let event = events.last().unwrap(); + assert_eq!(event.path, path.join("existing-file-5")); + assert!(event.flags.contains(StreamFlags::ITEM_REMOVED)); + drop(handle); + } + } + + #[test] + fn test_event_stream_shutdown_by_dropping_handle() { + let dir = tempfile::Builder::new() + .prefix("test-event-stream") + .tempdir() + .unwrap(); + let path = dir.path().canonicalize().unwrap(); + flush_historical_events(); + + let (tx, rx) = mpsc::channel(); + let (stream, handle) = EventStream::new(&[&path], Duration::from_millis(50)); + thread::spawn(move || { + stream.run({ + let tx = tx.clone(); + move |_| { + tx.send("running").unwrap(); + true + } + }); + tx.send("stopped").unwrap(); + }); + + fs::write(path.join("new-file"), "").unwrap(); + assert_eq!(rx.recv_timeout(timeout()).unwrap(), "running"); + + // Dropping the handle causes `EventStream::run` to return. + drop(handle); + assert_eq!(rx.recv_timeout(timeout()).unwrap(), "stopped"); + } + + #[test] + fn test_event_stream_shutdown_before_run() { + let dir = tempfile::Builder::new() + .prefix("test-event-stream") + .tempdir() + .unwrap(); + let path = dir.path().canonicalize().unwrap(); + + let (stream, handle) = EventStream::new(&[&path], Duration::from_millis(50)); + drop(handle); + + // This returns immediately because the handle was already dropped. + stream.run(|_| true); + } + + fn flush_historical_events() { + thread::sleep(timeout()); + } + + fn timeout() -> Duration { + if std::env::var("CI").is_ok() { + Duration::from_secs(4) + } else { + Duration::from_millis(500) + } + } +}