Start filling out new structure for processing fs events

Max Brunsfeld created

Change summary

zed/src/worktree.rs | 495 ++++++++++++++++++++++++----------------------
1 file changed, 261 insertions(+), 234 deletions(-)

Detailed changes

zed/src/worktree.rs 🔗

@@ -12,10 +12,7 @@ use gpui::{scoped_pool, AppContext, Entity, ModelContext, ModelHandle, Task};
 use ignore::dir::{Ignore, IgnoreBuilder};
 use parking_lot::Mutex;
 use smol::{channel::Sender, Timer};
-use std::{
-    collections::{HashMap, HashSet},
-    future::Future,
-};
+use std::{collections::HashSet, future::Future};
 use std::{
     ffi::OsStr,
     fmt, fs,
@@ -178,7 +175,6 @@ impl Worktree {
         ctx: &AppContext,
     ) -> Task<Result<()>> {
         let path = self.abs_path_for_inode(ino);
-        eprintln!("save to path: {:?}", path);
         ctx.background_executor().spawn(async move {
             let buffer_size = content.text_summary().bytes.min(10 * 1024);
             let file = std::fs::File::create(&path?)?;
@@ -277,6 +273,80 @@ impl Snapshot {
         self.inode_for_path(path)
             .and_then(|inode| self.entries.get(&inode))
     }
+
+    fn reparent_entry(
+        &mut self,
+        child_inode: u64,
+        old_parent_inode: Option<u64>,
+        new_parent_inode: Option<u64>,
+    ) {
+        let mut edits_len = 1;
+        if old_parent_inode.is_some() {
+            edits_len += 1;
+        }
+        if new_parent_inode.is_some() {
+            edits_len += 1;
+        }
+        let mut deletions = Vec::with_capacity(edits_len);
+        let mut insertions = Vec::with_capacity(edits_len);
+
+        // Remove the entries from the sum tree.
+        deletions.push(Edit::Remove(child_inode));
+        if let Some(old_parent_inode) = old_parent_inode {
+            deletions.push(Edit::Remove(old_parent_inode));
+        }
+        if let Some(new_parent_inode) = new_parent_inode {
+            deletions.push(Edit::Remove(new_parent_inode));
+        }
+        let removed_entries = self.entries.edit(deletions);
+        let mut child_entry = None;
+        let mut old_parent_entry = None;
+        let mut new_parent_entry = None;
+        for removed_entry in removed_entries {
+            if removed_entry.ino() == child_inode {
+                child_entry = Some(removed_entry);
+            } else if Some(removed_entry.ino()) == old_parent_inode {
+                old_parent_entry = Some(removed_entry);
+            } else if Some(removed_entry.ino()) == new_parent_inode {
+                new_parent_entry = Some(removed_entry);
+            }
+        }
+
+        // Update the child entry's parent.
+        let mut child_entry = child_entry.expect("cannot reparent non-existent entry");
+        child_entry.set_parent(new_parent_inode);
+        insertions.push(Edit::Insert(child_entry));
+
+        // Remove the child entry from it's old parent's children.
+        if let Some(mut old_parent_entry) = old_parent_entry {
+            if let Entry::Dir { children, .. } = &mut old_parent_entry {
+                *children = children
+                    .into_iter()
+                    .cloned()
+                    .filter(|c| *c != child_inode)
+                    .collect();
+                insertions.push(Edit::Insert(old_parent_entry));
+            } else {
+                panic!("snapshot entry's new parent was not a directory");
+            }
+        }
+
+        // Add the child entry to it's new parent's children.
+        if let Some(mut new_parent_entry) = new_parent_entry {
+            if let Entry::Dir { children, .. } = &mut new_parent_entry {
+                *children = children
+                    .into_iter()
+                    .cloned()
+                    .chain(Some(child_inode))
+                    .collect();
+                insertions.push(Edit::Insert(new_parent_entry));
+            } else {
+                panic!("snapshot entry's new parent is not a directory");
+            }
+        }
+
+        self.entries.edit(insertions);
+    }
 }
 
 impl FileHandle {
@@ -337,6 +407,13 @@ impl Entry {
         }
     }
 
+    fn set_parent(&mut self, new_parent: Option<u64>) {
+        match self {
+            Entry::Dir { parent, .. } => *parent = new_parent,
+            Entry::File { parent, .. } => *parent = new_parent,
+        }
+    }
+
     fn name(&self) -> &OsStr {
         match self {
             Entry::Dir { name, .. } => name,
@@ -439,10 +516,7 @@ impl BackgroundScanner {
                     return false;
                 }
 
-                if let Err(error) = self.process_events(events) {
-                    log::error!("error handling events: {}", error);
-                    return false;
-                }
+                self.process_events(events);
 
                 if smol::block_on(self.notify.send(ScanState::Idle)).is_err() {
                     return false;
@@ -627,188 +701,202 @@ impl BackgroundScanner {
         Ok(())
     }
 
-    fn process_events(&self, mut events: Vec<fsevent::Event>) -> Result<()> {
-        let snapshot = self.snapshot();
+    fn process_events(&self, mut events: Vec<fsevent::Event>) {
+        let mut snapshot = self.snapshot();
 
         events.sort_unstable_by(|a, b| a.path.cmp(&b.path));
         let mut paths = events.into_iter().map(|e| e.path).peekable();
+        let mut possible_removed_inodes = HashSet::new();
+        let (scan_queue_tx, scan_queue_rx) = crossbeam_channel::unbounded();
+
+        while let Some(path) = paths.next() {
+            let relative_path = match path.strip_prefix(&snapshot.path) {
+                Ok(relative_path) => relative_path.to_path_buf(),
+                Err(e) => {
+                    log::error!("Unexpected event {:?}", e);
+                    continue;
+                }
+            };
 
-        for path in paths {
-            let relative_path = path.strip_prefix(&snapshot.path)?.to_path_buf();
-            let snapshot_entry = snapshot.entry_for_path(relative_path);
+            let snapshot_entry = snapshot.entry_for_path(&relative_path);
+            let fs_entry = self.fs_entry_for_path(&snapshot.path, &path);
+
+            match fs_entry {
+                // If this path currently exists on the filesystem, then ensure that the snapshot's
+                // entry for this path is up-to-date.
+                Ok(Some((fs_entry, ignore))) => {
+                    let fs_inode = fs_entry.ino();
+                    let fs_parent_inode = fs_entry.parent();
+
+                    // If the snapshot already contains an entry for this path, then ensure that the
+                    // entry has the correct inode and parent.
+                    if let Some(snapshot_entry) = snapshot_entry {
+                        let snapshot_inode = snapshot_entry.ino();
+                        let snapshot_parent_inode = snapshot_entry.parent();
+
+                        // If the snapshot entry already matches the filesystem, then skip to the
+                        // next event path.
+                        if snapshot_inode == fs_inode && snapshot_parent_inode == fs_parent_inode {
+                            continue;
+                        }
 
-            if let Some(fs_entry) = self.fs_entry_for_path(&path) {
-                if let Some(snapshot_entry) = snapshot_entry {
-                    // If the parent does not match:
-                    // Remove snapshot entry from its parent.
-                    // Set its parent to none.
-                    // Add its inode to a set of inodes to potentially remove after the batch.
+                        // If it does not match, then detach this inode from its current parent, and
+                        // record that it may have been removed from the worktree.
+                        snapshot.reparent_entry(snapshot_inode, snapshot_parent_inode, None);
+                        possible_removed_inodes.insert(snapshot_inode);
+                    }
 
-                    // If the parent does match, continue to next path
+                    // If the snapshot already contained an entry for the inode that is now located
+                    // at this path in the filesystem, then move it to reflect its current parent on
+                    // the filesystem.
+                    if let Some(snapshot_entry_for_inode) = snapshot.entries.get(&fs_inode) {
+                        let snapshot_parent_inode = snapshot_entry_for_inode.parent();
+                        snapshot.reparent_entry(fs_inode, snapshot_parent_inode, fs_parent_inode);
+                    }
+                    // If the snapshot has no entry for this inode, then scan the filesystem to find
+                    // all descendents of this new inode. Discard any subsequent events that are
+                    // contained by the current path, since the directory is already being scanned
+                    // from scratch.
+                    else {
+                        while let Some(next_path) = paths.peek() {
+                            if next_path.starts_with(&path) {
+                                paths.next();
+                            }
+                        }
+                        scan_queue_tx
+                            .send(Ok(ScanJob {
+                                ino: fs_inode,
+                                path: Arc::from(path),
+                                relative_path,
+                                dir_entry: fs_entry,
+                                ignore: Some(ignore),
+                                scan_queue: scan_queue_tx.clone(),
+                            }))
+                            .unwrap();
+                    }
                 }
 
-                // If we get here, we either had no snapshot entry at the path or we had the wrong
-                // entry (different inode) and removed it.
-                // In either case, we now need to add the entry for this path
-
-                if let Some(existing_snapshot_entry) = snapshot.entry_for_inode(fs_entry.inode) {
-                    // An entry already exists in the snapshot, but in the wrong spot.
-                    // Set its parent to the correct parent
-                    // Insert it in the children of its parent
-                } else {
-                    // An entry doesn't exist in the snapshot, this is the first time we've seen it.
-                    // If this is a directory, do a recursive scan and discard subsequent events that are contained by the current path
-                    // Then set the parent of the result of that scan to the correct parent
-                    // Insert it in the children of that parent.
+                // If this path no longer exists on the filesystem, then remove it from the snapshot.
+                Ok(None) => {
+                    if let Some(snapshot_entry) = snapshot_entry {
+                        let snapshot_inode = snapshot_entry.ino();
+                        let snapshot_parent_inode = snapshot_entry.parent();
+                        snapshot.reparent_entry(snapshot_inode, snapshot_parent_inode, None);
+                        possible_removed_inodes.insert(snapshot_inode);
+                    }
                 }
-            } else {
-                if let Some(snapshot_entry) = snapshot_entry {
-                    // Remove snapshot entry from its parent.
-                    // Set its parent to none.
-                    // Add its inode to a set of inodes to potentially remove after the batch.
+                Err(e) => {
+                    // TODO - create a special 'error' entry in the entries tree to mark this
+                    log::error!("Error reading file on event {:?}", e);
                 }
             }
         }
 
-        // Check whether any entry whose parent was set to none is still an orphan. If so, remove it and all descedants.
-
+        // For now, update the locked snapshot at this point, because `scan_dir` uses that.
         *self.snapshot.lock() = snapshot;
 
-        Ok(())
-    }
+        // Scan any directories that were moved into this worktree as part of this event batch.
+        drop(scan_queue_tx);
+        let mut scanned_inodes = Vec::new();
+        scanned_inodes.resize_with(self.thread_pool.workers(), || Ok(Vec::new()));
+        self.thread_pool.scoped(|pool| {
+            for worker_inodes in &mut scanned_inodes {
+                pool.execute(|| {
+                    let worker_inodes = worker_inodes;
+                    while let Ok(job) = scan_queue_rx.recv() {
+                        if let Err(err) = job.and_then(|job| {
+                            self.scan_dir(job, Some(worker_inodes.as_mut().unwrap()))
+                        }) {
+                            *worker_inodes = Err(err);
+                            break;
+                        }
+                    }
+                });
+            }
+        });
+
+        // Remove any entries that became orphaned when processing this events batch.
+        let mut snapshot = self.snapshot();
+        let mut deletions = Vec::new();
+        let mut descendent_stack = Vec::new();
+        for inode in possible_removed_inodes {
+            if let Some(entry) = snapshot.entries.get(&inode) {
+                if entry.parent().is_none() {
+                    descendent_stack.push(inode);
+                }
+            }
 
-    fn fs_entry_for_path(&self, path: &Path) -> Option<Entry> {
-        todo!()
+            // Recursively remove the orphaned nodes' descendants.
+            while let Some(inode) = descendent_stack.pop() {
+                if let Some(entry) = snapshot.entries.get(&inode) {
+                    deletions.push(Edit::Remove(inode));
+                    if let Entry::Dir { children, .. } = entry {
+                        descendent_stack.extend_from_slice(children.as_ref());
+                    }
+                }
+            }
+        }
+        snapshot.entries.edit(deletions);
+        *self.snapshot.lock() = snapshot;
     }
 
-    // fn process_events2(&self, mut events: Vec<fsevent::Event>) -> Result<bool> {
-    //     if self.notify.receiver_count() == 0 {
-    //         return Ok(false);
-    //     }
-
-    //     // TODO: should we canonicalize this at the start?
-    //     let root_path = self.path.canonicalize()?;
-    //     let snapshot = Snapshot {
-    //         id: self.id,
-    //         entries: self.entries.lock().clone(),
-    //         root_inode: self.root_inode(),
-    //     };
-    //     let mut removed = HashSet::new();
-    //     let mut observed = HashSet::new();
-
-    //     let (scan_queue_tx, scan_queue_rx) = crossbeam_channel::unbounded();
-
-    //     events.sort_unstable_by(|a, b| a.path.cmp(&b.path));
-    //     let mut paths = events.into_iter().map(|e| e.path).peekable();
-    //     while let Some(path) = paths.next() {
-    //         let relative_path = path.strip_prefix(&root_path)?.to_path_buf();
-    //         match fs::metadata(&path) {
-    //             Ok(metadata) => {
-    //                 let inode = metadata.ino();
-    //                 let is_symlink = fs::symlink_metadata(&path)?.file_type().is_symlink();
-    //                 let name: Arc<OsStr> = Arc::from(path.file_name().unwrap_or(OsStr::new("/")));
-    //                 let mut ignore = IgnoreBuilder::new().build().add_parents(&path).unwrap();
-    //                 if metadata.is_dir() {
-    //                     ignore = ignore.add_child(&path).unwrap();
-    //                 }
-    //                 let is_ignored = ignore.matched(&path, metadata.is_dir()).is_ignore();
-    //                 let parent = if path == root_path {
-    //                     None
-    //                 } else {
-    //                     Some(fs::metadata(path.parent().unwrap())?.ino())
-    //                 };
-
-    //                 let prev_entry = snapshot.entries.get(&inode);
-    //                 // If we haven't seen this inode yet, we are going to recursively scan it, so
-    //                 // ignore event involving a descendant.
-    //                 if prev_entry.is_none() {
-    //                     while paths.peek().map_or(false, |p| p.starts_with(&path)) {
-    //                         paths.next();
-    //                     }
-    //                 }
-
-    //                 observed.insert(inode);
-    //                 if metadata.file_type().is_dir() {
-    //                     let is_ignored = is_ignored || name.as_ref() == ".git";
-    //                     let dir_entry = Entry::Dir {
-    //                         parent,
-    //                         name,
-    //                         inode,
-    //                         is_symlink,
-    //                         is_ignored,
-    //                         children: Arc::from([]),
-    //                         pending: true,
-    //                     };
-    //                     self.insert_entries(Some(dir_entry.clone()));
-
-    //                     scan_queue_tx
-    //                         .send(Ok(ScanJob {
-    //                             ino: inode,
-    //                             path: Arc::from(path),
-    //                             relative_path,
-    //                             dir_entry,
-    //                             ignore: Some(ignore),
-    //                             scan_queue: scan_queue_tx.clone(),
-    //                         }))
-    //                         .unwrap();
-    //                 } else {
-    //                     self.insert_entries(Some(Entry::File {
-    //                         parent,
-    //                         name,
-    //                         path: PathEntry::new(inode, &relative_path, is_ignored),
-    //                         inode,
-    //                         is_symlink,
-    //                         is_ignored,
-    //                     }));
-    //                 }
-    //             }
-    //             Err(err) => {
-    //                 if err.kind() == io::ErrorKind::NotFound {
-    //                     // Fill removed with the inodes of all descendants of this path.
-    //                     let mut stack = Vec::new();
-    //                     stack.extend(snapshot.inode_for_path(&relative_path));
-    //                     while let Some(inode) = stack.pop() {
-    //                         removed.insert(inode);
-    //                         if let Some(Entry::Dir { children, .. }) = snapshot.entries.get(&inode)
-    //                         {
-    //                             stack.extend(children.iter().copied())
-    //                         }
-    //                     }
-    //                 } else {
-    //                     return Err(anyhow::Error::new(err));
-    //                 }
-    //             }
-    //         }
-    //     }
-    //     drop(scan_queue_tx);
-
-    //     let mut scanned_inodes = Vec::new();
-    //     scanned_inodes.resize_with(self.thread_pool.workers(), || Ok(Vec::new()));
-    //     self.thread_pool.scoped(|pool| {
-    //         for worker_inodes in &mut scanned_inodes {
-    //             pool.execute(|| {
-    //                 let worker_inodes = worker_inodes;
-    //                 while let Ok(job) = scan_queue_rx.recv() {
-    //                     if let Err(err) = job.and_then(|job| {
-    //                         self.scan_dir(job, Some(worker_inodes.as_mut().unwrap()))
-    //                     }) {
-    //                         *worker_inodes = Err(err);
-    //                         break;
-    //                     }
-    //                 }
-    //             });
-    //         }
-    //     });
-
-    //     for worker_inodes in scanned_inodes {
-    //         for inode in worker_inodes? {
-    //             remove_counts.remove(&inode);
-    //         }
-    //     }
-    //     self.remove_entries(remove_counts);
-
-    //     Ok(self.notify.receiver_count() != 0)
-    // }
+    fn fs_entry_for_path(&self, root_path: &Path, path: &Path) -> Result<Option<(Entry, Ignore)>> {
+        match fs::metadata(&path) {
+            Ok(metadata) => {
+                let mut ignore = IgnoreBuilder::new().build().add_parents(&path).unwrap();
+                if metadata.is_dir() {
+                    ignore = ignore.add_child(&path).unwrap();
+                }
+                let is_ignored = ignore.matched(&path, metadata.is_dir()).is_ignore();
+
+                let inode = metadata.ino();
+                let name: Arc<OsStr> = Arc::from(path.file_name().unwrap_or(OsStr::new("/")));
+                let is_symlink = fs::symlink_metadata(&path)?.file_type().is_symlink();
+                let parent = if path == root_path {
+                    None
+                } else {
+                    Some(fs::metadata(path.parent().unwrap())?.ino())
+                };
+                if metadata.file_type().is_dir() {
+                    Ok(Some((
+                        Entry::Dir {
+                            parent,
+                            name,
+                            inode,
+                            is_symlink,
+                            is_ignored,
+                            children: Arc::from([]),
+                            pending: true,
+                        },
+                        ignore,
+                    )))
+                } else {
+                    Ok(Some((
+                        Entry::File {
+                            parent,
+                            name,
+                            path: PathEntry::new(
+                                inode,
+                                &path.strip_prefix(root_path).unwrap(),
+                                is_ignored,
+                            ),
+                            inode,
+                            is_symlink,
+                            is_ignored,
+                        },
+                        ignore,
+                    )))
+                }
+            }
+            Err(err) => {
+                if err.kind() == io::ErrorKind::NotFound {
+                    Ok(None)
+                } else {
+                    Err(anyhow::Error::new(err))
+                }
+            }
+        }
+    }
 
     fn insert_entries(&self, entries: impl IntoIterator<Item = Entry>) {
         self.snapshot
@@ -816,67 +904,6 @@ impl BackgroundScanner {
             .entries
             .edit(entries.into_iter().map(Edit::Insert).collect::<Vec<_>>());
     }
-
-    // fn insert_entries(&self, entries: impl IntoIterator<Item = Entry>) {
-    //     let mut edits = Vec::new();
-    //     let mut new_parents = HashMap::new();
-    //     for entry in entries {
-    //         new_parents.insert(entry.ino(), entry.parent());
-    //         edits.push(Edit::Insert(entry));
-    //     }
-
-    //     let mut entries = self.snapshot.lock().entries;
-    //     let prev_entries = entries.edit(edits);
-    //     Self::remove_stale_children(&mut *entries, prev_entries, new_parents);
-    // }
-
-    // fn remove_entries(&self, inodes: impl IntoIterator<Item = u64>) {
-    //     let mut entries = self.entries.lock();
-    //     let prev_entries = entries.edit(inodes.into_iter().map(Edit::Remove).collect());
-    //     Self::remove_stale_children(&mut *entries, prev_entries, HashMap::new());
-    // }
-
-    // fn remove_stale_children(
-    //     tree: &mut SumTree<Entry>,
-    //     prev_entries: Vec<Entry>,
-    //     new_parents: HashMap<u64, Option<u64>>,
-    // ) {
-    //     let mut new_parent_entries = HashMap::new();
-
-    //     for prev_entry in prev_entries {
-    //         let new_parent = new_parents.get(&prev_entry.ino()).copied().flatten();
-    //         if new_parent != prev_entry.parent() {
-    //             if let Some(prev_parent) = prev_entry.parent() {
-    //                 let (_, new_children) =
-    //                     new_parent_entries.entry(prev_parent).or_insert_with(|| {
-    //                         let prev_parent_entry = tree.get(&prev_parent).unwrap();
-    //                         if let Entry::Dir { children, .. } = prev_parent_entry {
-    //                             (prev_parent_entry.clone(), children.to_vec())
-    //                         } else {
-    //                             unreachable!()
-    //                         }
-    //                     });
-
-    //                 if let Some(ix) = new_children.iter().position(|ino| *ino == prev_entry.ino()) {
-    //                     new_children.swap_remove(ix);
-    //                 }
-    //             }
-    //         }
-    //     }
-
-    //     let parent_edits = new_parent_entries
-    //         .into_iter()
-    //         .map(|(_, (mut parent_entry, new_children))| {
-    //             if let Entry::Dir { children, .. } = &mut parent_entry {
-    //                 *children = Arc::from(new_children);
-    //             } else {
-    //                 unreachable!()
-    //             }
-    //             Edit::Insert(parent_entry)
-    //         })
-    //         .collect::<Vec<_>>();
-    //     tree.edit(parent_edits);
-    // }
 }
 
 struct ScanJob {