From b1f9ca96ec263352024edbf515a784b657a6bc3d Mon Sep 17 00:00:00 2001 From: Marco Mihai Condrache <52580954+marcocondrache@users.noreply.github.com> Date: Tue, 27 Jan 2026 20:12:41 +0100 Subject: [PATCH] fs: Replace MacWatcher with notify::FsEventWatcher (#47322) Closes #47064 Closes #20806 The previous implementation used Zed's custom fsevent crate, which spawned a dedicated thread for each watched path. On large projects, this could result in 100+ threads just for filesystem watching. This PR removes the `fsevent` crate and switches to using `notify's` FsEventWatcher directly. The notify implementation maintains a single FSEvents stream that watches all paths on one thread. When paths are added or removed, the stream is stopped, the path list is updated, and the stream is restarted. Ref: https://github.com/notify-rs/notify/blob/main/notify/src/fsevent.rs As a result, Zed now uses one thread for filesystem watching regardless of how many paths are watched. Release Notes: - On macOS, Zed now uses significantly fewer resources when watching filesystem changes --------- Signed-off-by: Marco Mihai Condrache <52580954+marcocondrache@users.noreply.github.com> Co-authored-by: Anthony Eid --- Cargo.lock | 26 +- Cargo.toml | 3 - crates/fs/Cargo.toml | 5 +- crates/fs/src/fs.rs | 60 ---- crates/fs/src/fs_watcher.rs | 25 +- crates/fs/src/mac_watcher.rs | 77 ----- crates/fsevent/Cargo.toml | 28 -- crates/fsevent/LICENSE-GPL | 1 - crates/fsevent/examples/events.rs | 23 -- crates/fsevent/src/fsevent.rs | 515 ------------------------------ 10 files changed, 24 insertions(+), 739 deletions(-) delete mode 100644 crates/fs/src/mac_watcher.rs delete mode 100644 crates/fsevent/Cargo.toml delete mode 120000 crates/fsevent/LICENSE-GPL delete mode 100644 crates/fsevent/examples/events.rs delete mode 100644 crates/fsevent/src/fsevent.rs diff --git a/Cargo.lock b/Cargo.lock index 31b1a06c4372d8af4ef0a103b24f83d40afac63b..c355df36f8cab19b9750b04a817b0bc379cdff1f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6518,7 +6518,6 @@ dependencies = [ "cocoa 0.26.0", "collections", "fs", - "fsevent", "futures 0.3.31", "git", "gpui", @@ -6577,27 +6576,6 @@ version = "1.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "42703706b716c37f96a77aea830392ad231f44c9e9a67872fa5548707e11b11c" -[[package]] -name = "fsevent" -version = "0.1.0" -dependencies = [ - "bitflags 2.9.4", - "core-foundation 0.10.0", - "fsevent-sys 3.1.0", - "log", - "parking_lot", - "tempfile", -] - -[[package]] -name = "fsevent-sys" -version = "3.1.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ca6f5e6817058771c10f0eb0f05ddf1e35844266f972004fe8e4b21fda295bd5" -dependencies = [ - "libc", -] - [[package]] name = "fsevent-sys" version = "4.1.0" @@ -10592,7 +10570,7 @@ dependencies = [ "bitflags 2.9.4", "crossbeam-channel", "filetime", - "fsevent-sys 4.1.0", + "fsevent-sys", "inotify 0.9.6", "kqueue", "libc", @@ -10608,7 +10586,7 @@ version = "8.2.0" source = "git+https://github.com/zed-industries/notify.git?rev=6c550ac3c56cbd143c57ea6390e197af9d790908#6c550ac3c56cbd143c57ea6390e197af9d790908" dependencies = [ "bitflags 2.9.4", - "fsevent-sys 4.1.0", + "fsevent-sys", "inotify 0.11.0", "kqueue", "libc", diff --git a/Cargo.toml b/Cargo.toml index 8e7ea3d734b25b00821c60d90ea165294ac8f2af..d34319f5ed1b38738dd071258c90328916fe4f43 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -76,7 +76,6 @@ members = [ "crates/file_icons", "crates/fs", "crates/fs_benchmarks", - "crates/fsevent", "crates/fuzzy", "crates/git", "crates/git_graph", @@ -313,7 +312,6 @@ feedback = { path = "crates/feedback" } file_finder = { path = "crates/file_finder" } file_icons = { path = "crates/file_icons" } fs = { path = "crates/fs" } -fsevent = { path = "crates/fsevent" } fuzzy = { path = "crates/fuzzy" } git = { path = "crates/git" } git_graph = { path = "crates/git_graph" } @@ -834,7 +832,6 @@ command_palette = { codegen-units = 1 } command_palette_hooks = { codegen-units = 1 } feature_flags = { codegen-units = 1 } file_icons = { codegen-units = 1 } -fsevent = { codegen-units = 1 } image_viewer = { codegen-units = 1 } edit_prediction_ui = { codegen-units = 1 } install_cli = { codegen-units = 1 } diff --git a/crates/fs/Cargo.toml b/crates/fs/Cargo.toml index c38a4584109edf422d754115399566f987cb8ea6..6355524e4f328df0ca7fcf24c1df0557676ba6a6 100644 --- a/crates/fs/Cargo.toml +++ b/crates/fs/Cargo.toml @@ -40,15 +40,12 @@ text.workspace = true time.workspace = true util.workspace = true is_executable = "1.0.5" +notify = "8.2.0" [target.'cfg(target_os = "macos")'.dependencies] -fsevent.workspace = true objc.workspace = true cocoa = "0.26" -[target.'cfg(not(target_os = "macos"))'.dependencies] -notify = "8.2.0" - [target.'cfg(target_os = "windows")'.dependencies] windows.workspace = true diff --git a/crates/fs/src/fs.rs b/crates/fs/src/fs.rs index f8eed421b582d462f0a69436aba1533e749a9cf0..312cf28c21161c60289c8f41d0db0904e869455f 100644 --- a/crates/fs/src/fs.rs +++ b/crates/fs/src/fs.rs @@ -1,7 +1,3 @@ -#[cfg(target_os = "macos")] -mod mac_watcher; - -#[cfg(not(target_os = "macos"))] pub mod fs_watcher; use parking_lot::Mutex; @@ -976,62 +972,6 @@ impl Fs for RealFs { Ok(Box::pin(result)) } - #[cfg(target_os = "macos")] - async fn watch( - &self, - path: &Path, - latency: Duration, - ) -> ( - Pin>>>, - Arc, - ) { - use fsevent::StreamFlags; - - let (events_tx, events_rx) = smol::channel::unbounded(); - let handles = Arc::new(parking_lot::Mutex::new(collections::BTreeMap::default())); - let watcher = Arc::new(mac_watcher::MacWatcher::new( - events_tx, - Arc::downgrade(&handles), - latency, - )); - watcher.add(path).expect("handles can't be dropped"); - - ( - Box::pin( - events_rx - .map(|events| { - events - .into_iter() - .map(|event| { - log::trace!("fs path event: {event:?}"); - let kind = if event.flags.contains(StreamFlags::ITEM_REMOVED) { - Some(PathEventKind::Removed) - } else if event.flags.contains(StreamFlags::ITEM_CREATED) { - Some(PathEventKind::Created) - } else if event.flags.contains(StreamFlags::ITEM_MODIFIED) - | event.flags.contains(StreamFlags::ITEM_RENAMED) - { - Some(PathEventKind::Changed) - } else { - None - }; - PathEvent { - path: event.path, - kind, - } - }) - .collect() - }) - .chain(futures::stream::once(async move { - drop(handles); - vec![] - })), - ), - watcher, - ) - } - - #[cfg(not(target_os = "macos"))] async fn watch( &self, path: &Path, diff --git a/crates/fs/src/fs_watcher.rs b/crates/fs/src/fs_watcher.rs index 18d5dbeeb9e82948aaa503e7268d39c5d1852a2b..efb381c9a5480df598d774dd17e9c49f8ef82f92 100644 --- a/crates/fs/src/fs_watcher.rs +++ b/crates/fs/src/fs_watcher.rs @@ -50,7 +50,7 @@ impl Watcher for FsWatcher { let tx = self.tx.clone(); let pending_paths = self.pending_path_events.clone(); - #[cfg(target_os = "windows")] + #[cfg(any(target_os = "windows", target_os = "macos"))] { // Return early if an ancestor of this path was already being watched. // saves a huge amount of memory @@ -81,7 +81,7 @@ impl Watcher for FsWatcher { let root_path = SanitizedPath::new_arc(path); let path: Arc = path.into(); - #[cfg(target_os = "windows")] + #[cfg(any(target_os = "windows", target_os = "macos"))] let mode = notify::RecursiveMode::Recursive; #[cfg(target_os = "linux")] let mode = notify::RecursiveMode::NonRecursive; @@ -166,6 +166,8 @@ pub struct GlobalWatcher { watcher: Mutex, #[cfg(target_os = "windows")] watcher: Mutex, + #[cfg(target_os = "macos")] + watcher: Mutex, } impl GlobalWatcher { @@ -178,10 +180,25 @@ impl GlobalWatcher { ) -> anyhow::Result { use notify::Watcher; - self.watcher.lock().watch(&path, mode)?; - let mut state = self.state.lock(); + // Check if this path is already covered by an existing watched ancestor path. + // On macOS and Windows, watching is recursive, so we don't need to watch + // child paths if an ancestor is already being watched. + #[cfg(any(target_os = "windows", target_os = "macos"))] + let path_already_covered = state.path_registrations.keys().any(|existing| { + path.starts_with(existing.as_ref()) && path.as_ref() != existing.as_ref() + }); + + #[cfg(not(any(target_os = "windows", target_os = "macos")))] + let path_already_covered = false; + + if !path_already_covered && !state.path_registrations.contains_key(&path) { + drop(state); + self.watcher.lock().watch(&path, mode)?; + state = self.state.lock(); + } + let id = state.last_registration; state.last_registration = WatcherRegistrationId(id.0 + 1); diff --git a/crates/fs/src/mac_watcher.rs b/crates/fs/src/mac_watcher.rs deleted file mode 100644 index b781a231ba2bc33a895480ea278a7ccfe3364fe7..0000000000000000000000000000000000000000 --- a/crates/fs/src/mac_watcher.rs +++ /dev/null @@ -1,77 +0,0 @@ -use crate::Watcher; -use anyhow::{Context as _, Result}; -use collections::{BTreeMap, Bound}; -use fsevent::EventStream; -use parking_lot::Mutex; -use std::{ - path::{Path, PathBuf}, - sync::Weak, - thread, - time::Duration, -}; - -pub struct MacWatcher { - events_tx: smol::channel::Sender>, - handles: Weak>>, - latency: Duration, -} - -impl MacWatcher { - pub fn new( - events_tx: smol::channel::Sender>, - handles: Weak>>, - latency: Duration, - ) -> Self { - Self { - events_tx, - handles, - latency, - } - } -} - -impl Watcher for MacWatcher { - fn add(&self, path: &Path) -> Result<()> { - log::trace!("mac watcher add: {:?}", path); - let handles = self - .handles - .upgrade() - .context("unable to watch path, receiver dropped")?; - let mut handles = handles.lock(); - - // Return early if an ancestor of this path was already being watched. - if let Some((watched_path, _)) = handles - .range::((Bound::Unbounded, Bound::Included(path))) - .next_back() - && path.starts_with(watched_path) - { - log::trace!( - "mac watched path starts with existing watched path: {watched_path:?}, {path:?}" - ); - return Ok(()); - } - - let (stream, handle) = EventStream::new(&[path], self.latency); - let tx = self.events_tx.clone(); - thread::Builder::new() - .name("MacWatcher".to_owned()) - .spawn(move || { - stream.run(move |events| smol::block_on(tx.send(events)).is_ok()); - }) - .unwrap(); - handles.insert(path.into(), handle); - - Ok(()) - } - - fn remove(&self, path: &Path) -> anyhow::Result<()> { - let handles = self - .handles - .upgrade() - .context("unable to remove path, receiver dropped")?; - - let mut handles = handles.lock(); - handles.remove(path); - Ok(()) - } -} diff --git a/crates/fsevent/Cargo.toml b/crates/fsevent/Cargo.toml deleted file mode 100644 index 635b36ebe14ee6823f8773bb38ff085516e320b9..0000000000000000000000000000000000000000 --- a/crates/fsevent/Cargo.toml +++ /dev/null @@ -1,28 +0,0 @@ -[package] -name = "fsevent" -version = "0.1.0" -edition.workspace = true -publish.workspace = true -license = "GPL-3.0-or-later" - -[lints] -workspace = true - -[lib] -path = "src/fsevent.rs" -doctest = false - -[dependencies] -bitflags.workspace = true -parking_lot.workspace = true -log.workspace = true - -[target.'cfg(target_os = "macos")'.dependencies] -core-foundation.workspace = true -fsevent-sys = "3.0.2" - -[dev-dependencies] -tempfile.workspace = true - -[package.metadata.docs.rs] -targets = ["x86_64-apple-darwin"] diff --git a/crates/fsevent/LICENSE-GPL b/crates/fsevent/LICENSE-GPL deleted file mode 120000 index 89e542f750cd3860a0598eff0dc34b56d7336dc4..0000000000000000000000000000000000000000 --- a/crates/fsevent/LICENSE-GPL +++ /dev/null @@ -1 +0,0 @@ -../../LICENSE-GPL \ No newline at end of file diff --git a/crates/fsevent/examples/events.rs b/crates/fsevent/examples/events.rs deleted file mode 100644 index bae9e734259992e5cf3e553dda2622aeec5eaa33..0000000000000000000000000000000000000000 --- a/crates/fsevent/examples/events.rs +++ /dev/null @@ -1,23 +0,0 @@ -#[cfg(target_os = "macos")] -fn main() { - use fsevent::EventStream; - use std::{env::args, path::Path, time::Duration}; - - let paths = args().skip(1).collect::>(); - let paths = paths.iter().map(Path::new).collect::>(); - assert!(!paths.is_empty(), "Must pass 1 or more paths as arguments"); - - let (stream, _handle) = EventStream::new(&paths, Duration::from_millis(100)); - stream.run(|events| { - eprintln!("event batch"); - for event in events { - eprintln!(" {:?}", event); - } - true - }); -} - -#[cfg(not(target_os = "macos"))] -fn main() { - eprintln!("This example only works on macOS"); -} diff --git a/crates/fsevent/src/fsevent.rs b/crates/fsevent/src/fsevent.rs deleted file mode 100644 index 0fe3592010fa0e7e12b68c8dfc919807e91c8e64..0000000000000000000000000000000000000000 --- a/crates/fsevent/src/fsevent.rs +++ /dev/null @@ -1,515 +0,0 @@ -#![cfg(target_os = "macos")] - -use bitflags::bitflags; -use fsevent_sys::{self as fs, core_foundation as cf}; -use parking_lot::Mutex; -use std::{ - convert::AsRef, - ffi::{CStr, OsStr, c_void}, - os::unix::ffi::OsStrExt, - path::{Path, PathBuf}, - ptr, slice, - sync::Arc, - time::Duration, -}; - -#[derive(Clone, Debug)] -pub struct Event { - pub event_id: u64, - pub flags: StreamFlags, - pub path: PathBuf, -} - -pub struct EventStream { - lifecycle: Arc>, - state: Box, -} - -struct State { - latency: Duration, - paths: cf::CFMutableArrayRef, - callback: Option) -> bool>>, - last_valid_event_id: Option, - stream: fs::FSEventStreamRef, -} - -impl Drop for State { - fn drop(&mut self) { - unsafe { - cf::CFRelease(self.paths); - fs::FSEventStreamStop(self.stream); - fs::FSEventStreamInvalidate(self.stream); - fs::FSEventStreamRelease(self.stream); - } - } -} - -enum Lifecycle { - New, - Running(cf::CFRunLoopRef), - Stopped, -} - -pub struct Handle(Arc>); - -unsafe impl Send for EventStream {} -unsafe impl Send for Lifecycle {} - -impl EventStream { - pub fn new(paths: &[&Path], latency: Duration) -> (Self, Handle) { - unsafe { - let cf_paths = - cf::CFArrayCreateMutable(cf::kCFAllocatorDefault, 0, &cf::kCFTypeArrayCallBacks); - assert!(!cf_paths.is_null()); - - for path in paths { - let path_bytes = path.as_os_str().as_bytes(); - let cf_url = cf::CFURLCreateFromFileSystemRepresentation( - cf::kCFAllocatorDefault, - path_bytes.as_ptr() as *const i8, - path_bytes.len() as cf::CFIndex, - false, - ); - if !cf_url.is_null() { - let cf_path = cf::CFURLCopyFileSystemPath(cf_url, cf::kCFURLPOSIXPathStyle); - cf::CFArrayAppendValue(cf_paths, cf_path); - cf::CFRelease(cf_path); - cf::CFRelease(cf_url); - } else { - log::error!("Failed to create CFURL for path: {path:?}"); - } - } - - let mut state = Box::new(State { - latency, - paths: cf_paths, - callback: None, - last_valid_event_id: None, - stream: ptr::null_mut(), - }); - let stream_context = fs::FSEventStreamContext { - version: 0, - info: state.as_ref() as *const _ as *mut c_void, - retain: None, - release: None, - copy_description: None, - }; - let stream = fs::FSEventStreamCreate( - cf::kCFAllocatorDefault, - Self::trampoline, - &stream_context, - cf_paths, - FSEventsGetCurrentEventId(), - latency.as_secs_f64(), - fs::kFSEventStreamCreateFlagFileEvents - | fs::kFSEventStreamCreateFlagNoDefer - | fs::kFSEventStreamCreateFlagWatchRoot, - ); - state.stream = stream; - - let lifecycle = Arc::new(Mutex::new(Lifecycle::New)); - ( - EventStream { - lifecycle: lifecycle.clone(), - state, - }, - Handle(lifecycle), - ) - } - } - - pub fn run(mut self, f: F) - where - F: FnMut(Vec) -> bool + 'static, - { - self.state.callback = Some(Box::new(f)); - unsafe { - let run_loop = - core_foundation::base::CFRetain(cf::CFRunLoopGetCurrent()) as *mut c_void; - { - let mut state = self.lifecycle.lock(); - match *state { - Lifecycle::New => *state = Lifecycle::Running(run_loop), - Lifecycle::Running(_) => unreachable!(), - Lifecycle::Stopped => return, - } - } - fs::FSEventStreamScheduleWithRunLoop( - self.state.stream, - run_loop, - cf::kCFRunLoopDefaultMode, - ); - fs::FSEventStreamStart(self.state.stream); - cf::CFRunLoopRun(); - } - } - - extern "C" fn trampoline( - stream_ref: fs::FSEventStreamRef, - info: *mut ::std::os::raw::c_void, - num: usize, // size_t numEvents - event_paths: *mut ::std::os::raw::c_void, // void *eventPaths - event_flags: *const ::std::os::raw::c_void, // const FSEventStreamEventFlags eventFlags[] - event_ids: *const ::std::os::raw::c_void, // const FSEventStreamEventId eventIds[] - ) { - unsafe { - let event_paths = event_paths as *const *const ::std::os::raw::c_char; - let e_ptr = event_flags as *mut u32; - let i_ptr = event_ids as *mut u64; - let state = (info as *mut State).as_mut().unwrap(); - let callback = if let Some(callback) = state.callback.as_mut() { - callback - } else { - return; - }; - - let paths = slice::from_raw_parts(event_paths, num); - let flags = slice::from_raw_parts_mut(e_ptr, num); - let ids = slice::from_raw_parts_mut(i_ptr, num); - let mut stream_restarted = false; - - // Sometimes FSEvents reports a "dropped" event, an indication that either the kernel - // or our code couldn't keep up with the sheer volume of file-system events that were - // generated. If we observed a valid event before this happens, we'll try to read the - // file-system journal by stopping the current stream and creating a new one starting at - // such event. Otherwise, we'll let invoke the callback with the dropped event, which - // will likely perform a re-scan of one of the root directories. - if flags - .iter() - .copied() - .filter_map(StreamFlags::from_bits) - .any(|flags| { - flags.contains(StreamFlags::USER_DROPPED) - || flags.contains(StreamFlags::KERNEL_DROPPED) - }) - && let Some(last_valid_event_id) = state.last_valid_event_id.take() - { - fs::FSEventStreamStop(state.stream); - fs::FSEventStreamInvalidate(state.stream); - fs::FSEventStreamRelease(state.stream); - - let stream_context = fs::FSEventStreamContext { - version: 0, - info, - retain: None, - release: None, - copy_description: None, - }; - let stream = fs::FSEventStreamCreate( - cf::kCFAllocatorDefault, - Self::trampoline, - &stream_context, - state.paths, - last_valid_event_id, - state.latency.as_secs_f64(), - fs::kFSEventStreamCreateFlagFileEvents - | fs::kFSEventStreamCreateFlagNoDefer - | fs::kFSEventStreamCreateFlagWatchRoot, - ); - - state.stream = stream; - fs::FSEventStreamScheduleWithRunLoop( - state.stream, - cf::CFRunLoopGetCurrent(), - cf::kCFRunLoopDefaultMode, - ); - fs::FSEventStreamStart(state.stream); - stream_restarted = true; - } - - if !stream_restarted { - let mut events = Vec::with_capacity(num); - for p in 0..num { - if let Some(flag) = StreamFlags::from_bits(flags[p]) { - if !flag.contains(StreamFlags::HISTORY_DONE) { - let path_c_str = CStr::from_ptr(paths[p]); - let path = PathBuf::from(OsStr::from_bytes(path_c_str.to_bytes())); - let event = Event { - event_id: ids[p], - flags: flag, - path, - }; - state.last_valid_event_id = Some(event.event_id); - events.push(event); - } - } else { - debug_assert!(false, "unknown flag set for fs event: {}", flags[p]); - } - } - - if !events.is_empty() && !callback(events) { - fs::FSEventStreamStop(stream_ref); - cf::CFRunLoopStop(cf::CFRunLoopGetCurrent()); - } - } - } - } -} - -impl Drop for Handle { - fn drop(&mut self) { - let mut state = self.0.lock(); - if let Lifecycle::Running(run_loop) = *state { - unsafe { - cf::CFRunLoopStop(run_loop); - cf::CFRelease(run_loop) - } - } - *state = Lifecycle::Stopped; - } -} - -// Synchronize with -// /System/Library/Frameworks/CoreServices.framework/Versions/A/Frameworks/FSEvents.framework/Versions/A/Headers/FSEvents.h -bitflags! { - #[derive(Debug, PartialEq, Eq, Clone, Copy)] - #[repr(C)] - pub struct StreamFlags: u32 { - const NONE = 0x00000000; - const MUST_SCAN_SUBDIRS = 0x00000001; - const USER_DROPPED = 0x00000002; - const KERNEL_DROPPED = 0x00000004; - const IDS_WRAPPED = 0x00000008; - const HISTORY_DONE = 0x00000010; - const ROOT_CHANGED = 0x00000020; - const MOUNT = 0x00000040; - const UNMOUNT = 0x00000080; - const ITEM_CREATED = 0x00000100; - const ITEM_REMOVED = 0x00000200; - const INODE_META_MOD = 0x00000400; - const ITEM_RENAMED = 0x00000800; - const ITEM_MODIFIED = 0x00001000; - const FINDER_INFO_MOD = 0x00002000; - const ITEM_CHANGE_OWNER = 0x00004000; - const ITEM_XATTR_MOD = 0x00008000; - const IS_FILE = 0x00010000; - const IS_DIR = 0x00020000; - const IS_SYMLINK = 0x00040000; - const OWN_EVENT = 0x00080000; - const IS_HARDLINK = 0x00100000; - const IS_LAST_HARDLINK = 0x00200000; - const ITEM_CLONED = 0x400000; - } -} - -impl std::fmt::Display for StreamFlags { - fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { - if self.contains(StreamFlags::MUST_SCAN_SUBDIRS) { - let _d = write!(f, "MUST_SCAN_SUBDIRS "); - } - if self.contains(StreamFlags::USER_DROPPED) { - let _d = write!(f, "USER_DROPPED "); - } - if self.contains(StreamFlags::KERNEL_DROPPED) { - let _d = write!(f, "KERNEL_DROPPED "); - } - if self.contains(StreamFlags::IDS_WRAPPED) { - let _d = write!(f, "IDS_WRAPPED "); - } - if self.contains(StreamFlags::HISTORY_DONE) { - let _d = write!(f, "HISTORY_DONE "); - } - if self.contains(StreamFlags::ROOT_CHANGED) { - let _d = write!(f, "ROOT_CHANGED "); - } - if self.contains(StreamFlags::MOUNT) { - let _d = write!(f, "MOUNT "); - } - if self.contains(StreamFlags::UNMOUNT) { - let _d = write!(f, "UNMOUNT "); - } - if self.contains(StreamFlags::ITEM_CREATED) { - let _d = write!(f, "ITEM_CREATED "); - } - if self.contains(StreamFlags::ITEM_REMOVED) { - let _d = write!(f, "ITEM_REMOVED "); - } - if self.contains(StreamFlags::INODE_META_MOD) { - let _d = write!(f, "INODE_META_MOD "); - } - if self.contains(StreamFlags::ITEM_RENAMED) { - let _d = write!(f, "ITEM_RENAMED "); - } - if self.contains(StreamFlags::ITEM_MODIFIED) { - let _d = write!(f, "ITEM_MODIFIED "); - } - if self.contains(StreamFlags::FINDER_INFO_MOD) { - let _d = write!(f, "FINDER_INFO_MOD "); - } - if self.contains(StreamFlags::ITEM_CHANGE_OWNER) { - let _d = write!(f, "ITEM_CHANGE_OWNER "); - } - if self.contains(StreamFlags::ITEM_XATTR_MOD) { - let _d = write!(f, "ITEM_XATTR_MOD "); - } - if self.contains(StreamFlags::IS_FILE) { - let _d = write!(f, "IS_FILE "); - } - if self.contains(StreamFlags::IS_DIR) { - let _d = write!(f, "IS_DIR "); - } - if self.contains(StreamFlags::IS_SYMLINK) { - let _d = write!(f, "IS_SYMLINK "); - } - if self.contains(StreamFlags::OWN_EVENT) { - let _d = write!(f, "OWN_EVENT "); - } - if self.contains(StreamFlags::IS_LAST_HARDLINK) { - let _d = write!(f, "IS_LAST_HARDLINK "); - } - if self.contains(StreamFlags::IS_HARDLINK) { - let _d = write!(f, "IS_HARDLINK "); - } - if self.contains(StreamFlags::ITEM_CLONED) { - let _d = write!(f, "ITEM_CLONED "); - } - write!(f, "") - } -} - -#[link(name = "CoreServices", kind = "framework")] -unsafe extern "C" { - pub fn FSEventsGetCurrentEventId() -> u64; -} - -// These tests are disabled by default because they seem to be unresolvably flaky. -// Feel free to bring them back to help test this code -#[cfg(false)] -mod tests { - use super::*; - use std::{fs, sync::mpsc, thread, time::Duration}; - - #[test] - fn test_event_stream_simple() { - for _ in 0..3 { - let dir = tempfile::Builder::new() - .prefix("test-event-stream") - .tempdir() - .unwrap(); - let path = dir.path().canonicalize().unwrap(); - for i in 0..10 { - fs::write(path.join(format!("existing-file-{}", i)), "").unwrap(); - } - flush_historical_events(); - - let (tx, rx) = mpsc::channel(); - let (stream, handle) = EventStream::new(&[&path], Duration::from_millis(50)); - thread::spawn(move || stream.run(move |events| tx.send(events.to_vec()).is_ok())); - - fs::write(path.join("new-file"), "").unwrap(); - let events = rx.recv_timeout(timeout()).unwrap(); - let event = events.last().unwrap(); - assert_eq!(event.path, path.join("new-file")); - assert!(event.flags.contains(StreamFlags::ITEM_CREATED)); - - fs::remove_file(path.join("existing-file-5")).unwrap(); - let mut events = rx.recv_timeout(timeout()).unwrap(); - let mut event = events.last().unwrap(); - // we see this duplicate about 1/100 test runs. - if event.path == path.join("new-file") - && event.flags.contains(StreamFlags::ITEM_CREATED) - { - events = rx.recv_timeout(timeout()).unwrap(); - event = events.last().unwrap(); - } - assert_eq!(event.path, path.join("existing-file-5")); - assert!(event.flags.contains(StreamFlags::ITEM_REMOVED)); - drop(handle); - } - } - - #[test] - fn test_event_stream_delayed_start() { - for _ in 0..3 { - let dir = tempfile::Builder::new() - .prefix("test-event-stream") - .tempdir() - .unwrap(); - let path = dir.path().canonicalize().unwrap(); - for i in 0..10 { - fs::write(path.join(format!("existing-file-{}", i)), "").unwrap(); - } - flush_historical_events(); - - let (tx, rx) = mpsc::channel(); - let (stream, handle) = EventStream::new(&[&path], Duration::from_millis(50)); - - // Delay the call to `run` in order to make sure we don't miss any events that occur - // between creating the `EventStream` and calling `run`. - thread::spawn(move || { - thread::sleep(Duration::from_millis(100)); - stream.run(move |events| tx.send(events.to_vec()).is_ok()) - }); - - fs::write(path.join("new-file"), "").unwrap(); - let events = rx.recv_timeout(timeout()).unwrap(); - let event = events.last().unwrap(); - assert_eq!(event.path, path.join("new-file")); - assert!(event.flags.contains(StreamFlags::ITEM_CREATED)); - - fs::remove_file(path.join("existing-file-5")).unwrap(); - let events = rx.recv_timeout(timeout()).unwrap(); - let event = events.last().unwrap(); - assert_eq!(event.path, path.join("existing-file-5")); - assert!(event.flags.contains(StreamFlags::ITEM_REMOVED)); - drop(handle); - } - } - - #[test] - fn test_event_stream_shutdown_by_dropping_handle() { - let dir = tempfile::Builder::new() - .prefix("test-event-stream") - .tempdir() - .unwrap(); - let path = dir.path().canonicalize().unwrap(); - flush_historical_events(); - - let (tx, rx) = mpsc::channel(); - let (stream, handle) = EventStream::new(&[&path], Duration::from_millis(50)); - thread::spawn(move || { - stream.run({ - let tx = tx.clone(); - move |_| { - tx.send("running").unwrap(); - true - } - }); - tx.send("stopped").unwrap(); - }); - - fs::write(path.join("new-file"), "").unwrap(); - assert_eq!(rx.recv_timeout(timeout()).unwrap(), "running"); - - // Dropping the handle causes `EventStream::run` to return. - drop(handle); - assert_eq!(rx.recv_timeout(timeout()).unwrap(), "stopped"); - } - - #[test] - fn test_event_stream_shutdown_before_run() { - let dir = tempfile::Builder::new() - .prefix("test-event-stream") - .tempdir() - .unwrap(); - let path = dir.path().canonicalize().unwrap(); - - let (stream, handle) = EventStream::new(&[&path], Duration::from_millis(50)); - drop(handle); - - // This returns immediately because the handle was already dropped. - stream.run(|_| true); - } - - fn flush_historical_events() { - thread::sleep(timeout()); - } - - fn timeout() -> Duration { - if std::env::var("CI").is_ok() { - Duration::from_secs(4) - } else { - Duration::from_millis(500) - } - } -}