Detailed changes
@@ -6518,7 +6518,6 @@ dependencies = [
"cocoa 0.26.0",
"collections",
"fs",
- "fsevent",
"futures 0.3.31",
"git",
"gpui",
@@ -6577,27 +6576,6 @@ version = "1.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "42703706b716c37f96a77aea830392ad231f44c9e9a67872fa5548707e11b11c"
-[[package]]
-name = "fsevent"
-version = "0.1.0"
-dependencies = [
- "bitflags 2.9.4",
- "core-foundation 0.10.0",
- "fsevent-sys 3.1.0",
- "log",
- "parking_lot",
- "tempfile",
-]
-
-[[package]]
-name = "fsevent-sys"
-version = "3.1.0"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "ca6f5e6817058771c10f0eb0f05ddf1e35844266f972004fe8e4b21fda295bd5"
-dependencies = [
- "libc",
-]
-
[[package]]
name = "fsevent-sys"
version = "4.1.0"
@@ -10592,7 +10570,7 @@ dependencies = [
"bitflags 2.9.4",
"crossbeam-channel",
"filetime",
- "fsevent-sys 4.1.0",
+ "fsevent-sys",
"inotify 0.9.6",
"kqueue",
"libc",
@@ -10608,7 +10586,7 @@ version = "8.2.0"
source = "git+https://github.com/zed-industries/notify.git?rev=6c550ac3c56cbd143c57ea6390e197af9d790908#6c550ac3c56cbd143c57ea6390e197af9d790908"
dependencies = [
"bitflags 2.9.4",
- "fsevent-sys 4.1.0",
+ "fsevent-sys",
"inotify 0.11.0",
"kqueue",
"libc",
@@ -76,7 +76,6 @@ members = [
"crates/file_icons",
"crates/fs",
"crates/fs_benchmarks",
- "crates/fsevent",
"crates/fuzzy",
"crates/git",
"crates/git_graph",
@@ -313,7 +312,6 @@ feedback = { path = "crates/feedback" }
file_finder = { path = "crates/file_finder" }
file_icons = { path = "crates/file_icons" }
fs = { path = "crates/fs" }
-fsevent = { path = "crates/fsevent" }
fuzzy = { path = "crates/fuzzy" }
git = { path = "crates/git" }
git_graph = { path = "crates/git_graph" }
@@ -834,7 +832,6 @@ command_palette = { codegen-units = 1 }
command_palette_hooks = { codegen-units = 1 }
feature_flags = { codegen-units = 1 }
file_icons = { codegen-units = 1 }
-fsevent = { codegen-units = 1 }
image_viewer = { codegen-units = 1 }
edit_prediction_ui = { codegen-units = 1 }
install_cli = { codegen-units = 1 }
@@ -40,15 +40,12 @@ text.workspace = true
time.workspace = true
util.workspace = true
is_executable = "1.0.5"
+notify = "8.2.0"
[target.'cfg(target_os = "macos")'.dependencies]
-fsevent.workspace = true
objc.workspace = true
cocoa = "0.26"
-[target.'cfg(not(target_os = "macos"))'.dependencies]
-notify = "8.2.0"
-
[target.'cfg(target_os = "windows")'.dependencies]
windows.workspace = true
@@ -1,7 +1,3 @@
-#[cfg(target_os = "macos")]
-mod mac_watcher;
-
-#[cfg(not(target_os = "macos"))]
pub mod fs_watcher;
use parking_lot::Mutex;
@@ -976,62 +972,6 @@ impl Fs for RealFs {
Ok(Box::pin(result))
}
- #[cfg(target_os = "macos")]
- async fn watch(
- &self,
- path: &Path,
- latency: Duration,
- ) -> (
- Pin<Box<dyn Send + Stream<Item = Vec<PathEvent>>>>,
- Arc<dyn Watcher>,
- ) {
- use fsevent::StreamFlags;
-
- let (events_tx, events_rx) = smol::channel::unbounded();
- let handles = Arc::new(parking_lot::Mutex::new(collections::BTreeMap::default()));
- let watcher = Arc::new(mac_watcher::MacWatcher::new(
- events_tx,
- Arc::downgrade(&handles),
- latency,
- ));
- watcher.add(path).expect("handles can't be dropped");
-
- (
- Box::pin(
- events_rx
- .map(|events| {
- events
- .into_iter()
- .map(|event| {
- log::trace!("fs path event: {event:?}");
- let kind = if event.flags.contains(StreamFlags::ITEM_REMOVED) {
- Some(PathEventKind::Removed)
- } else if event.flags.contains(StreamFlags::ITEM_CREATED) {
- Some(PathEventKind::Created)
- } else if event.flags.contains(StreamFlags::ITEM_MODIFIED)
- | event.flags.contains(StreamFlags::ITEM_RENAMED)
- {
- Some(PathEventKind::Changed)
- } else {
- None
- };
- PathEvent {
- path: event.path,
- kind,
- }
- })
- .collect()
- })
- .chain(futures::stream::once(async move {
- drop(handles);
- vec![]
- })),
- ),
- watcher,
- )
- }
-
- #[cfg(not(target_os = "macos"))]
async fn watch(
&self,
path: &Path,
@@ -50,7 +50,7 @@ impl Watcher for FsWatcher {
let tx = self.tx.clone();
let pending_paths = self.pending_path_events.clone();
- #[cfg(target_os = "windows")]
+ #[cfg(any(target_os = "windows", target_os = "macos"))]
{
// Return early if an ancestor of this path was already being watched.
// saves a huge amount of memory
@@ -81,7 +81,7 @@ impl Watcher for FsWatcher {
let root_path = SanitizedPath::new_arc(path);
let path: Arc<std::path::Path> = path.into();
- #[cfg(target_os = "windows")]
+ #[cfg(any(target_os = "windows", target_os = "macos"))]
let mode = notify::RecursiveMode::Recursive;
#[cfg(target_os = "linux")]
let mode = notify::RecursiveMode::NonRecursive;
@@ -166,6 +166,8 @@ pub struct GlobalWatcher {
watcher: Mutex<notify::KqueueWatcher>,
#[cfg(target_os = "windows")]
watcher: Mutex<notify::ReadDirectoryChangesWatcher>,
+ #[cfg(target_os = "macos")]
+ watcher: Mutex<notify::FsEventWatcher>,
}
impl GlobalWatcher {
@@ -178,10 +180,25 @@ impl GlobalWatcher {
) -> anyhow::Result<WatcherRegistrationId> {
use notify::Watcher;
- self.watcher.lock().watch(&path, mode)?;
-
let mut state = self.state.lock();
+ // Check if this path is already covered by an existing watched ancestor path.
+ // On macOS and Windows, watching is recursive, so we don't need to watch
+ // child paths if an ancestor is already being watched.
+ #[cfg(any(target_os = "windows", target_os = "macos"))]
+ let path_already_covered = state.path_registrations.keys().any(|existing| {
+ path.starts_with(existing.as_ref()) && path.as_ref() != existing.as_ref()
+ });
+
+ #[cfg(not(any(target_os = "windows", target_os = "macos")))]
+ let path_already_covered = false;
+
+ if !path_already_covered && !state.path_registrations.contains_key(&path) {
+ drop(state);
+ self.watcher.lock().watch(&path, mode)?;
+ state = self.state.lock();
+ }
+
let id = state.last_registration;
state.last_registration = WatcherRegistrationId(id.0 + 1);
@@ -1,77 +0,0 @@
-use crate::Watcher;
-use anyhow::{Context as _, Result};
-use collections::{BTreeMap, Bound};
-use fsevent::EventStream;
-use parking_lot::Mutex;
-use std::{
- path::{Path, PathBuf},
- sync::Weak,
- thread,
- time::Duration,
-};
-
-pub struct MacWatcher {
- events_tx: smol::channel::Sender<Vec<fsevent::Event>>,
- handles: Weak<Mutex<BTreeMap<PathBuf, fsevent::Handle>>>,
- latency: Duration,
-}
-
-impl MacWatcher {
- pub fn new(
- events_tx: smol::channel::Sender<Vec<fsevent::Event>>,
- handles: Weak<Mutex<BTreeMap<PathBuf, fsevent::Handle>>>,
- latency: Duration,
- ) -> Self {
- Self {
- events_tx,
- handles,
- latency,
- }
- }
-}
-
-impl Watcher for MacWatcher {
- fn add(&self, path: &Path) -> Result<()> {
- log::trace!("mac watcher add: {:?}", path);
- let handles = self
- .handles
- .upgrade()
- .context("unable to watch path, receiver dropped")?;
- let mut handles = handles.lock();
-
- // Return early if an ancestor of this path was already being watched.
- if let Some((watched_path, _)) = handles
- .range::<Path, _>((Bound::Unbounded, Bound::Included(path)))
- .next_back()
- && path.starts_with(watched_path)
- {
- log::trace!(
- "mac watched path starts with existing watched path: {watched_path:?}, {path:?}"
- );
- return Ok(());
- }
-
- let (stream, handle) = EventStream::new(&[path], self.latency);
- let tx = self.events_tx.clone();
- thread::Builder::new()
- .name("MacWatcher".to_owned())
- .spawn(move || {
- stream.run(move |events| smol::block_on(tx.send(events)).is_ok());
- })
- .unwrap();
- handles.insert(path.into(), handle);
-
- Ok(())
- }
-
- fn remove(&self, path: &Path) -> anyhow::Result<()> {
- let handles = self
- .handles
- .upgrade()
- .context("unable to remove path, receiver dropped")?;
-
- let mut handles = handles.lock();
- handles.remove(path);
- Ok(())
- }
-}
@@ -1,28 +0,0 @@
-[package]
-name = "fsevent"
-version = "0.1.0"
-edition.workspace = true
-publish.workspace = true
-license = "GPL-3.0-or-later"
-
-[lints]
-workspace = true
-
-[lib]
-path = "src/fsevent.rs"
-doctest = false
-
-[dependencies]
-bitflags.workspace = true
-parking_lot.workspace = true
-log.workspace = true
-
-[target.'cfg(target_os = "macos")'.dependencies]
-core-foundation.workspace = true
-fsevent-sys = "3.0.2"
-
-[dev-dependencies]
-tempfile.workspace = true
-
-[package.metadata.docs.rs]
-targets = ["x86_64-apple-darwin"]
@@ -1 +0,0 @@
-../../LICENSE-GPL
@@ -1,23 +0,0 @@
-#[cfg(target_os = "macos")]
-fn main() {
- use fsevent::EventStream;
- use std::{env::args, path::Path, time::Duration};
-
- let paths = args().skip(1).collect::<Vec<_>>();
- let paths = paths.iter().map(Path::new).collect::<Vec<_>>();
- assert!(!paths.is_empty(), "Must pass 1 or more paths as arguments");
-
- let (stream, _handle) = EventStream::new(&paths, Duration::from_millis(100));
- stream.run(|events| {
- eprintln!("event batch");
- for event in events {
- eprintln!(" {:?}", event);
- }
- true
- });
-}
-
-#[cfg(not(target_os = "macos"))]
-fn main() {
- eprintln!("This example only works on macOS");
-}
@@ -1,515 +0,0 @@
-#![cfg(target_os = "macos")]
-
-use bitflags::bitflags;
-use fsevent_sys::{self as fs, core_foundation as cf};
-use parking_lot::Mutex;
-use std::{
- convert::AsRef,
- ffi::{CStr, OsStr, c_void},
- os::unix::ffi::OsStrExt,
- path::{Path, PathBuf},
- ptr, slice,
- sync::Arc,
- time::Duration,
-};
-
-#[derive(Clone, Debug)]
-pub struct Event {
- pub event_id: u64,
- pub flags: StreamFlags,
- pub path: PathBuf,
-}
-
-pub struct EventStream {
- lifecycle: Arc<Mutex<Lifecycle>>,
- state: Box<State>,
-}
-
-struct State {
- latency: Duration,
- paths: cf::CFMutableArrayRef,
- callback: Option<Box<dyn FnMut(Vec<Event>) -> bool>>,
- last_valid_event_id: Option<fs::FSEventStreamEventId>,
- stream: fs::FSEventStreamRef,
-}
-
-impl Drop for State {
- fn drop(&mut self) {
- unsafe {
- cf::CFRelease(self.paths);
- fs::FSEventStreamStop(self.stream);
- fs::FSEventStreamInvalidate(self.stream);
- fs::FSEventStreamRelease(self.stream);
- }
- }
-}
-
-enum Lifecycle {
- New,
- Running(cf::CFRunLoopRef),
- Stopped,
-}
-
-pub struct Handle(Arc<Mutex<Lifecycle>>);
-
-unsafe impl Send for EventStream {}
-unsafe impl Send for Lifecycle {}
-
-impl EventStream {
- pub fn new(paths: &[&Path], latency: Duration) -> (Self, Handle) {
- unsafe {
- let cf_paths =
- cf::CFArrayCreateMutable(cf::kCFAllocatorDefault, 0, &cf::kCFTypeArrayCallBacks);
- assert!(!cf_paths.is_null());
-
- for path in paths {
- let path_bytes = path.as_os_str().as_bytes();
- let cf_url = cf::CFURLCreateFromFileSystemRepresentation(
- cf::kCFAllocatorDefault,
- path_bytes.as_ptr() as *const i8,
- path_bytes.len() as cf::CFIndex,
- false,
- );
- if !cf_url.is_null() {
- let cf_path = cf::CFURLCopyFileSystemPath(cf_url, cf::kCFURLPOSIXPathStyle);
- cf::CFArrayAppendValue(cf_paths, cf_path);
- cf::CFRelease(cf_path);
- cf::CFRelease(cf_url);
- } else {
- log::error!("Failed to create CFURL for path: {path:?}");
- }
- }
-
- let mut state = Box::new(State {
- latency,
- paths: cf_paths,
- callback: None,
- last_valid_event_id: None,
- stream: ptr::null_mut(),
- });
- let stream_context = fs::FSEventStreamContext {
- version: 0,
- info: state.as_ref() as *const _ as *mut c_void,
- retain: None,
- release: None,
- copy_description: None,
- };
- let stream = fs::FSEventStreamCreate(
- cf::kCFAllocatorDefault,
- Self::trampoline,
- &stream_context,
- cf_paths,
- FSEventsGetCurrentEventId(),
- latency.as_secs_f64(),
- fs::kFSEventStreamCreateFlagFileEvents
- | fs::kFSEventStreamCreateFlagNoDefer
- | fs::kFSEventStreamCreateFlagWatchRoot,
- );
- state.stream = stream;
-
- let lifecycle = Arc::new(Mutex::new(Lifecycle::New));
- (
- EventStream {
- lifecycle: lifecycle.clone(),
- state,
- },
- Handle(lifecycle),
- )
- }
- }
-
- pub fn run<F>(mut self, f: F)
- where
- F: FnMut(Vec<Event>) -> bool + 'static,
- {
- self.state.callback = Some(Box::new(f));
- unsafe {
- let run_loop =
- core_foundation::base::CFRetain(cf::CFRunLoopGetCurrent()) as *mut c_void;
- {
- let mut state = self.lifecycle.lock();
- match *state {
- Lifecycle::New => *state = Lifecycle::Running(run_loop),
- Lifecycle::Running(_) => unreachable!(),
- Lifecycle::Stopped => return,
- }
- }
- fs::FSEventStreamScheduleWithRunLoop(
- self.state.stream,
- run_loop,
- cf::kCFRunLoopDefaultMode,
- );
- fs::FSEventStreamStart(self.state.stream);
- cf::CFRunLoopRun();
- }
- }
-
- extern "C" fn trampoline(
- stream_ref: fs::FSEventStreamRef,
- info: *mut ::std::os::raw::c_void,
- num: usize, // size_t numEvents
- event_paths: *mut ::std::os::raw::c_void, // void *eventPaths
- event_flags: *const ::std::os::raw::c_void, // const FSEventStreamEventFlags eventFlags[]
- event_ids: *const ::std::os::raw::c_void, // const FSEventStreamEventId eventIds[]
- ) {
- unsafe {
- let event_paths = event_paths as *const *const ::std::os::raw::c_char;
- let e_ptr = event_flags as *mut u32;
- let i_ptr = event_ids as *mut u64;
- let state = (info as *mut State).as_mut().unwrap();
- let callback = if let Some(callback) = state.callback.as_mut() {
- callback
- } else {
- return;
- };
-
- let paths = slice::from_raw_parts(event_paths, num);
- let flags = slice::from_raw_parts_mut(e_ptr, num);
- let ids = slice::from_raw_parts_mut(i_ptr, num);
- let mut stream_restarted = false;
-
- // Sometimes FSEvents reports a "dropped" event, an indication that either the kernel
- // or our code couldn't keep up with the sheer volume of file-system events that were
- // generated. If we observed a valid event before this happens, we'll try to read the
- // file-system journal by stopping the current stream and creating a new one starting at
- // such event. Otherwise, we'll let invoke the callback with the dropped event, which
- // will likely perform a re-scan of one of the root directories.
- if flags
- .iter()
- .copied()
- .filter_map(StreamFlags::from_bits)
- .any(|flags| {
- flags.contains(StreamFlags::USER_DROPPED)
- || flags.contains(StreamFlags::KERNEL_DROPPED)
- })
- && let Some(last_valid_event_id) = state.last_valid_event_id.take()
- {
- fs::FSEventStreamStop(state.stream);
- fs::FSEventStreamInvalidate(state.stream);
- fs::FSEventStreamRelease(state.stream);
-
- let stream_context = fs::FSEventStreamContext {
- version: 0,
- info,
- retain: None,
- release: None,
- copy_description: None,
- };
- let stream = fs::FSEventStreamCreate(
- cf::kCFAllocatorDefault,
- Self::trampoline,
- &stream_context,
- state.paths,
- last_valid_event_id,
- state.latency.as_secs_f64(),
- fs::kFSEventStreamCreateFlagFileEvents
- | fs::kFSEventStreamCreateFlagNoDefer
- | fs::kFSEventStreamCreateFlagWatchRoot,
- );
-
- state.stream = stream;
- fs::FSEventStreamScheduleWithRunLoop(
- state.stream,
- cf::CFRunLoopGetCurrent(),
- cf::kCFRunLoopDefaultMode,
- );
- fs::FSEventStreamStart(state.stream);
- stream_restarted = true;
- }
-
- if !stream_restarted {
- let mut events = Vec::with_capacity(num);
- for p in 0..num {
- if let Some(flag) = StreamFlags::from_bits(flags[p]) {
- if !flag.contains(StreamFlags::HISTORY_DONE) {
- let path_c_str = CStr::from_ptr(paths[p]);
- let path = PathBuf::from(OsStr::from_bytes(path_c_str.to_bytes()));
- let event = Event {
- event_id: ids[p],
- flags: flag,
- path,
- };
- state.last_valid_event_id = Some(event.event_id);
- events.push(event);
- }
- } else {
- debug_assert!(false, "unknown flag set for fs event: {}", flags[p]);
- }
- }
-
- if !events.is_empty() && !callback(events) {
- fs::FSEventStreamStop(stream_ref);
- cf::CFRunLoopStop(cf::CFRunLoopGetCurrent());
- }
- }
- }
- }
-}
-
-impl Drop for Handle {
- fn drop(&mut self) {
- let mut state = self.0.lock();
- if let Lifecycle::Running(run_loop) = *state {
- unsafe {
- cf::CFRunLoopStop(run_loop);
- cf::CFRelease(run_loop)
- }
- }
- *state = Lifecycle::Stopped;
- }
-}
-
-// Synchronize with
-// /System/Library/Frameworks/CoreServices.framework/Versions/A/Frameworks/FSEvents.framework/Versions/A/Headers/FSEvents.h
-bitflags! {
- #[derive(Debug, PartialEq, Eq, Clone, Copy)]
- #[repr(C)]
- pub struct StreamFlags: u32 {
- const NONE = 0x00000000;
- const MUST_SCAN_SUBDIRS = 0x00000001;
- const USER_DROPPED = 0x00000002;
- const KERNEL_DROPPED = 0x00000004;
- const IDS_WRAPPED = 0x00000008;
- const HISTORY_DONE = 0x00000010;
- const ROOT_CHANGED = 0x00000020;
- const MOUNT = 0x00000040;
- const UNMOUNT = 0x00000080;
- const ITEM_CREATED = 0x00000100;
- const ITEM_REMOVED = 0x00000200;
- const INODE_META_MOD = 0x00000400;
- const ITEM_RENAMED = 0x00000800;
- const ITEM_MODIFIED = 0x00001000;
- const FINDER_INFO_MOD = 0x00002000;
- const ITEM_CHANGE_OWNER = 0x00004000;
- const ITEM_XATTR_MOD = 0x00008000;
- const IS_FILE = 0x00010000;
- const IS_DIR = 0x00020000;
- const IS_SYMLINK = 0x00040000;
- const OWN_EVENT = 0x00080000;
- const IS_HARDLINK = 0x00100000;
- const IS_LAST_HARDLINK = 0x00200000;
- const ITEM_CLONED = 0x400000;
- }
-}
-
-impl std::fmt::Display for StreamFlags {
- fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
- if self.contains(StreamFlags::MUST_SCAN_SUBDIRS) {
- let _d = write!(f, "MUST_SCAN_SUBDIRS ");
- }
- if self.contains(StreamFlags::USER_DROPPED) {
- let _d = write!(f, "USER_DROPPED ");
- }
- if self.contains(StreamFlags::KERNEL_DROPPED) {
- let _d = write!(f, "KERNEL_DROPPED ");
- }
- if self.contains(StreamFlags::IDS_WRAPPED) {
- let _d = write!(f, "IDS_WRAPPED ");
- }
- if self.contains(StreamFlags::HISTORY_DONE) {
- let _d = write!(f, "HISTORY_DONE ");
- }
- if self.contains(StreamFlags::ROOT_CHANGED) {
- let _d = write!(f, "ROOT_CHANGED ");
- }
- if self.contains(StreamFlags::MOUNT) {
- let _d = write!(f, "MOUNT ");
- }
- if self.contains(StreamFlags::UNMOUNT) {
- let _d = write!(f, "UNMOUNT ");
- }
- if self.contains(StreamFlags::ITEM_CREATED) {
- let _d = write!(f, "ITEM_CREATED ");
- }
- if self.contains(StreamFlags::ITEM_REMOVED) {
- let _d = write!(f, "ITEM_REMOVED ");
- }
- if self.contains(StreamFlags::INODE_META_MOD) {
- let _d = write!(f, "INODE_META_MOD ");
- }
- if self.contains(StreamFlags::ITEM_RENAMED) {
- let _d = write!(f, "ITEM_RENAMED ");
- }
- if self.contains(StreamFlags::ITEM_MODIFIED) {
- let _d = write!(f, "ITEM_MODIFIED ");
- }
- if self.contains(StreamFlags::FINDER_INFO_MOD) {
- let _d = write!(f, "FINDER_INFO_MOD ");
- }
- if self.contains(StreamFlags::ITEM_CHANGE_OWNER) {
- let _d = write!(f, "ITEM_CHANGE_OWNER ");
- }
- if self.contains(StreamFlags::ITEM_XATTR_MOD) {
- let _d = write!(f, "ITEM_XATTR_MOD ");
- }
- if self.contains(StreamFlags::IS_FILE) {
- let _d = write!(f, "IS_FILE ");
- }
- if self.contains(StreamFlags::IS_DIR) {
- let _d = write!(f, "IS_DIR ");
- }
- if self.contains(StreamFlags::IS_SYMLINK) {
- let _d = write!(f, "IS_SYMLINK ");
- }
- if self.contains(StreamFlags::OWN_EVENT) {
- let _d = write!(f, "OWN_EVENT ");
- }
- if self.contains(StreamFlags::IS_LAST_HARDLINK) {
- let _d = write!(f, "IS_LAST_HARDLINK ");
- }
- if self.contains(StreamFlags::IS_HARDLINK) {
- let _d = write!(f, "IS_HARDLINK ");
- }
- if self.contains(StreamFlags::ITEM_CLONED) {
- let _d = write!(f, "ITEM_CLONED ");
- }
- write!(f, "")
- }
-}
-
-#[link(name = "CoreServices", kind = "framework")]
-unsafe extern "C" {
- pub fn FSEventsGetCurrentEventId() -> u64;
-}
-
-// These tests are disabled by default because they seem to be unresolvably flaky.
-// Feel free to bring them back to help test this code
-#[cfg(false)]
-mod tests {
- use super::*;
- use std::{fs, sync::mpsc, thread, time::Duration};
-
- #[test]
- fn test_event_stream_simple() {
- for _ in 0..3 {
- let dir = tempfile::Builder::new()
- .prefix("test-event-stream")
- .tempdir()
- .unwrap();
- let path = dir.path().canonicalize().unwrap();
- for i in 0..10 {
- fs::write(path.join(format!("existing-file-{}", i)), "").unwrap();
- }
- flush_historical_events();
-
- let (tx, rx) = mpsc::channel();
- let (stream, handle) = EventStream::new(&[&path], Duration::from_millis(50));
- thread::spawn(move || stream.run(move |events| tx.send(events.to_vec()).is_ok()));
-
- fs::write(path.join("new-file"), "").unwrap();
- let events = rx.recv_timeout(timeout()).unwrap();
- let event = events.last().unwrap();
- assert_eq!(event.path, path.join("new-file"));
- assert!(event.flags.contains(StreamFlags::ITEM_CREATED));
-
- fs::remove_file(path.join("existing-file-5")).unwrap();
- let mut events = rx.recv_timeout(timeout()).unwrap();
- let mut event = events.last().unwrap();
- // we see this duplicate about 1/100 test runs.
- if event.path == path.join("new-file")
- && event.flags.contains(StreamFlags::ITEM_CREATED)
- {
- events = rx.recv_timeout(timeout()).unwrap();
- event = events.last().unwrap();
- }
- assert_eq!(event.path, path.join("existing-file-5"));
- assert!(event.flags.contains(StreamFlags::ITEM_REMOVED));
- drop(handle);
- }
- }
-
- #[test]
- fn test_event_stream_delayed_start() {
- for _ in 0..3 {
- let dir = tempfile::Builder::new()
- .prefix("test-event-stream")
- .tempdir()
- .unwrap();
- let path = dir.path().canonicalize().unwrap();
- for i in 0..10 {
- fs::write(path.join(format!("existing-file-{}", i)), "").unwrap();
- }
- flush_historical_events();
-
- let (tx, rx) = mpsc::channel();
- let (stream, handle) = EventStream::new(&[&path], Duration::from_millis(50));
-
- // Delay the call to `run` in order to make sure we don't miss any events that occur
- // between creating the `EventStream` and calling `run`.
- thread::spawn(move || {
- thread::sleep(Duration::from_millis(100));
- stream.run(move |events| tx.send(events.to_vec()).is_ok())
- });
-
- fs::write(path.join("new-file"), "").unwrap();
- let events = rx.recv_timeout(timeout()).unwrap();
- let event = events.last().unwrap();
- assert_eq!(event.path, path.join("new-file"));
- assert!(event.flags.contains(StreamFlags::ITEM_CREATED));
-
- fs::remove_file(path.join("existing-file-5")).unwrap();
- let events = rx.recv_timeout(timeout()).unwrap();
- let event = events.last().unwrap();
- assert_eq!(event.path, path.join("existing-file-5"));
- assert!(event.flags.contains(StreamFlags::ITEM_REMOVED));
- drop(handle);
- }
- }
-
- #[test]
- fn test_event_stream_shutdown_by_dropping_handle() {
- let dir = tempfile::Builder::new()
- .prefix("test-event-stream")
- .tempdir()
- .unwrap();
- let path = dir.path().canonicalize().unwrap();
- flush_historical_events();
-
- let (tx, rx) = mpsc::channel();
- let (stream, handle) = EventStream::new(&[&path], Duration::from_millis(50));
- thread::spawn(move || {
- stream.run({
- let tx = tx.clone();
- move |_| {
- tx.send("running").unwrap();
- true
- }
- });
- tx.send("stopped").unwrap();
- });
-
- fs::write(path.join("new-file"), "").unwrap();
- assert_eq!(rx.recv_timeout(timeout()).unwrap(), "running");
-
- // Dropping the handle causes `EventStream::run` to return.
- drop(handle);
- assert_eq!(rx.recv_timeout(timeout()).unwrap(), "stopped");
- }
-
- #[test]
- fn test_event_stream_shutdown_before_run() {
- let dir = tempfile::Builder::new()
- .prefix("test-event-stream")
- .tempdir()
- .unwrap();
- let path = dir.path().canonicalize().unwrap();
-
- let (stream, handle) = EventStream::new(&[&path], Duration::from_millis(50));
- drop(handle);
-
- // This returns immediately because the handle was already dropped.
- stream.run(|_| true);
- }
-
- fn flush_historical_events() {
- thread::sleep(timeout());
- }
-
- fn timeout() -> Duration {
- if std::env::var("CI").is_ok() {
- Duration::from_secs(4)
- } else {
- Duration::from_millis(500)
- }
- }
-}