1#![cfg(target_os = "macos")]
2
3use bitflags::bitflags;
4use fsevent_sys::{self as fs, core_foundation as cf};
5use std::{
6 convert::AsRef,
7 ffi::{c_void, CStr, OsStr},
8 os::unix::ffi::OsStrExt,
9 path::{Path, PathBuf},
10 slice,
11 sync::mpsc::Sender,
12 time::Duration,
13};
14
15#[derive(Debug)]
16pub struct Event {
17 pub event_id: u64,
18 pub flags: StreamFlags,
19 pub path: PathBuf,
20}
21
22pub struct EventStream {
23 stream: fs::FSEventStreamRef,
24 _sender: Box<Sender<Vec<Event>>>,
25}
26
27unsafe impl Send for EventStream {}
28
29impl EventStream {
30 pub fn new(paths: &[&Path], latency: Duration, event_sender: Sender<Vec<Event>>) -> Self {
31 unsafe {
32 let sender = Box::new(event_sender);
33 let stream_context = fs::FSEventStreamContext {
34 version: 0,
35 info: sender.as_ref() as *const _ as *mut c_void,
36 retain: None,
37 release: None,
38 copy_description: None,
39 };
40
41 let cf_paths =
42 cf::CFArrayCreateMutable(cf::kCFAllocatorDefault, 0, &cf::kCFTypeArrayCallBacks);
43 assert!(!cf_paths.is_null());
44
45 for path in paths {
46 let path_bytes = path.as_os_str().as_bytes();
47 let cf_url = cf::CFURLCreateFromFileSystemRepresentation(
48 cf::kCFAllocatorDefault,
49 path_bytes.as_ptr() as *const i8,
50 path_bytes.len() as cf::CFIndex,
51 false,
52 );
53 let cf_path = cf::CFURLCopyFileSystemPath(cf_url, cf::kCFURLPOSIXPathStyle);
54 cf::CFArrayAppendValue(cf_paths, cf_path);
55 cf::CFRelease(cf_path);
56 cf::CFRelease(cf_url);
57 }
58
59 let stream = fs::FSEventStreamCreate(
60 cf::kCFAllocatorDefault,
61 callback,
62 &stream_context,
63 cf_paths,
64 fs::kFSEventStreamEventIdSinceNow,
65 latency.as_secs_f64(),
66 fs::kFSEventStreamCreateFlagFileEvents
67 | fs::kFSEventStreamCreateFlagNoDefer
68 | fs::kFSEventStreamCreateFlagWatchRoot,
69 );
70 cf::CFRelease(cf_paths);
71
72 EventStream {
73 stream,
74 _sender: sender,
75 }
76 }
77 }
78
79 pub fn run(self) {
80 unsafe {
81 fs::FSEventStreamScheduleWithRunLoop(
82 self.stream,
83 cf::CFRunLoopGetCurrent(),
84 cf::kCFRunLoopDefaultMode,
85 );
86
87 fs::FSEventStreamStart(self.stream);
88 cf::CFRunLoopRun();
89
90 fs::FSEventStreamFlushSync(self.stream);
91 fs::FSEventStreamStop(self.stream);
92 fs::FSEventStreamRelease(self.stream);
93 }
94 }
95}
96
97extern "C" fn callback(
98 stream_ref: fs::FSEventStreamRef,
99 info: *mut ::std::os::raw::c_void,
100 num: usize, // size_t numEvents
101 event_paths: *mut ::std::os::raw::c_void, // void *eventPaths
102 event_flags: *const ::std::os::raw::c_void, // const FSEventStreamEventFlags eventFlags[]
103 event_ids: *const ::std::os::raw::c_void, // const FSEventStreamEventId eventIds[]
104) {
105 unsafe {
106 let event_paths = event_paths as *const *const ::std::os::raw::c_char;
107 let e_ptr = event_flags as *mut u32;
108 let i_ptr = event_ids as *mut u64;
109 let sender = (info as *mut Sender<Vec<Event>>).as_mut().unwrap();
110
111 let paths = slice::from_raw_parts(event_paths, num);
112 let flags = slice::from_raw_parts_mut(e_ptr, num);
113 let ids = slice::from_raw_parts_mut(i_ptr, num);
114
115 let mut events = Vec::with_capacity(num);
116 for p in 0..num {
117 let path_c_str = CStr::from_ptr(paths[p]);
118 let path = PathBuf::from(OsStr::from_bytes(path_c_str.to_bytes()));
119 if let Some(flag) = StreamFlags::from_bits(flags[p]) {
120 events.push(Event {
121 event_id: ids[p],
122 flags: flag,
123 path,
124 });
125 } else {
126 debug_assert!(false, "unknown flag set for fs event: {}", flags[p]);
127 }
128 }
129
130 if sender.send(events).is_err() {
131 fs::FSEventStreamStop(stream_ref);
132 cf::CFRunLoopStop(cf::CFRunLoopGetCurrent());
133 }
134 }
135}
136
137// Synchronize with
138// /System/Library/Frameworks/CoreServices.framework/Versions/A/Frameworks/FSEvents.framework/Versions/A/Headers/FSEvents.h
139bitflags! {
140 #[repr(C)]
141 pub struct StreamFlags: u32 {
142 const NONE = 0x00000000;
143 const MUST_SCAN_SUBDIRS = 0x00000001;
144 const USER_DROPPED = 0x00000002;
145 const KERNEL_DROPPED = 0x00000004;
146 const IDS_WRAPPED = 0x00000008;
147 const HISTORY_DONE = 0x00000010;
148 const ROOT_CHANGED = 0x00000020;
149 const MOUNT = 0x00000040;
150 const UNMOUNT = 0x00000080;
151 const ITEM_CREATED = 0x00000100;
152 const ITEM_REMOVED = 0x00000200;
153 const INODE_META_MOD = 0x00000400;
154 const ITEM_RENAMED = 0x00000800;
155 const ITEM_MODIFIED = 0x00001000;
156 const FINDER_INFO_MOD = 0x00002000;
157 const ITEM_CHANGE_OWNER = 0x00004000;
158 const ITEM_XATTR_MOD = 0x00008000;
159 const IS_FILE = 0x00010000;
160 const IS_DIR = 0x00020000;
161 const IS_SYMLINK = 0x00040000;
162 const OWN_EVENT = 0x00080000;
163 const IS_HARDLINK = 0x00100000;
164 const IS_LAST_HARDLINK = 0x00200000;
165 const ITEM_CLONED = 0x400000;
166 }
167}
168
169impl std::fmt::Display for StreamFlags {
170 fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
171 if self.contains(StreamFlags::MUST_SCAN_SUBDIRS) {
172 let _d = write!(f, "MUST_SCAN_SUBDIRS ");
173 }
174 if self.contains(StreamFlags::USER_DROPPED) {
175 let _d = write!(f, "USER_DROPPED ");
176 }
177 if self.contains(StreamFlags::KERNEL_DROPPED) {
178 let _d = write!(f, "KERNEL_DROPPED ");
179 }
180 if self.contains(StreamFlags::IDS_WRAPPED) {
181 let _d = write!(f, "IDS_WRAPPED ");
182 }
183 if self.contains(StreamFlags::HISTORY_DONE) {
184 let _d = write!(f, "HISTORY_DONE ");
185 }
186 if self.contains(StreamFlags::ROOT_CHANGED) {
187 let _d = write!(f, "ROOT_CHANGED ");
188 }
189 if self.contains(StreamFlags::MOUNT) {
190 let _d = write!(f, "MOUNT ");
191 }
192 if self.contains(StreamFlags::UNMOUNT) {
193 let _d = write!(f, "UNMOUNT ");
194 }
195 if self.contains(StreamFlags::ITEM_CREATED) {
196 let _d = write!(f, "ITEM_CREATED ");
197 }
198 if self.contains(StreamFlags::ITEM_REMOVED) {
199 let _d = write!(f, "ITEM_REMOVED ");
200 }
201 if self.contains(StreamFlags::INODE_META_MOD) {
202 let _d = write!(f, "INODE_META_MOD ");
203 }
204 if self.contains(StreamFlags::ITEM_RENAMED) {
205 let _d = write!(f, "ITEM_RENAMED ");
206 }
207 if self.contains(StreamFlags::ITEM_MODIFIED) {
208 let _d = write!(f, "ITEM_MODIFIED ");
209 }
210 if self.contains(StreamFlags::FINDER_INFO_MOD) {
211 let _d = write!(f, "FINDER_INFO_MOD ");
212 }
213 if self.contains(StreamFlags::ITEM_CHANGE_OWNER) {
214 let _d = write!(f, "ITEM_CHANGE_OWNER ");
215 }
216 if self.contains(StreamFlags::ITEM_XATTR_MOD) {
217 let _d = write!(f, "ITEM_XATTR_MOD ");
218 }
219 if self.contains(StreamFlags::IS_FILE) {
220 let _d = write!(f, "IS_FILE ");
221 }
222 if self.contains(StreamFlags::IS_DIR) {
223 let _d = write!(f, "IS_DIR ");
224 }
225 if self.contains(StreamFlags::IS_SYMLINK) {
226 let _d = write!(f, "IS_SYMLINK ");
227 }
228 if self.contains(StreamFlags::OWN_EVENT) {
229 let _d = write!(f, "OWN_EVENT ");
230 }
231 if self.contains(StreamFlags::IS_LAST_HARDLINK) {
232 let _d = write!(f, "IS_LAST_HARDLINK ");
233 }
234 if self.contains(StreamFlags::IS_HARDLINK) {
235 let _d = write!(f, "IS_HARDLINK ");
236 }
237 if self.contains(StreamFlags::ITEM_CLONED) {
238 let _d = write!(f, "ITEM_CLONED ");
239 }
240 write!(f, "")
241 }
242}
243
/// End-to-end check against the real file system: a stream watching a temporary directory must
/// report a file creation, a file removal, and deliver some batch after the watched directory
/// itself is renamed away.
#[test]
fn test_observe() {
    use std::{fs, sync::mpsc, time::Duration};
    use tempdir::TempDir;

    // Upper bound for how long each expected batch may take to arrive.
    let wait = Duration::from_millis(500);

    let watched_dir = TempDir::new("test_observe").unwrap();
    // Canonicalised so the paths built below compare equal to the reported event paths.
    let root = watched_dir.path().canonicalize().unwrap();
    fs::write(root.join("a"), "a contents").unwrap();

    let (sender, receiver) = mpsc::channel();
    let stream = EventStream::new(&[&root], Duration::from_millis(50), sender);
    // `run` blocks in the run loop, so it gets a thread of its own.
    std::thread::spawn(move || stream.run());

    // Creating a file is reported with ITEM_CREATED.
    fs::write(root.join("b"), "b contents").unwrap();
    let batch = receiver.recv_timeout(wait).unwrap();
    let newest = batch.last().unwrap();
    assert_eq!(newest.path, root.join("b"));
    assert!(newest.flags.contains(StreamFlags::ITEM_CREATED));

    // Removing a file is reported with ITEM_REMOVED.
    fs::remove_file(root.join("a")).unwrap();
    let batch = receiver.recv_timeout(wait).unwrap();
    let newest = batch.last().unwrap();
    assert_eq!(newest.path, root.join("a"));
    assert!(newest.flags.contains(StreamFlags::ITEM_REMOVED));

    // Renaming the watched root must produce a batch; its contents are not inspected.
    let other_dir = TempDir::new("test_observe2").unwrap();
    fs::rename(root, other_dir.path().join("something")).unwrap();
    receiver.recv_timeout(wait).unwrap();
}