From 6a549727ce0919c43b1b23306d21ef796506f738 Mon Sep 17 00:00:00 2001 From: Nathan Sobo Date: Fri, 16 Apr 2021 12:53:07 -0600 Subject: [PATCH] WIP: Lay down a skeleton for another attempt at rescan Co-Authored-By: Max Brunsfeld --- zed/src/worktree.rs | 580 +++++++++++++++++++++++++------------------- 1 file changed, 324 insertions(+), 256 deletions(-) diff --git a/zed/src/worktree.rs b/zed/src/worktree.rs index 9a8752b87e3a3f200861d4bd15e9e2941c075572..81f62f2a06eaa5ba002a5f67a55170181977f3e5 100644 --- a/zed/src/worktree.rs +++ b/zed/src/worktree.rs @@ -23,10 +23,7 @@ use std::{ ops::AddAssign, os::unix::fs::MetadataExt, path::{Path, PathBuf}, - sync::{ - atomic::{self, AtomicU64}, - Arc, - }, + sync::Arc, time::Duration, }; @@ -40,16 +37,16 @@ enum ScanState { } pub struct Worktree { - id: usize, - path: Arc, - entries: SumTree, - scanner: BackgroundScanner, + snapshot: Snapshot, + scanner: Arc, scan_state: ScanState, poll_scheduled: bool, } +#[derive(Clone)] pub struct Snapshot { id: usize, + path: Arc, root_inode: Option, entries: SumTree, } @@ -62,14 +59,16 @@ pub struct FileHandle { impl Worktree { pub fn new(path: impl Into>, ctx: &mut ModelContext) -> Self { - let id = ctx.model_id(); - let path = path.into(); let scan_state = smol::channel::unbounded(); - let scanner = BackgroundScanner::new(id, path.clone(), scan_state.0); - let tree = Self { - id, - path, + let snapshot = Snapshot { + id: ctx.model_id(), + path: path.into(), + root_inode: None, entries: Default::default(), + }; + let scanner = Arc::new(BackgroundScanner::new(snapshot.clone(), scan_state.0)); + let tree = Self { + snapshot, scanner, scan_state: ScanState::Idle, poll_scheduled: false, @@ -90,7 +89,7 @@ impl Worktree { } fn poll_entries(&mut self, ctx: &mut ModelContext) { - self.entries = self.scanner.snapshot(); + self.snapshot = self.scanner.snapshot(); ctx.notify(); if self.is_scanning() && !self.poll_scheduled { @@ -112,27 +111,23 @@ impl Worktree { } pub fn snapshot(&self) -> Snapshot { - Snapshot { - id: self.id, - root_inode: self.scanner.root_inode(), - entries: self.entries.clone(), - } + self.snapshot.clone() } pub fn contains_path(&self, path: &Path) -> bool { - path.starts_with(&self.path) + path.starts_with(&self.snapshot.path) } pub fn has_inode(&self, inode: u64) -> bool { - self.entries.get(&inode).is_some() + self.snapshot.entries.get(&inode).is_some() } pub fn file_count(&self) -> usize { - self.entries.summary().file_count + self.snapshot.entries.summary().file_count } pub fn abs_path_for_inode(&self, ino: u64) -> Result { - let mut result = self.path.to_path_buf(); + let mut result = self.snapshot.path.to_path_buf(); result.push(self.path_for_inode(ino, false)?); Ok(result) } @@ -140,12 +135,13 @@ impl Worktree { pub fn path_for_inode(&self, ino: u64, include_root: bool) -> Result { let mut components = Vec::new(); let mut entry = self + .snapshot .entries .get(&ino) .ok_or_else(|| anyhow!("entry does not exist in worktree"))?; components.push(entry.name()); while let Some(parent) = entry.parent() { - entry = self.entries.get(&parent).unwrap(); + entry = self.snapshot.entries.get(&parent).unwrap(); components.push(entry.name()); } @@ -196,7 +192,7 @@ impl Worktree { } fn fmt_entry(&self, f: &mut fmt::Formatter<'_>, ino: u64, indent: usize) -> fmt::Result { - match self.entries.get(&ino).unwrap() { + match self.snapshot.entries.get(&ino).unwrap() { Entry::Dir { name, children, .. 
} => { write!( f, @@ -222,13 +218,16 @@ impl Worktree { #[cfg(test)] pub fn files<'a>(&'a self) -> impl Iterator + 'a { - self.entries.cursor::<(), ()>().filter_map(|entry| { - if let Entry::File { inode, .. } = entry { - Some(*inode) - } else { - None - } - }) + self.snapshot + .entries + .cursor::<(), ()>() + .filter_map(|entry| { + if let Entry::File { inode, .. } = entry { + Some(*inode) + } else { + None + } + }) } } @@ -238,7 +237,7 @@ impl Entity for Worktree { impl fmt::Debug for Worktree { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - if let Some(root_ino) = self.scanner.root_inode() { + if let Some(root_ino) = self.snapshot.root_inode { self.fmt_entry(f, root_ino, 0) } else { write!(f, "Empty tree\n") @@ -273,6 +272,11 @@ impl Snapshot { Some(inode) }) } + + fn entry_for_path(&self, path: impl AsRef) -> Option<&Entry> { + self.inode_for_path(path) + .and_then(|inode| self.entries.get(&inode)) + } } impl FileHandle { @@ -392,55 +396,60 @@ impl<'a> sum_tree::Dimension<'a, EntrySummary> for FileCount { } } -#[derive(Clone)] struct BackgroundScanner { - id: usize, - path: Arc, - root_ino: Arc, - entries: Arc>>, + snapshot: Mutex, notify: Sender, thread_pool: scoped_pool::Pool, } impl BackgroundScanner { - fn new(id: usize, path: Arc, notify: Sender) -> Self { + fn new(snapshot: Snapshot, notify: Sender) -> Self { Self { - id, - path, - root_ino: Arc::new(AtomicU64::new(0)), - entries: Default::default(), + snapshot: Mutex::new(snapshot), notify, thread_pool: scoped_pool::Pool::new(16), } } - fn root_inode(&self) -> Option { - let ino = self.root_ino.load(atomic::Ordering::SeqCst); - if ino == 0 { - None - } else { - Some(ino) - } + fn path(&self) -> Arc { + self.snapshot.lock().path.clone() } - fn snapshot(&self) -> SumTree { - self.entries.lock().clone() + fn snapshot(&self) -> Snapshot { + self.snapshot.lock().clone() } fn run(&self) { - let event_stream = fsevent::EventStream::new( - &[self.path.as_ref()], - Duration::from_millis(100), - |events| { - match self.process_events(events) { - Ok(alive) => alive, - Err(err) => { - // TODO: handle errors - false - } + let path = { + let mut snapshot = self.snapshot.lock(); + let canonical_path = snapshot + .path + .canonicalize() + .map(Arc::from) + .unwrap_or_else(|_| snapshot.path.clone()); + snapshot.path = canonical_path.clone(); + canonical_path + }; + + // Create the event stream before we start scanning to ensure we receive events for changes + // that occur in the middle of the scan. 
+ let event_stream = + fsevent::EventStream::new(&[path.as_ref()], Duration::from_millis(100), |events| { + if smol::block_on(self.notify.send(ScanState::Scanning)).is_err() { + return false; } - }, - ); + + if let Err(error) = self.process_events(events) { + log::error!("error handling events: {}", error); + return false; + } + + if smol::block_on(self.notify.send(ScanState::Idle)).is_err() { + return false; + } + + true + }); if smol::block_on(self.notify.send(ScanState::Scanning)).is_err() { return; @@ -460,40 +469,38 @@ impl BackgroundScanner { } fn scan_dirs(&self) -> io::Result<()> { - let metadata = fs::metadata(&self.path)?; - let ino = metadata.ino(); - let is_symlink = fs::symlink_metadata(&self.path)?.file_type().is_symlink(); - let name = Arc::from(self.path.file_name().unwrap_or(OsStr::new("/"))); + let path = self.path(); + let metadata = fs::metadata(&path)?; + let inode = metadata.ino(); + let is_symlink = fs::symlink_metadata(&path)?.file_type().is_symlink(); + let name = Arc::from(path.file_name().unwrap_or(OsStr::new("/"))); let relative_path = PathBuf::from(&name); - let mut ignore = IgnoreBuilder::new() - .build() - .add_parents(&self.path) - .unwrap(); + let mut ignore = IgnoreBuilder::new().build().add_parents(&path).unwrap(); if metadata.is_dir() { - ignore = ignore.add_child(&self.path).unwrap(); + ignore = ignore.add_child(&path).unwrap(); } - let is_ignored = ignore.matched(&self.path, metadata.is_dir()).is_ignore(); + let is_ignored = ignore.matched(&path, metadata.is_dir()).is_ignore(); if metadata.file_type().is_dir() { let is_ignored = is_ignored || name.as_ref() == ".git"; let dir_entry = Entry::Dir { parent: None, name, - inode: ino, + inode, is_symlink, is_ignored, children: Arc::from([]), pending: true, }; self.insert_entries(Some(dir_entry.clone())); - self.root_ino.store(ino, atomic::Ordering::SeqCst); + self.snapshot.lock().root_inode = Some(inode); let (tx, rx) = crossbeam_channel::unbounded(); tx.send(Ok(ScanJob { - ino, - path: self.path.clone(), + ino: inode, + path: path.clone(), relative_path, dir_entry, ignore: Some(ignore), @@ -522,12 +529,12 @@ impl BackgroundScanner { self.insert_entries(Some(Entry::File { parent: None, name, - path: PathEntry::new(ino, &relative_path, is_ignored), - inode: ino, + path: PathEntry::new(inode, &relative_path, is_ignored), + inode, is_symlink, is_ignored, })); - self.root_ino.store(ino, atomic::Ordering::SeqCst); + self.snapshot.lock().root_inode = Some(inode); } Ok(()) @@ -620,195 +627,256 @@ impl BackgroundScanner { Ok(()) } - fn process_events(&self, mut events: Vec) -> Result { - if self.notify.receiver_count() == 0 { - return Ok(false); - } - - // TODO: should we canonicalize this at the start? 
- let root_path = self.path.canonicalize()?; - let snapshot = Snapshot { - id: self.id, - entries: self.entries.lock().clone(), - root_inode: self.root_inode(), - }; - let mut removed = HashSet::new(); - let mut observed = HashSet::new(); - - let (scan_queue_tx, scan_queue_rx) = crossbeam_channel::unbounded(); + fn process_events(&self, mut events: Vec) -> Result<()> { + let snapshot = self.snapshot(); events.sort_unstable_by(|a, b| a.path.cmp(&b.path)); let mut paths = events.into_iter().map(|e| e.path).peekable(); - while let Some(path) = paths.next() { - let relative_path = path.strip_prefix(&root_path)?.to_path_buf(); - match fs::metadata(&path) { - Ok(metadata) => { - let inode = metadata.ino(); - let is_symlink = fs::symlink_metadata(&path)?.file_type().is_symlink(); - let name: Arc = Arc::from(path.file_name().unwrap_or(OsStr::new("/"))); - let mut ignore = IgnoreBuilder::new().build().add_parents(&path).unwrap(); - if metadata.is_dir() { - ignore = ignore.add_child(&path).unwrap(); - } - let is_ignored = ignore.matched(&path, metadata.is_dir()).is_ignore(); - let parent = if path == root_path { - None - } else { - Some(fs::metadata(path.parent().unwrap())?.ino()) - }; - - let prev_entry = snapshot.entries.get(&inode); - // If we haven't seen this inode yet, we are going to recursively scan it, so - // ignore event involving a descendant. - if prev_entry.is_none() { - while paths.peek().map_or(false, |p| p.starts_with(&path)) { - paths.next(); - } - } - observed.insert(inode); - if metadata.file_type().is_dir() { - let is_ignored = is_ignored || name.as_ref() == ".git"; - let dir_entry = Entry::Dir { - parent, - name, - inode, - is_symlink, - is_ignored, - children: Arc::from([]), - pending: true, - }; - self.insert_entries(Some(dir_entry.clone())); - - scan_queue_tx - .send(Ok(ScanJob { - ino: inode, - path: Arc::from(path), - relative_path, - dir_entry, - ignore: Some(ignore), - scan_queue: scan_queue_tx.clone(), - })) - .unwrap(); - } else { - self.insert_entries(Some(Entry::File { - parent, - name, - path: PathEntry::new(inode, &relative_path, is_ignored), - inode, - is_symlink, - is_ignored, - })); - } + for path in paths { + let relative_path = path.strip_prefix(&snapshot.path)?.to_path_buf(); + let snapshot_entry = snapshot.entry_for_path(relative_path); + + if let Some(fs_entry) = self.fs_entry_for_path(&path) { + if let Some(snapshot_entry) = snapshot_entry { + // If the parent does not match: + // Remove snapshot entry from its parent. + // Set its parent to none. + // Add its inode to a set of inodes to potentially remove after the batch. + + // If the parent does match, continue to next path } - Err(err) => { - if err.kind() == io::ErrorKind::NotFound { - // Fill removed with the inodes of all descendants of this path. - let mut stack = Vec::new(); - stack.extend(snapshot.inode_for_path(&relative_path)); - while let Some(inode) = stack.pop() { - removed.insert(inode); - if let Some(Entry::Dir { children, .. }) = snapshot.entries.get(&inode) - { - stack.extend(children.iter().copied()) - } - } - } else { - return Err(anyhow::Error::new(err)); - } + + // If we get here, we either had no snapshot entry at the path or we had the wrong + // entry (different inode) and removed it. + // In either case, we now need to add the entry for this path + + if let Some(existing_snapshot_entry) = snapshot.entry_for_inode(fs_entry.inode) { + // An entry already exists in the snapshot, but in the wrong spot. 
+ // Set its parent to the correct parent + // Insert it in the children of its parent + } else { + // An entry doesn't exist in the snapshot, this is the first time we've seen it. + // If this is a directory, do a recursive scan and discard subsequent events that are contained by the current path + // Then set the parent of the result of that scan to the correct parent + // Insert it in the children of that parent. + } + } else { + if let Some(snapshot_entry) = snapshot_entry { + // Remove snapshot entry from its parent. + // Set its parent to none. + // Add its inode to a set of inodes to potentially remove after the batch. } } } - drop(scan_queue_tx); - - let mut scanned_inodes = Vec::new(); - scanned_inodes.resize_with(self.thread_pool.workers(), || Ok(Vec::new())); - self.thread_pool.scoped(|pool| { - for worker_inodes in &mut scanned_inodes { - pool.execute(|| { - let worker_inodes = worker_inodes; - while let Ok(job) = scan_queue_rx.recv() { - if let Err(err) = job.and_then(|job| { - self.scan_dir(job, Some(worker_inodes.as_mut().unwrap())) - }) { - *worker_inodes = Err(err); - break; - } - } - }); - } - }); - for worker_inodes in scanned_inodes { - for inode in worker_inodes? { - remove_counts.remove(&inode); - } - } - self.remove_entries(remove_counts); + // Check whether any entry whose parent was set to none is still an orphan. If so, remove it and all descendants. - Ok(self.notify.receiver_count() != 0) - } + *self.snapshot.lock() = snapshot; - fn insert_entries(&self, entries: impl IntoIterator) { - let mut edits = Vec::new(); - let mut new_parents = HashMap::new(); - for entry in entries { - new_parents.insert(entry.ino(), entry.parent()); - edits.push(Edit::Insert(entry)); - } + Ok(()) + } - let mut entries = self.entries.lock(); - let prev_entries = entries.edit(edits); - Self::remove_stale_children(&mut *entries, prev_entries, new_parents); - } - - fn remove_entries(&self, inodes: impl IntoIterator) { - let mut entries = self.entries.lock(); - let prev_entries = entries.edit(inodes.into_iter().map(Edit::Remove).collect()); - Self::remove_stale_children(&mut *entries, prev_entries, HashMap::new()); - } - - fn remove_stale_children( - tree: &mut SumTree, - prev_entries: Vec, - new_parents: HashMap>, - ) { - let mut new_parent_entries = HashMap::new(); - - for prev_entry in prev_entries { - let new_parent = new_parents.get(&prev_entry.ino()).copied().flatten(); - if new_parent != prev_entry.parent() { - if let Some(prev_parent) = prev_entry.parent() { - let (_, new_children) = - new_parent_entries.entry(prev_parent).or_insert_with(|| { - let prev_parent_entry = tree.get(&prev_parent).unwrap(); - if let Entry::Dir { children, .. } = prev_parent_entry { - (prev_parent_entry.clone(), children.to_vec()) - } else { - unreachable!() - } - }); + fn fs_entry_for_path(&self, path: &Path) -> Option { + todo!() + } - if let Some(ix) = new_children.iter().position(|ino| *ino == prev_entry.ino()) { - new_children.swap_remove(ix); - } - } - } - } + // fn process_events2(&self, mut events: Vec) -> Result { + // if self.notify.receiver_count() == 0 { + // return Ok(false); + // } + + // // TODO: should we canonicalize this at the start? 
+ // let root_path = self.path.canonicalize()?; + // let snapshot = Snapshot { + // id: self.id, + // entries: self.entries.lock().clone(), + // root_inode: self.root_inode(), + // }; + // let mut removed = HashSet::new(); + // let mut observed = HashSet::new(); + + // let (scan_queue_tx, scan_queue_rx) = crossbeam_channel::unbounded(); + + // events.sort_unstable_by(|a, b| a.path.cmp(&b.path)); + // let mut paths = events.into_iter().map(|e| e.path).peekable(); + // while let Some(path) = paths.next() { + // let relative_path = path.strip_prefix(&root_path)?.to_path_buf(); + // match fs::metadata(&path) { + // Ok(metadata) => { + // let inode = metadata.ino(); + // let is_symlink = fs::symlink_metadata(&path)?.file_type().is_symlink(); + // let name: Arc = Arc::from(path.file_name().unwrap_or(OsStr::new("/"))); + // let mut ignore = IgnoreBuilder::new().build().add_parents(&path).unwrap(); + // if metadata.is_dir() { + // ignore = ignore.add_child(&path).unwrap(); + // } + // let is_ignored = ignore.matched(&path, metadata.is_dir()).is_ignore(); + // let parent = if path == root_path { + // None + // } else { + // Some(fs::metadata(path.parent().unwrap())?.ino()) + // }; + + // let prev_entry = snapshot.entries.get(&inode); + // // If we haven't seen this inode yet, we are going to recursively scan it, so + // // ignore event involving a descendant. + // if prev_entry.is_none() { + // while paths.peek().map_or(false, |p| p.starts_with(&path)) { + // paths.next(); + // } + // } + + // observed.insert(inode); + // if metadata.file_type().is_dir() { + // let is_ignored = is_ignored || name.as_ref() == ".git"; + // let dir_entry = Entry::Dir { + // parent, + // name, + // inode, + // is_symlink, + // is_ignored, + // children: Arc::from([]), + // pending: true, + // }; + // self.insert_entries(Some(dir_entry.clone())); + + // scan_queue_tx + // .send(Ok(ScanJob { + // ino: inode, + // path: Arc::from(path), + // relative_path, + // dir_entry, + // ignore: Some(ignore), + // scan_queue: scan_queue_tx.clone(), + // })) + // .unwrap(); + // } else { + // self.insert_entries(Some(Entry::File { + // parent, + // name, + // path: PathEntry::new(inode, &relative_path, is_ignored), + // inode, + // is_symlink, + // is_ignored, + // })); + // } + // } + // Err(err) => { + // if err.kind() == io::ErrorKind::NotFound { + // // Fill removed with the inodes of all descendants of this path. + // let mut stack = Vec::new(); + // stack.extend(snapshot.inode_for_path(&relative_path)); + // while let Some(inode) = stack.pop() { + // removed.insert(inode); + // if let Some(Entry::Dir { children, .. }) = snapshot.entries.get(&inode) + // { + // stack.extend(children.iter().copied()) + // } + // } + // } else { + // return Err(anyhow::Error::new(err)); + // } + // } + // } + // } + // drop(scan_queue_tx); + + // let mut scanned_inodes = Vec::new(); + // scanned_inodes.resize_with(self.thread_pool.workers(), || Ok(Vec::new())); + // self.thread_pool.scoped(|pool| { + // for worker_inodes in &mut scanned_inodes { + // pool.execute(|| { + // let worker_inodes = worker_inodes; + // while let Ok(job) = scan_queue_rx.recv() { + // if let Err(err) = job.and_then(|job| { + // self.scan_dir(job, Some(worker_inodes.as_mut().unwrap())) + // }) { + // *worker_inodes = Err(err); + // break; + // } + // } + // }); + // } + // }); + + // for worker_inodes in scanned_inodes { + // for inode in worker_inodes? 
{ + // remove_counts.remove(&inode); + // } + // } + // self.remove_entries(remove_counts); + + // Ok(self.notify.receiver_count() != 0) + // } - let parent_edits = new_parent_entries - .into_iter() - .map(|(_, (mut parent_entry, new_children))| { - if let Entry::Dir { children, .. } = &mut parent_entry { - *children = Arc::from(new_children); - } else { - unreachable!() - } - Edit::Insert(parent_entry) - }) - .collect::>(); - tree.edit(parent_edits); + fn insert_entries(&self, entries: impl IntoIterator) { + self.snapshot + .lock() + .entries + .edit(entries.into_iter().map(Edit::Insert).collect::>()); } + + // fn insert_entries(&self, entries: impl IntoIterator) { + // let mut edits = Vec::new(); + // let mut new_parents = HashMap::new(); + // for entry in entries { + // new_parents.insert(entry.ino(), entry.parent()); + // edits.push(Edit::Insert(entry)); + // } + + // let mut entries = self.snapshot.lock().entries; + // let prev_entries = entries.edit(edits); + // Self::remove_stale_children(&mut *entries, prev_entries, new_parents); + // } + + // fn remove_entries(&self, inodes: impl IntoIterator) { + // let mut entries = self.entries.lock(); + // let prev_entries = entries.edit(inodes.into_iter().map(Edit::Remove).collect()); + // Self::remove_stale_children(&mut *entries, prev_entries, HashMap::new()); + // } + + // fn remove_stale_children( + // tree: &mut SumTree, + // prev_entries: Vec, + // new_parents: HashMap>, + // ) { + // let mut new_parent_entries = HashMap::new(); + + // for prev_entry in prev_entries { + // let new_parent = new_parents.get(&prev_entry.ino()).copied().flatten(); + // if new_parent != prev_entry.parent() { + // if let Some(prev_parent) = prev_entry.parent() { + // let (_, new_children) = + // new_parent_entries.entry(prev_parent).or_insert_with(|| { + // let prev_parent_entry = tree.get(&prev_parent).unwrap(); + // if let Entry::Dir { children, .. } = prev_parent_entry { + // (prev_parent_entry.clone(), children.to_vec()) + // } else { + // unreachable!() + // } + // }); + + // if let Some(ix) = new_children.iter().position(|ino| *ino == prev_entry.ino()) { + // new_children.swap_remove(ix); + // } + // } + // } + // } + + // let parent_edits = new_parent_entries + // .into_iter() + // .map(|(_, (mut parent_entry, new_children))| { + // if let Entry::Dir { children, .. } = &mut parent_entry { + // *children = Arc::from(new_children); + // } else { + // unreachable!() + // } + // Edit::Insert(parent_entry) + // }) + // .collect::>(); + // tree.edit(parent_edits); + // } } struct ScanJob {