diff --git a/Cargo.lock b/Cargo.lock index 31b1a06c4372d8af4ef0a103b24f83d40afac63b..c355df36f8cab19b9750b04a817b0bc379cdff1f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6518,7 +6518,6 @@ dependencies = [ "cocoa 0.26.0", "collections", "fs", - "fsevent", "futures 0.3.31", "git", "gpui", @@ -6577,27 +6576,6 @@ version = "1.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "42703706b716c37f96a77aea830392ad231f44c9e9a67872fa5548707e11b11c" -[[package]] -name = "fsevent" -version = "0.1.0" -dependencies = [ - "bitflags 2.9.4", - "core-foundation 0.10.0", - "fsevent-sys 3.1.0", - "log", - "parking_lot", - "tempfile", -] - -[[package]] -name = "fsevent-sys" -version = "3.1.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ca6f5e6817058771c10f0eb0f05ddf1e35844266f972004fe8e4b21fda295bd5" -dependencies = [ - "libc", -] - [[package]] name = "fsevent-sys" version = "4.1.0" @@ -10592,7 +10570,7 @@ dependencies = [ "bitflags 2.9.4", "crossbeam-channel", "filetime", - "fsevent-sys 4.1.0", + "fsevent-sys", "inotify 0.9.6", "kqueue", "libc", @@ -10608,7 +10586,7 @@ version = "8.2.0" source = "git+https://github.com/zed-industries/notify.git?rev=6c550ac3c56cbd143c57ea6390e197af9d790908#6c550ac3c56cbd143c57ea6390e197af9d790908" dependencies = [ "bitflags 2.9.4", - "fsevent-sys 4.1.0", + "fsevent-sys", "inotify 0.11.0", "kqueue", "libc", diff --git a/Cargo.toml b/Cargo.toml index 8e7ea3d734b25b00821c60d90ea165294ac8f2af..d34319f5ed1b38738dd071258c90328916fe4f43 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -76,7 +76,6 @@ members = [ "crates/file_icons", "crates/fs", "crates/fs_benchmarks", - "crates/fsevent", "crates/fuzzy", "crates/git", "crates/git_graph", @@ -313,7 +312,6 @@ feedback = { path = "crates/feedback" } file_finder = { path = "crates/file_finder" } file_icons = { path = "crates/file_icons" } fs = { path = "crates/fs" } -fsevent = { path = "crates/fsevent" } fuzzy = { path = "crates/fuzzy" } git = { path = "crates/git" } git_graph = { path = "crates/git_graph" } @@ -834,7 +832,6 @@ command_palette = { codegen-units = 1 } command_palette_hooks = { codegen-units = 1 } feature_flags = { codegen-units = 1 } file_icons = { codegen-units = 1 } -fsevent = { codegen-units = 1 } image_viewer = { codegen-units = 1 } edit_prediction_ui = { codegen-units = 1 } install_cli = { codegen-units = 1 } diff --git a/crates/fs/Cargo.toml b/crates/fs/Cargo.toml index c38a4584109edf422d754115399566f987cb8ea6..6355524e4f328df0ca7fcf24c1df0557676ba6a6 100644 --- a/crates/fs/Cargo.toml +++ b/crates/fs/Cargo.toml @@ -40,15 +40,12 @@ text.workspace = true time.workspace = true util.workspace = true is_executable = "1.0.5" +notify = "8.2.0" [target.'cfg(target_os = "macos")'.dependencies] -fsevent.workspace = true objc.workspace = true cocoa = "0.26" -[target.'cfg(not(target_os = "macos"))'.dependencies] -notify = "8.2.0" - [target.'cfg(target_os = "windows")'.dependencies] windows.workspace = true diff --git a/crates/fs/src/fs.rs b/crates/fs/src/fs.rs index f8eed421b582d462f0a69436aba1533e749a9cf0..312cf28c21161c60289c8f41d0db0904e869455f 100644 --- a/crates/fs/src/fs.rs +++ b/crates/fs/src/fs.rs @@ -1,7 +1,3 @@ -#[cfg(target_os = "macos")] -mod mac_watcher; - -#[cfg(not(target_os = "macos"))] pub mod fs_watcher; use parking_lot::Mutex; @@ -976,62 +972,6 @@ impl Fs for RealFs { Ok(Box::pin(result)) } - #[cfg(target_os = "macos")] - async fn watch( - &self, - path: &Path, - latency: Duration, - ) -> ( - Pin>>>, - Arc, - ) { - use fsevent::StreamFlags; - - let (events_tx, events_rx) = smol::channel::unbounded(); - let handles = Arc::new(parking_lot::Mutex::new(collections::BTreeMap::default())); - let watcher = Arc::new(mac_watcher::MacWatcher::new( - events_tx, - Arc::downgrade(&handles), - latency, - )); - watcher.add(path).expect("handles can't be dropped"); - - ( - Box::pin( - events_rx - .map(|events| { - events - .into_iter() - .map(|event| { - log::trace!("fs path event: {event:?}"); - let kind = if event.flags.contains(StreamFlags::ITEM_REMOVED) { - Some(PathEventKind::Removed) - } else if event.flags.contains(StreamFlags::ITEM_CREATED) { - Some(PathEventKind::Created) - } else if event.flags.contains(StreamFlags::ITEM_MODIFIED) - | event.flags.contains(StreamFlags::ITEM_RENAMED) - { - Some(PathEventKind::Changed) - } else { - None - }; - PathEvent { - path: event.path, - kind, - } - }) - .collect() - }) - .chain(futures::stream::once(async move { - drop(handles); - vec![] - })), - ), - watcher, - ) - } - - #[cfg(not(target_os = "macos"))] async fn watch( &self, path: &Path, diff --git a/crates/fs/src/fs_watcher.rs b/crates/fs/src/fs_watcher.rs index 18d5dbeeb9e82948aaa503e7268d39c5d1852a2b..efb381c9a5480df598d774dd17e9c49f8ef82f92 100644 --- a/crates/fs/src/fs_watcher.rs +++ b/crates/fs/src/fs_watcher.rs @@ -50,7 +50,7 @@ impl Watcher for FsWatcher { let tx = self.tx.clone(); let pending_paths = self.pending_path_events.clone(); - #[cfg(target_os = "windows")] + #[cfg(any(target_os = "windows", target_os = "macos"))] { // Return early if an ancestor of this path was already being watched. // saves a huge amount of memory @@ -81,7 +81,7 @@ impl Watcher for FsWatcher { let root_path = SanitizedPath::new_arc(path); let path: Arc = path.into(); - #[cfg(target_os = "windows")] + #[cfg(any(target_os = "windows", target_os = "macos"))] let mode = notify::RecursiveMode::Recursive; #[cfg(target_os = "linux")] let mode = notify::RecursiveMode::NonRecursive; @@ -166,6 +166,8 @@ pub struct GlobalWatcher { watcher: Mutex, #[cfg(target_os = "windows")] watcher: Mutex, + #[cfg(target_os = "macos")] + watcher: Mutex, } impl GlobalWatcher { @@ -178,10 +180,25 @@ impl GlobalWatcher { ) -> anyhow::Result { use notify::Watcher; - self.watcher.lock().watch(&path, mode)?; - let mut state = self.state.lock(); + // Check if this path is already covered by an existing watched ancestor path. + // On macOS and Windows, watching is recursive, so we don't need to watch + // child paths if an ancestor is already being watched. + #[cfg(any(target_os = "windows", target_os = "macos"))] + let path_already_covered = state.path_registrations.keys().any(|existing| { + path.starts_with(existing.as_ref()) && path.as_ref() != existing.as_ref() + }); + + #[cfg(not(any(target_os = "windows", target_os = "macos")))] + let path_already_covered = false; + + if !path_already_covered && !state.path_registrations.contains_key(&path) { + drop(state); + self.watcher.lock().watch(&path, mode)?; + state = self.state.lock(); + } + let id = state.last_registration; state.last_registration = WatcherRegistrationId(id.0 + 1); diff --git a/crates/fs/src/mac_watcher.rs b/crates/fs/src/mac_watcher.rs deleted file mode 100644 index b781a231ba2bc33a895480ea278a7ccfe3364fe7..0000000000000000000000000000000000000000 --- a/crates/fs/src/mac_watcher.rs +++ /dev/null @@ -1,77 +0,0 @@ -use crate::Watcher; -use anyhow::{Context as _, Result}; -use collections::{BTreeMap, Bound}; -use fsevent::EventStream; -use parking_lot::Mutex; -use std::{ - path::{Path, PathBuf}, - sync::Weak, - thread, - time::Duration, -}; - -pub struct MacWatcher { - events_tx: smol::channel::Sender>, - handles: Weak>>, - latency: Duration, -} - -impl MacWatcher { - pub fn new( - events_tx: smol::channel::Sender>, - handles: Weak>>, - latency: Duration, - ) -> Self { - Self { - events_tx, - handles, - latency, - } - } -} - -impl Watcher for MacWatcher { - fn add(&self, path: &Path) -> Result<()> { - log::trace!("mac watcher add: {:?}", path); - let handles = self - .handles - .upgrade() - .context("unable to watch path, receiver dropped")?; - let mut handles = handles.lock(); - - // Return early if an ancestor of this path was already being watched. - if let Some((watched_path, _)) = handles - .range::((Bound::Unbounded, Bound::Included(path))) - .next_back() - && path.starts_with(watched_path) - { - log::trace!( - "mac watched path starts with existing watched path: {watched_path:?}, {path:?}" - ); - return Ok(()); - } - - let (stream, handle) = EventStream::new(&[path], self.latency); - let tx = self.events_tx.clone(); - thread::Builder::new() - .name("MacWatcher".to_owned()) - .spawn(move || { - stream.run(move |events| smol::block_on(tx.send(events)).is_ok()); - }) - .unwrap(); - handles.insert(path.into(), handle); - - Ok(()) - } - - fn remove(&self, path: &Path) -> anyhow::Result<()> { - let handles = self - .handles - .upgrade() - .context("unable to remove path, receiver dropped")?; - - let mut handles = handles.lock(); - handles.remove(path); - Ok(()) - } -} diff --git a/crates/fsevent/Cargo.toml b/crates/fsevent/Cargo.toml deleted file mode 100644 index 635b36ebe14ee6823f8773bb38ff085516e320b9..0000000000000000000000000000000000000000 --- a/crates/fsevent/Cargo.toml +++ /dev/null @@ -1,28 +0,0 @@ -[package] -name = "fsevent" -version = "0.1.0" -edition.workspace = true -publish.workspace = true -license = "GPL-3.0-or-later" - -[lints] -workspace = true - -[lib] -path = "src/fsevent.rs" -doctest = false - -[dependencies] -bitflags.workspace = true -parking_lot.workspace = true -log.workspace = true - -[target.'cfg(target_os = "macos")'.dependencies] -core-foundation.workspace = true -fsevent-sys = "3.0.2" - -[dev-dependencies] -tempfile.workspace = true - -[package.metadata.docs.rs] -targets = ["x86_64-apple-darwin"] diff --git a/crates/fsevent/LICENSE-GPL b/crates/fsevent/LICENSE-GPL deleted file mode 120000 index 89e542f750cd3860a0598eff0dc34b56d7336dc4..0000000000000000000000000000000000000000 --- a/crates/fsevent/LICENSE-GPL +++ /dev/null @@ -1 +0,0 @@ -../../LICENSE-GPL \ No newline at end of file diff --git a/crates/fsevent/examples/events.rs b/crates/fsevent/examples/events.rs deleted file mode 100644 index bae9e734259992e5cf3e553dda2622aeec5eaa33..0000000000000000000000000000000000000000 --- a/crates/fsevent/examples/events.rs +++ /dev/null @@ -1,23 +0,0 @@ -#[cfg(target_os = "macos")] -fn main() { - use fsevent::EventStream; - use std::{env::args, path::Path, time::Duration}; - - let paths = args().skip(1).collect::>(); - let paths = paths.iter().map(Path::new).collect::>(); - assert!(!paths.is_empty(), "Must pass 1 or more paths as arguments"); - - let (stream, _handle) = EventStream::new(&paths, Duration::from_millis(100)); - stream.run(|events| { - eprintln!("event batch"); - for event in events { - eprintln!(" {:?}", event); - } - true - }); -} - -#[cfg(not(target_os = "macos"))] -fn main() { - eprintln!("This example only works on macOS"); -} diff --git a/crates/fsevent/src/fsevent.rs b/crates/fsevent/src/fsevent.rs deleted file mode 100644 index 0fe3592010fa0e7e12b68c8dfc919807e91c8e64..0000000000000000000000000000000000000000 --- a/crates/fsevent/src/fsevent.rs +++ /dev/null @@ -1,515 +0,0 @@ -#![cfg(target_os = "macos")] - -use bitflags::bitflags; -use fsevent_sys::{self as fs, core_foundation as cf}; -use parking_lot::Mutex; -use std::{ - convert::AsRef, - ffi::{CStr, OsStr, c_void}, - os::unix::ffi::OsStrExt, - path::{Path, PathBuf}, - ptr, slice, - sync::Arc, - time::Duration, -}; - -#[derive(Clone, Debug)] -pub struct Event { - pub event_id: u64, - pub flags: StreamFlags, - pub path: PathBuf, -} - -pub struct EventStream { - lifecycle: Arc>, - state: Box, -} - -struct State { - latency: Duration, - paths: cf::CFMutableArrayRef, - callback: Option) -> bool>>, - last_valid_event_id: Option, - stream: fs::FSEventStreamRef, -} - -impl Drop for State { - fn drop(&mut self) { - unsafe { - cf::CFRelease(self.paths); - fs::FSEventStreamStop(self.stream); - fs::FSEventStreamInvalidate(self.stream); - fs::FSEventStreamRelease(self.stream); - } - } -} - -enum Lifecycle { - New, - Running(cf::CFRunLoopRef), - Stopped, -} - -pub struct Handle(Arc>); - -unsafe impl Send for EventStream {} -unsafe impl Send for Lifecycle {} - -impl EventStream { - pub fn new(paths: &[&Path], latency: Duration) -> (Self, Handle) { - unsafe { - let cf_paths = - cf::CFArrayCreateMutable(cf::kCFAllocatorDefault, 0, &cf::kCFTypeArrayCallBacks); - assert!(!cf_paths.is_null()); - - for path in paths { - let path_bytes = path.as_os_str().as_bytes(); - let cf_url = cf::CFURLCreateFromFileSystemRepresentation( - cf::kCFAllocatorDefault, - path_bytes.as_ptr() as *const i8, - path_bytes.len() as cf::CFIndex, - false, - ); - if !cf_url.is_null() { - let cf_path = cf::CFURLCopyFileSystemPath(cf_url, cf::kCFURLPOSIXPathStyle); - cf::CFArrayAppendValue(cf_paths, cf_path); - cf::CFRelease(cf_path); - cf::CFRelease(cf_url); - } else { - log::error!("Failed to create CFURL for path: {path:?}"); - } - } - - let mut state = Box::new(State { - latency, - paths: cf_paths, - callback: None, - last_valid_event_id: None, - stream: ptr::null_mut(), - }); - let stream_context = fs::FSEventStreamContext { - version: 0, - info: state.as_ref() as *const _ as *mut c_void, - retain: None, - release: None, - copy_description: None, - }; - let stream = fs::FSEventStreamCreate( - cf::kCFAllocatorDefault, - Self::trampoline, - &stream_context, - cf_paths, - FSEventsGetCurrentEventId(), - latency.as_secs_f64(), - fs::kFSEventStreamCreateFlagFileEvents - | fs::kFSEventStreamCreateFlagNoDefer - | fs::kFSEventStreamCreateFlagWatchRoot, - ); - state.stream = stream; - - let lifecycle = Arc::new(Mutex::new(Lifecycle::New)); - ( - EventStream { - lifecycle: lifecycle.clone(), - state, - }, - Handle(lifecycle), - ) - } - } - - pub fn run(mut self, f: F) - where - F: FnMut(Vec) -> bool + 'static, - { - self.state.callback = Some(Box::new(f)); - unsafe { - let run_loop = - core_foundation::base::CFRetain(cf::CFRunLoopGetCurrent()) as *mut c_void; - { - let mut state = self.lifecycle.lock(); - match *state { - Lifecycle::New => *state = Lifecycle::Running(run_loop), - Lifecycle::Running(_) => unreachable!(), - Lifecycle::Stopped => return, - } - } - fs::FSEventStreamScheduleWithRunLoop( - self.state.stream, - run_loop, - cf::kCFRunLoopDefaultMode, - ); - fs::FSEventStreamStart(self.state.stream); - cf::CFRunLoopRun(); - } - } - - extern "C" fn trampoline( - stream_ref: fs::FSEventStreamRef, - info: *mut ::std::os::raw::c_void, - num: usize, // size_t numEvents - event_paths: *mut ::std::os::raw::c_void, // void *eventPaths - event_flags: *const ::std::os::raw::c_void, // const FSEventStreamEventFlags eventFlags[] - event_ids: *const ::std::os::raw::c_void, // const FSEventStreamEventId eventIds[] - ) { - unsafe { - let event_paths = event_paths as *const *const ::std::os::raw::c_char; - let e_ptr = event_flags as *mut u32; - let i_ptr = event_ids as *mut u64; - let state = (info as *mut State).as_mut().unwrap(); - let callback = if let Some(callback) = state.callback.as_mut() { - callback - } else { - return; - }; - - let paths = slice::from_raw_parts(event_paths, num); - let flags = slice::from_raw_parts_mut(e_ptr, num); - let ids = slice::from_raw_parts_mut(i_ptr, num); - let mut stream_restarted = false; - - // Sometimes FSEvents reports a "dropped" event, an indication that either the kernel - // or our code couldn't keep up with the sheer volume of file-system events that were - // generated. If we observed a valid event before this happens, we'll try to read the - // file-system journal by stopping the current stream and creating a new one starting at - // such event. Otherwise, we'll let invoke the callback with the dropped event, which - // will likely perform a re-scan of one of the root directories. - if flags - .iter() - .copied() - .filter_map(StreamFlags::from_bits) - .any(|flags| { - flags.contains(StreamFlags::USER_DROPPED) - || flags.contains(StreamFlags::KERNEL_DROPPED) - }) - && let Some(last_valid_event_id) = state.last_valid_event_id.take() - { - fs::FSEventStreamStop(state.stream); - fs::FSEventStreamInvalidate(state.stream); - fs::FSEventStreamRelease(state.stream); - - let stream_context = fs::FSEventStreamContext { - version: 0, - info, - retain: None, - release: None, - copy_description: None, - }; - let stream = fs::FSEventStreamCreate( - cf::kCFAllocatorDefault, - Self::trampoline, - &stream_context, - state.paths, - last_valid_event_id, - state.latency.as_secs_f64(), - fs::kFSEventStreamCreateFlagFileEvents - | fs::kFSEventStreamCreateFlagNoDefer - | fs::kFSEventStreamCreateFlagWatchRoot, - ); - - state.stream = stream; - fs::FSEventStreamScheduleWithRunLoop( - state.stream, - cf::CFRunLoopGetCurrent(), - cf::kCFRunLoopDefaultMode, - ); - fs::FSEventStreamStart(state.stream); - stream_restarted = true; - } - - if !stream_restarted { - let mut events = Vec::with_capacity(num); - for p in 0..num { - if let Some(flag) = StreamFlags::from_bits(flags[p]) { - if !flag.contains(StreamFlags::HISTORY_DONE) { - let path_c_str = CStr::from_ptr(paths[p]); - let path = PathBuf::from(OsStr::from_bytes(path_c_str.to_bytes())); - let event = Event { - event_id: ids[p], - flags: flag, - path, - }; - state.last_valid_event_id = Some(event.event_id); - events.push(event); - } - } else { - debug_assert!(false, "unknown flag set for fs event: {}", flags[p]); - } - } - - if !events.is_empty() && !callback(events) { - fs::FSEventStreamStop(stream_ref); - cf::CFRunLoopStop(cf::CFRunLoopGetCurrent()); - } - } - } - } -} - -impl Drop for Handle { - fn drop(&mut self) { - let mut state = self.0.lock(); - if let Lifecycle::Running(run_loop) = *state { - unsafe { - cf::CFRunLoopStop(run_loop); - cf::CFRelease(run_loop) - } - } - *state = Lifecycle::Stopped; - } -} - -// Synchronize with -// /System/Library/Frameworks/CoreServices.framework/Versions/A/Frameworks/FSEvents.framework/Versions/A/Headers/FSEvents.h -bitflags! { - #[derive(Debug, PartialEq, Eq, Clone, Copy)] - #[repr(C)] - pub struct StreamFlags: u32 { - const NONE = 0x00000000; - const MUST_SCAN_SUBDIRS = 0x00000001; - const USER_DROPPED = 0x00000002; - const KERNEL_DROPPED = 0x00000004; - const IDS_WRAPPED = 0x00000008; - const HISTORY_DONE = 0x00000010; - const ROOT_CHANGED = 0x00000020; - const MOUNT = 0x00000040; - const UNMOUNT = 0x00000080; - const ITEM_CREATED = 0x00000100; - const ITEM_REMOVED = 0x00000200; - const INODE_META_MOD = 0x00000400; - const ITEM_RENAMED = 0x00000800; - const ITEM_MODIFIED = 0x00001000; - const FINDER_INFO_MOD = 0x00002000; - const ITEM_CHANGE_OWNER = 0x00004000; - const ITEM_XATTR_MOD = 0x00008000; - const IS_FILE = 0x00010000; - const IS_DIR = 0x00020000; - const IS_SYMLINK = 0x00040000; - const OWN_EVENT = 0x00080000; - const IS_HARDLINK = 0x00100000; - const IS_LAST_HARDLINK = 0x00200000; - const ITEM_CLONED = 0x400000; - } -} - -impl std::fmt::Display for StreamFlags { - fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { - if self.contains(StreamFlags::MUST_SCAN_SUBDIRS) { - let _d = write!(f, "MUST_SCAN_SUBDIRS "); - } - if self.contains(StreamFlags::USER_DROPPED) { - let _d = write!(f, "USER_DROPPED "); - } - if self.contains(StreamFlags::KERNEL_DROPPED) { - let _d = write!(f, "KERNEL_DROPPED "); - } - if self.contains(StreamFlags::IDS_WRAPPED) { - let _d = write!(f, "IDS_WRAPPED "); - } - if self.contains(StreamFlags::HISTORY_DONE) { - let _d = write!(f, "HISTORY_DONE "); - } - if self.contains(StreamFlags::ROOT_CHANGED) { - let _d = write!(f, "ROOT_CHANGED "); - } - if self.contains(StreamFlags::MOUNT) { - let _d = write!(f, "MOUNT "); - } - if self.contains(StreamFlags::UNMOUNT) { - let _d = write!(f, "UNMOUNT "); - } - if self.contains(StreamFlags::ITEM_CREATED) { - let _d = write!(f, "ITEM_CREATED "); - } - if self.contains(StreamFlags::ITEM_REMOVED) { - let _d = write!(f, "ITEM_REMOVED "); - } - if self.contains(StreamFlags::INODE_META_MOD) { - let _d = write!(f, "INODE_META_MOD "); - } - if self.contains(StreamFlags::ITEM_RENAMED) { - let _d = write!(f, "ITEM_RENAMED "); - } - if self.contains(StreamFlags::ITEM_MODIFIED) { - let _d = write!(f, "ITEM_MODIFIED "); - } - if self.contains(StreamFlags::FINDER_INFO_MOD) { - let _d = write!(f, "FINDER_INFO_MOD "); - } - if self.contains(StreamFlags::ITEM_CHANGE_OWNER) { - let _d = write!(f, "ITEM_CHANGE_OWNER "); - } - if self.contains(StreamFlags::ITEM_XATTR_MOD) { - let _d = write!(f, "ITEM_XATTR_MOD "); - } - if self.contains(StreamFlags::IS_FILE) { - let _d = write!(f, "IS_FILE "); - } - if self.contains(StreamFlags::IS_DIR) { - let _d = write!(f, "IS_DIR "); - } - if self.contains(StreamFlags::IS_SYMLINK) { - let _d = write!(f, "IS_SYMLINK "); - } - if self.contains(StreamFlags::OWN_EVENT) { - let _d = write!(f, "OWN_EVENT "); - } - if self.contains(StreamFlags::IS_LAST_HARDLINK) { - let _d = write!(f, "IS_LAST_HARDLINK "); - } - if self.contains(StreamFlags::IS_HARDLINK) { - let _d = write!(f, "IS_HARDLINK "); - } - if self.contains(StreamFlags::ITEM_CLONED) { - let _d = write!(f, "ITEM_CLONED "); - } - write!(f, "") - } -} - -#[link(name = "CoreServices", kind = "framework")] -unsafe extern "C" { - pub fn FSEventsGetCurrentEventId() -> u64; -} - -// These tests are disabled by default because they seem to be unresolvably flaky. -// Feel free to bring them back to help test this code -#[cfg(false)] -mod tests { - use super::*; - use std::{fs, sync::mpsc, thread, time::Duration}; - - #[test] - fn test_event_stream_simple() { - for _ in 0..3 { - let dir = tempfile::Builder::new() - .prefix("test-event-stream") - .tempdir() - .unwrap(); - let path = dir.path().canonicalize().unwrap(); - for i in 0..10 { - fs::write(path.join(format!("existing-file-{}", i)), "").unwrap(); - } - flush_historical_events(); - - let (tx, rx) = mpsc::channel(); - let (stream, handle) = EventStream::new(&[&path], Duration::from_millis(50)); - thread::spawn(move || stream.run(move |events| tx.send(events.to_vec()).is_ok())); - - fs::write(path.join("new-file"), "").unwrap(); - let events = rx.recv_timeout(timeout()).unwrap(); - let event = events.last().unwrap(); - assert_eq!(event.path, path.join("new-file")); - assert!(event.flags.contains(StreamFlags::ITEM_CREATED)); - - fs::remove_file(path.join("existing-file-5")).unwrap(); - let mut events = rx.recv_timeout(timeout()).unwrap(); - let mut event = events.last().unwrap(); - // we see this duplicate about 1/100 test runs. - if event.path == path.join("new-file") - && event.flags.contains(StreamFlags::ITEM_CREATED) - { - events = rx.recv_timeout(timeout()).unwrap(); - event = events.last().unwrap(); - } - assert_eq!(event.path, path.join("existing-file-5")); - assert!(event.flags.contains(StreamFlags::ITEM_REMOVED)); - drop(handle); - } - } - - #[test] - fn test_event_stream_delayed_start() { - for _ in 0..3 { - let dir = tempfile::Builder::new() - .prefix("test-event-stream") - .tempdir() - .unwrap(); - let path = dir.path().canonicalize().unwrap(); - for i in 0..10 { - fs::write(path.join(format!("existing-file-{}", i)), "").unwrap(); - } - flush_historical_events(); - - let (tx, rx) = mpsc::channel(); - let (stream, handle) = EventStream::new(&[&path], Duration::from_millis(50)); - - // Delay the call to `run` in order to make sure we don't miss any events that occur - // between creating the `EventStream` and calling `run`. - thread::spawn(move || { - thread::sleep(Duration::from_millis(100)); - stream.run(move |events| tx.send(events.to_vec()).is_ok()) - }); - - fs::write(path.join("new-file"), "").unwrap(); - let events = rx.recv_timeout(timeout()).unwrap(); - let event = events.last().unwrap(); - assert_eq!(event.path, path.join("new-file")); - assert!(event.flags.contains(StreamFlags::ITEM_CREATED)); - - fs::remove_file(path.join("existing-file-5")).unwrap(); - let events = rx.recv_timeout(timeout()).unwrap(); - let event = events.last().unwrap(); - assert_eq!(event.path, path.join("existing-file-5")); - assert!(event.flags.contains(StreamFlags::ITEM_REMOVED)); - drop(handle); - } - } - - #[test] - fn test_event_stream_shutdown_by_dropping_handle() { - let dir = tempfile::Builder::new() - .prefix("test-event-stream") - .tempdir() - .unwrap(); - let path = dir.path().canonicalize().unwrap(); - flush_historical_events(); - - let (tx, rx) = mpsc::channel(); - let (stream, handle) = EventStream::new(&[&path], Duration::from_millis(50)); - thread::spawn(move || { - stream.run({ - let tx = tx.clone(); - move |_| { - tx.send("running").unwrap(); - true - } - }); - tx.send("stopped").unwrap(); - }); - - fs::write(path.join("new-file"), "").unwrap(); - assert_eq!(rx.recv_timeout(timeout()).unwrap(), "running"); - - // Dropping the handle causes `EventStream::run` to return. - drop(handle); - assert_eq!(rx.recv_timeout(timeout()).unwrap(), "stopped"); - } - - #[test] - fn test_event_stream_shutdown_before_run() { - let dir = tempfile::Builder::new() - .prefix("test-event-stream") - .tempdir() - .unwrap(); - let path = dir.path().canonicalize().unwrap(); - - let (stream, handle) = EventStream::new(&[&path], Duration::from_millis(50)); - drop(handle); - - // This returns immediately because the handle was already dropped. - stream.run(|_| true); - } - - fn flush_historical_events() { - thread::sleep(timeout()); - } - - fn timeout() -> Duration { - if std::env::var("CI").is_ok() { - Duration::from_secs(4) - } else { - Duration::from_millis(500) - } - } -}