Detailed changes
@@ -448,12 +448,6 @@ dependencies = [
"cfg-if 1.0.0",
]
-[[package]]
-name = "crossbeam"
-version = "0.2.12"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "bd66663db5a988098a89599d4857919b3acf7f61402e61365acfd3919857b9be"
-
[[package]]
name = "crossbeam-channel"
version = "0.4.4"
@@ -759,6 +753,7 @@ version = "2.0.2"
dependencies = [
"bitflags",
"fsevent-sys",
+ "parking_lot",
"tempdir",
]
@@ -1062,7 +1057,7 @@ version = "0.4.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "dd96ffd135b2fd7b973ac026d28085defbe8983df057ced3eb4f2130b0831312"
dependencies = [
- "scopeguard 1.1.0",
+ "scopeguard",
]
[[package]]
@@ -1714,13 +1709,9 @@ dependencies = [
[[package]]
name = "scoped-pool"
-version = "1.0.0"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "817a3a15e704545ce59ed2b5c60a5d32bda4d7869befb8b36667b658a6c00b43"
+version = "0.0.1"
dependencies = [
- "crossbeam",
- "scopeguard 0.1.2",
- "variance",
+ "crossbeam-channel 0.5.0",
]
[[package]]
@@ -1729,12 +1720,6 @@ version = "1.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ea6a9290e3c9cf0f18145ef7ffa62d68ee0bf5fcd651017e586dc7fd5da448c2"
-[[package]]
-name = "scopeguard"
-version = "0.1.2"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "59a076157c1e2dc561d8de585151ee6965d910dd4dcb5dabb7ae3e83981a6c57"
-
[[package]]
name = "scopeguard"
version = "1.1.0"
@@ -2138,12 +2123,6 @@ dependencies = [
"xmlwriter",
]
-[[package]]
-name = "variance"
-version = "0.1.3"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "3abfc2be1fb59663871379ea884fd81de80c496f2274e021c01d6fe56cd77b05"
-
[[package]]
name = "vec-arena"
version = "1.0.0"
@@ -1,5 +1,5 @@
[workspace]
-members = ["zed", "gpui", "fsevent"]
+members = ["zed", "gpui", "fsevent", "scoped_pool"]
[patch.crates-io]
async-task = {git = "https://github.com/zed-industries/async-task", rev = "341b57d6de98cdfd7b418567b8de2022ca993a6e"}
@@ -7,6 +7,7 @@ edition = "2018"
[dependencies]
bitflags = "1"
fsevent-sys = "3.0.2"
+parking_lot = "0.11.1"
[dev-dependencies]
tempdir = "0.3.7"
@@ -5,12 +5,12 @@ fn main() {
let paths = args().skip(1).collect::<Vec<_>>();
let paths = paths.iter().map(Path::new).collect::<Vec<_>>();
assert!(paths.len() > 0, "Must pass 1 or more paths as arguments");
- let stream = EventStream::new(&paths, Duration::from_millis(100), |events| {
+ let (stream, _handle) = EventStream::new(&paths, Duration::from_millis(100));
+ stream.run(|events| {
eprintln!("event batch");
for event in events {
eprintln!(" {:?}", event);
}
true
});
- stream.run();
}
@@ -2,12 +2,14 @@
use bitflags::bitflags;
use fsevent_sys::{self as fs, core_foundation as cf};
+use parking_lot::Mutex;
use std::{
convert::AsRef,
ffi::{c_void, CStr, OsStr},
os::unix::ffi::OsStrExt,
path::{Path, PathBuf},
slice,
+ sync::Arc,
time::Duration,
};
@@ -18,20 +20,29 @@ pub struct Event {
pub path: PathBuf,
}
-pub struct EventStream<F> {
+pub struct EventStream {
stream: fs::FSEventStreamRef,
- _callback: Box<F>,
+ state: Arc<Mutex<Lifecycle>>,
+ callback: Box<Option<RunCallback>>,
}
-unsafe impl<F> Send for EventStream<F> {}
+type RunCallback = Box<dyn FnMut(Vec<Event>) -> bool>;
-impl<F> EventStream<F>
-where
- F: FnMut(Vec<Event>) -> bool,
-{
- pub fn new(paths: &[&Path], latency: Duration, callback: F) -> Self {
+enum Lifecycle {
+ New,
+ Running(cf::CFRunLoopRef),
+ Stopped,
+}
+
+pub struct Handle(Arc<Mutex<Lifecycle>>);
+
+unsafe impl Send for EventStream {}
+unsafe impl Send for Lifecycle {}
+
+impl EventStream {
+ pub fn new(paths: &[&Path], latency: Duration) -> (Self, Handle) {
unsafe {
- let callback = Box::new(callback);
+ let callback = Box::new(None);
let stream_context = fs::FSEventStreamContext {
version: 0,
info: callback.as_ref() as *const _ as *mut c_void,
@@ -71,20 +82,35 @@ where
);
cf::CFRelease(cf_paths);
- EventStream {
- stream,
- _callback: callback,
- }
+ let state = Arc::new(Mutex::new(Lifecycle::New));
+
+ (
+ EventStream {
+ stream,
+ state: state.clone(),
+ callback,
+ },
+ Handle(state),
+ )
}
}
- pub fn run(self) {
+ pub fn run<F>(mut self, f: F)
+ where
+ F: FnMut(Vec<Event>) -> bool + 'static,
+ {
+ *self.callback = Some(Box::new(f));
unsafe {
- fs::FSEventStreamScheduleWithRunLoop(
- self.stream,
- cf::CFRunLoopGetCurrent(),
- cf::kCFRunLoopDefaultMode,
- );
+ let run_loop = cf::CFRunLoopGetCurrent();
+ {
+ let mut state = self.state.lock();
+ match *state {
+ Lifecycle::New => *state = Lifecycle::Running(run_loop),
+ Lifecycle::Running(_) => unreachable!(),
+ Lifecycle::Stopped => return,
+ }
+ }
+ fs::FSEventStreamScheduleWithRunLoop(self.stream, run_loop, cf::kCFRunLoopDefaultMode);
fs::FSEventStreamStart(self.stream);
cf::CFRunLoopRun();
@@ -107,7 +133,11 @@ where
let event_paths = event_paths as *const *const ::std::os::raw::c_char;
let e_ptr = event_flags as *mut u32;
let i_ptr = event_ids as *mut u64;
- let callback = (info as *mut F).as_mut().unwrap();
+ let callback = (info as *mut Option<RunCallback>)
+ .as_mut()
+ .unwrap()
+ .as_mut()
+ .unwrap();
let paths = slice::from_raw_parts(event_paths, num);
let flags = slice::from_raw_parts_mut(e_ptr, num);
@@ -136,6 +166,18 @@ where
}
}
+impl Drop for Handle {
+ fn drop(&mut self) {
+ let mut state = self.0.lock();
+ if let Lifecycle::Running(run_loop) = *state {
+ unsafe {
+ cf::CFRunLoopStop(run_loop);
+ }
+ }
+ *state = Lifecycle::Stopped;
+ }
+}
+
// Synchronize with
// /System/Library/Frameworks/CoreServices.framework/Versions/A/Frameworks/FSEvents.framework/Versions/A/Headers/FSEvents.h
bitflags! {
@@ -253,10 +295,8 @@ fn test_event_stream() {
fs::write(path.join("a"), "a contents").unwrap();
let (tx, rx) = mpsc::channel();
- let stream = EventStream::new(&[&path], Duration::from_millis(50), move |events| {
- tx.send(events.to_vec()).is_ok()
- });
- std::thread::spawn(move || stream.run());
+ let (stream, handle) = EventStream::new(&[&path], Duration::from_millis(50));
+ std::thread::spawn(move || stream.run(move |events| tx.send(events.to_vec()).is_ok()));
fs::write(path.join("b"), "b contents").unwrap();
let events = rx.recv_timeout(Duration::from_millis(500)).unwrap();
@@ -269,4 +309,46 @@ fn test_event_stream() {
let event = events.last().unwrap();
assert_eq!(event.path, path.join("a"));
assert!(event.flags.contains(StreamFlags::ITEM_REMOVED));
+ drop(handle);
+}
+
+#[test]
+fn test_event_stream_shutdown() {
+ use std::{fs, sync::mpsc, time::Duration};
+ use tempdir::TempDir;
+
+ let dir = TempDir::new("test_observe").unwrap();
+ let path = dir.path().canonicalize().unwrap();
+
+ let (tx, rx) = mpsc::channel();
+ let (stream, handle) = EventStream::new(&[&path], Duration::from_millis(50));
+ std::thread::spawn(move || {
+ stream.run({
+ let tx = tx.clone();
+ move |_| {
+ tx.send(()).unwrap();
+ true
+ }
+ });
+ tx.send(()).unwrap();
+ });
+
+ fs::write(path.join("b"), "b contents").unwrap();
+ rx.recv_timeout(Duration::from_millis(500)).unwrap();
+
+ drop(handle);
+ rx.recv_timeout(Duration::from_millis(500)).unwrap();
+}
+
+#[test]
+fn test_event_stream_shutdown_before_run() {
+ use std::time::Duration;
+ use tempdir::TempDir;
+
+ let dir = TempDir::new("test_observe").unwrap();
+ let path = dir.path().canonicalize().unwrap();
+
+ let (stream, handle) = EventStream::new(&[&path], Duration::from_millis(50));
+ drop(handle);
+ stream.run(|_| true);
}
@@ -18,7 +18,7 @@ postage = {version = "0.4.1", features = ["futures-traits"]}
rand = "0.8.3"
replace_with = "0.1.7"
resvg = "0.14"
-scoped-pool = "1.0.0"
+scoped-pool = {path = "../scoped_pool"}
seahash = "4.1"
serde = {version = "1.0.125", features = ["derive"]}
serde_json = "1.0.64"
@@ -411,7 +411,7 @@ impl MutableAppContext {
windows: HashMap::new(),
ref_counts: Arc::new(Mutex::new(RefCounts::default())),
background: Arc::new(executor::Background::new()),
- thread_pool: scoped_pool::Pool::new(num_cpus::get()),
+ thread_pool: scoped_pool::Pool::new(num_cpus::get(), "app"),
},
actions: HashMap::new(),
global_actions: HashMap::new(),
@@ -0,0 +1,8 @@
+[package]
+name = "scoped-pool"
+version = "0.0.1"
+license = "MIT"
+edition = "2018"
+
+[dependencies]
+crossbeam-channel = "0.5"
@@ -0,0 +1,188 @@
+use crossbeam_channel as chan;
+use std::{marker::PhantomData, mem::transmute, thread};
+
+#[derive(Clone)]
+pub struct Pool {
+ req_tx: chan::Sender<Request>,
+ thread_count: usize,
+}
+
+pub struct Scope<'a> {
+ req_count: usize,
+ req_tx: chan::Sender<Request>,
+ resp_tx: chan::Sender<()>,
+ resp_rx: chan::Receiver<()>,
+ phantom: PhantomData<&'a ()>,
+}
+
+struct Request {
+ callback: Box<dyn FnOnce() + Send + 'static>,
+ resp_tx: chan::Sender<()>,
+}
+
+impl Pool {
+ pub fn new(thread_count: usize, name: impl AsRef<str>) -> Self {
+ let (req_tx, req_rx) = chan::unbounded();
+ for i in 0..thread_count {
+ thread::Builder::new()
+ .name(format!("scoped_pool {} {}", name.as_ref(), i))
+ .spawn({
+ let req_rx = req_rx.clone();
+ move || loop {
+ match req_rx.recv() {
+ Err(_) => break,
+ Ok(Request { callback, resp_tx }) => {
+ callback();
+ resp_tx.send(()).ok();
+ }
+ }
+ }
+ })
+ .expect("scoped_pool: failed to spawn thread");
+ }
+ Self {
+ req_tx,
+ thread_count,
+ }
+ }
+
+ pub fn thread_count(&self) -> usize {
+ self.thread_count
+ }
+
+ pub fn scoped<'scope, F, R>(&self, scheduler: F) -> R
+ where
+ F: FnOnce(&mut Scope<'scope>) -> R,
+ {
+ let (resp_tx, resp_rx) = chan::bounded(1);
+ let mut scope = Scope {
+ resp_tx,
+ resp_rx,
+ req_count: 0,
+ phantom: PhantomData,
+ req_tx: self.req_tx.clone(),
+ };
+ let result = scheduler(&mut scope);
+ scope.wait();
+ result
+ }
+}
+
+impl<'scope> Scope<'scope> {
+ pub fn execute<F>(&mut self, callback: F)
+ where
+ F: FnOnce() + Send + 'scope,
+ {
+ // Transmute the callback's lifetime to be 'static. This is safe because in ::wait,
+ // we block until all the callbacks have been called and dropped.
+ let callback = unsafe {
+ transmute::<Box<dyn FnOnce() + Send + 'scope>, Box<dyn FnOnce() + Send + 'static>>(
+ Box::new(callback),
+ )
+ };
+
+ self.req_count += 1;
+ self.req_tx
+ .send(Request {
+ callback,
+ resp_tx: self.resp_tx.clone(),
+ })
+ .unwrap();
+ }
+
+ fn wait(&self) {
+ for _ in 0..self.req_count {
+ self.resp_rx.recv().unwrap();
+ }
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+ use std::sync::{Arc, Mutex};
+
+ #[test]
+ fn test_execute() {
+ let pool = Pool::new(3, "test");
+
+ {
+ let vec = Mutex::new(Vec::new());
+ pool.scoped(|scope| {
+ for _ in 0..3 {
+ scope.execute(|| {
+ for i in 0..5 {
+ vec.lock().unwrap().push(i);
+ }
+ });
+ }
+ });
+
+ let mut vec = vec.into_inner().unwrap();
+ vec.sort_unstable();
+ assert_eq!(vec, [0, 0, 0, 1, 1, 1, 2, 2, 2, 3, 3, 3, 4, 4, 4])
+ }
+ }
+
+ #[test]
+ fn test_clone_send_and_execute() {
+ let pool = Pool::new(3, "test");
+
+ let mut threads = Vec::new();
+ for _ in 0..3 {
+ threads.push(thread::spawn({
+ let pool = pool.clone();
+ move || {
+ let vec = Mutex::new(Vec::new());
+ pool.scoped(|scope| {
+ for _ in 0..3 {
+ scope.execute(|| {
+ for i in 0..5 {
+ vec.lock().unwrap().push(i);
+ }
+ });
+ }
+ });
+ let mut vec = vec.into_inner().unwrap();
+ vec.sort_unstable();
+ assert_eq!(vec, [0, 0, 0, 1, 1, 1, 2, 2, 2, 3, 3, 3, 4, 4, 4])
+ }
+ }));
+ }
+
+ for thread in threads {
+ thread.join().unwrap();
+ }
+ }
+
+ #[test]
+ fn test_share_and_execute() {
+ let pool = Arc::new(Pool::new(3, "test"));
+
+ let mut threads = Vec::new();
+ for _ in 0..3 {
+ threads.push(thread::spawn({
+ let pool = pool.clone();
+ move || {
+ let vec = Mutex::new(Vec::new());
+ pool.scoped(|scope| {
+ for _ in 0..3 {
+ scope.execute(|| {
+ for i in 0..5 {
+ vec.lock().unwrap().push(i);
+ }
+ });
+ }
+ });
+ let mut vec = vec.into_inner().unwrap();
+ vec.sort_unstable();
+ assert_eq!(vec, [0, 0, 0, 1, 1, 1, 2, 2, 2, 3, 3, 3, 4, 4, 4])
+ }
+ }));
+ }
+
+ for thread in threads {
+ thread.join().unwrap();
+ }
+ }
+}
@@ -37,8 +37,9 @@ enum ScanState {
pub struct Worktree {
snapshot: Snapshot,
- scanner: Arc<BackgroundScanner>,
+ background_snapshot: Arc<Mutex<Snapshot>>,
scan_state: (watch::Sender<ScanState>, watch::Receiver<ScanState>),
+ _event_stream_handle: fsevent::Handle,
poll_scheduled: bool,
}
@@ -50,25 +51,33 @@ pub struct FileHandle {
impl Worktree {
pub fn new(path: impl Into<Arc<Path>>, ctx: &mut ModelContext<Self>) -> Self {
- let scan_state = smol::channel::unbounded();
+ let (scan_state_tx, scan_state_rx) = smol::channel::unbounded();
+ let id = ctx.model_id();
let snapshot = Snapshot {
- id: ctx.model_id(),
+ id,
path: path.into(),
root_inode: None,
entries: Default::default(),
};
- let scanner = Arc::new(BackgroundScanner::new(snapshot.clone(), scan_state.0));
+ let (event_stream, event_stream_handle) =
+ fsevent::EventStream::new(&[snapshot.path.as_ref()], Duration::from_millis(100));
+
+ let background_snapshot = Arc::new(Mutex::new(snapshot.clone()));
+
let tree = Self {
snapshot,
- scanner,
+ background_snapshot: background_snapshot.clone(),
scan_state: watch::channel_with(ScanState::Scanning),
+ _event_stream_handle: event_stream_handle,
poll_scheduled: false,
};
- let scanner = tree.scanner.clone();
- std::thread::spawn(move || scanner.run());
+ std::thread::spawn(move || {
+ let scanner = BackgroundScanner::new(background_snapshot, scan_state_tx, id);
+ scanner.run(event_stream)
+ });
- ctx.spawn_stream(scan_state.1, Self::observe_scan_state, |_, _| {})
+ ctx.spawn_stream(scan_state_rx, Self::observe_scan_state, |_, _| {})
.detach();
tree
@@ -90,7 +99,7 @@ impl Worktree {
}
fn poll_entries(&mut self, ctx: &mut ModelContext<Self>) {
- self.snapshot = self.scanner.snapshot();
+ self.snapshot = self.background_snapshot.lock().clone();
ctx.notify();
if self.is_scanning() && !self.poll_scheduled {
@@ -490,17 +499,17 @@ impl<'a> sum_tree::Dimension<'a, EntrySummary> for FileCount {
}
struct BackgroundScanner {
- snapshot: Mutex<Snapshot>,
+ snapshot: Arc<Mutex<Snapshot>>,
notify: Sender<ScanState>,
thread_pool: scoped_pool::Pool,
}
impl BackgroundScanner {
- fn new(snapshot: Snapshot, notify: Sender<ScanState>) -> Self {
+ fn new(snapshot: Arc<Mutex<Snapshot>>, notify: Sender<ScanState>, worktree_id: usize) -> Self {
Self {
- snapshot: Mutex::new(snapshot),
+ snapshot,
notify,
- thread_pool: scoped_pool::Pool::new(16),
+ thread_pool: scoped_pool::Pool::new(16, format!("worktree-{}-scanner", worktree_id)),
}
}
@@ -512,28 +521,7 @@ impl BackgroundScanner {
self.snapshot.lock().clone()
}
- fn run(&self) {
- let path = self.snapshot.lock().path.clone();
-
- // Create the event stream before we start scanning to ensure we receive events for changes
- // that occur in the middle of the scan.
- let event_stream =
- fsevent::EventStream::new(&[path.as_ref()], Duration::from_millis(100), |events| {
- if smol::block_on(self.notify.send(ScanState::Scanning)).is_err() {
- return false;
- }
-
- if !self.process_events(events) {
- return false;
- }
-
- if smol::block_on(self.notify.send(ScanState::Idle)).is_err() {
- return false;
- }
-
- true
- });
-
+ fn run(self, event_stream: fsevent::EventStream) {
if smol::block_on(self.notify.send(ScanState::Scanning)).is_err() {
return;
}
@@ -548,7 +536,21 @@ impl BackgroundScanner {
return;
}
- event_stream.run();
+ event_stream.run(move |events| {
+ if smol::block_on(self.notify.send(ScanState::Scanning)).is_err() {
+ return false;
+ }
+
+ if !self.process_events(events) {
+ return false;
+ }
+
+ if smol::block_on(self.notify.send(ScanState::Idle)).is_err() {
+ return false;
+ }
+
+ true
+ });
}
fn scan_dirs(&self) -> io::Result<()> {
@@ -592,7 +594,7 @@ impl BackgroundScanner {
drop(tx);
let mut results = Vec::new();
- results.resize_with(self.thread_pool.workers(), || Ok(()));
+ results.resize_with(self.thread_pool.thread_count(), || Ok(()));
self.thread_pool.scoped(|pool| {
for result in &mut results {
pool.execute(|| {
@@ -762,7 +764,7 @@ impl BackgroundScanner {
// Scan any directories that were created as part of this event batch.
drop(scan_queue_tx);
self.thread_pool.scoped(|pool| {
- for _ in 0..self.thread_pool.workers() {
+ for _ in 0..self.thread_pool.thread_count() {
pool.execute(|| {
while let Ok(job) = scan_queue_rx.recv() {
if let Err(err) = job.and_then(|job| self.scan_dir(job)) {
@@ -844,12 +846,6 @@ impl BackgroundScanner {
}
}
-impl Drop for BackgroundScanner {
- fn drop(&mut self) {
- self.thread_pool.shutdown();
- }
-}
-
struct ScanJob {
inode: u64,
path: Arc<Path>,
@@ -951,6 +947,8 @@ mod tests {
);
})
});
+
+ eprintln!("HI");
}
#[test]
@@ -1051,13 +1049,14 @@ mod tests {
let (notify_tx, _notify_rx) = smol::channel::unbounded();
let scanner = BackgroundScanner::new(
- Snapshot {
+ Arc::new(Mutex::new(Snapshot {
id: 0,
path: root_dir.path().into(),
root_inode: None,
entries: Default::default(),
- },
+ })),
notify_tx,
+ 0,
);
scanner.scan_dirs().unwrap();
@@ -1079,13 +1078,14 @@ mod tests {
let (notify_tx, _notify_rx) = smol::channel::unbounded();
let new_scanner = BackgroundScanner::new(
- Snapshot {
+ Arc::new(Mutex::new(Snapshot {
id: 0,
path: root_dir.path().into(),
root_inode: None,
entries: Default::default(),
- },
+ })),
notify_tx,
+ 1,
);
new_scanner.scan_dirs().unwrap();
assert_eq!(scanner.snapshot().to_vec(), new_scanner.snapshot().to_vec());