Extract Fs::child_entries method to avoid sync I/O in BackgroundScanner

Antonio Scandurra created

Change summary

zed/src/worktree.rs | 148 ++++++++++++++++++++++++++++++----------------
1 file changed, 96 insertions(+), 52 deletions(-)

Detailed changes

zed/src/worktree.rs 🔗

@@ -14,6 +14,7 @@ use crate::{
 use ::ignore::gitignore::Gitignore;
 use anyhow::{anyhow, Context, Result};
 use atomic::Ordering::SeqCst;
+use futures::{Stream, StreamExt};
 pub use fuzzy::{match_paths, PathMatch};
 use gpui::{
     executor, AppContext, AsyncAppContext, Entity, ModelContext, ModelHandle, MutableAppContext,
@@ -23,7 +24,7 @@ use lazy_static::lazy_static;
 use parking_lot::Mutex;
 use postage::{
     broadcast,
-    prelude::{Sink, Stream},
+    prelude::{Sink as _, Stream as _},
     watch,
 };
 use smol::{
@@ -36,12 +37,13 @@ use std::{
     collections::{BTreeMap, HashMap},
     convert::{TryFrom, TryInto},
     ffi::{OsStr, OsString},
-    fmt, fs,
+    fmt,
     future::Future,
     io,
     ops::Deref,
     os::unix::fs::MetadataExt,
     path::{Path, PathBuf},
+    pin::Pin,
     sync::{
         atomic::{self, AtomicUsize},
         Arc,
@@ -74,6 +76,13 @@ trait Fs: Send + Sync {
         path: Arc<Path>,
         abs_path: &Path,
     ) -> Result<Option<Entry>>;
+    async fn child_entries<'a>(
+        &self,
+        root_char_bag: CharBag,
+        next_entry_id: &'a AtomicUsize,
+        path: &'a Path,
+        abs_path: &'a Path,
+    ) -> Result<Pin<Box<dyn 'a + Stream<Item = Result<Entry>> + Send>>>;
     async fn load(&self, path: &Path) -> Result<String>;
     async fn save(&self, path: &Path, text: &Rope) -> Result<()>;
 }
@@ -124,6 +133,41 @@ impl Fs for OsFs {
         Ok(Some(entry))
     }
 
+    async fn child_entries<'a>(
+        &self,
+        root_char_bag: CharBag,
+        next_entry_id: &'a AtomicUsize,
+        path: &'a Path,
+        abs_path: &'a Path,
+    ) -> Result<Pin<Box<dyn 'a + Stream<Item = Result<Entry>> + Send>>> {
+        let entries = smol::fs::read_dir(abs_path).await?;
+        Ok(entries
+            .then(move |entry| async move {
+                let child_entry = entry?;
+                let child_name = child_entry.file_name();
+                let child_path: Arc<Path> = path.join(&child_name).into();
+                let child_abs_path = abs_path.join(&child_name);
+                let child_is_symlink = child_entry.metadata().await?.file_type().is_symlink();
+                let child_metadata = smol::fs::metadata(child_abs_path).await?;
+                let child_inode = child_metadata.ino();
+                let child_mtime = child_metadata.modified()?;
+                Ok(Entry {
+                    id: next_entry_id.fetch_add(1, SeqCst),
+                    kind: if child_metadata.file_type().is_dir() {
+                        EntryKind::PendingDir
+                    } else {
+                        EntryKind::File(char_bag_for_path(root_char_bag, &child_path))
+                    },
+                    path: child_path,
+                    inode: child_inode,
+                    mtime: child_mtime,
+                    is_symlink: child_is_symlink,
+                    is_ignored: false,
+                })
+            })
+            .boxed())
+    }
+
     async fn load(&self, path: &Path) -> Result<String> {
         let mut file = smol::fs::File::open(path).await?;
         let mut text = String::new();
@@ -283,6 +327,16 @@ impl Fs for InMemoryFs {
         }
     }
 
+    async fn child_entries<'a>(
+        &self,
+        root_char_bag: CharBag,
+        next_entry_id: &'a AtomicUsize,
+        path: &'a Path,
+        abs_path: &'a Path,
+    ) -> Result<Pin<Box<dyn 'a + Stream<Item = Result<Entry>> + Send>>> {
+        todo!()
+    }
+
     async fn load(&self, path: &Path) -> Result<String> {
         let state = self.state.read().await;
         let text = state
@@ -2156,33 +2210,38 @@ impl BackgroundScanner {
         root_char_bag: CharBag,
         next_entry_id: Arc<AtomicUsize>,
         job: &ScanJob,
-    ) -> io::Result<()> {
+    ) -> Result<()> {
         let mut new_entries: Vec<Entry> = Vec::new();
         let mut new_jobs: Vec<ScanJob> = Vec::new();
         let mut ignore_stack = job.ignore_stack.clone();
         let mut new_ignore = None;
 
-        for child_entry in fs::read_dir(&job.abs_path)? {
-            let child_entry = child_entry?;
-            let child_name = child_entry.file_name();
-            let child_abs_path = job.abs_path.join(&child_name);
-            let child_path: Arc<Path> = job.path.join(&child_name).into();
-            let child_is_symlink = child_entry.metadata()?.file_type().is_symlink();
-            let child_metadata = if let Ok(metadata) = fs::metadata(&child_abs_path) {
-                metadata
-            } else {
-                log::error!("could not get metadata for path {:?}", child_abs_path);
-                continue;
+        let mut child_entries = self
+            .fs
+            .child_entries(
+                root_char_bag,
+                next_entry_id.as_ref(),
+                &job.path,
+                &job.abs_path,
+            )
+            .await?;
+        while let Some(child_entry) = child_entries.next().await {
+            let mut child_entry = match child_entry {
+                Ok(child_entry) => child_entry,
+                Err(error) => {
+                    log::error!("error processing entry {:?}", error);
+                    continue;
+                }
             };
-
-            let child_inode = child_metadata.ino();
-            let child_mtime = child_metadata.modified()?;
+            let child_name = child_entry.path.file_name().unwrap();
+            let child_abs_path = job.abs_path.join(&child_name);
+            let child_path = child_entry.path.clone();
 
             // If we find a .gitignore, add it to the stack of ignores used to determine which paths are ignored
             if child_name == *GITIGNORE {
                 let (ignore, err) = Gitignore::new(&child_abs_path);
                 if let Some(err) = err {
-                    log::error!("error in ignore file {:?} - {:?}", child_path, err);
+                    log::error!("error in ignore file {:?} - {:?}", child_entry.path, err);
                 }
                 let ignore = Arc::new(ignore);
                 ignore_stack = ignore_stack.append(job.path.clone(), ignore.clone());
@@ -2205,17 +2264,10 @@ impl BackgroundScanner {
                 }
             }
 
-            if child_metadata.is_dir() {
+            if child_entry.is_dir() {
                 let is_ignored = ignore_stack.is_path_ignored(&child_path, true);
-                new_entries.push(Entry {
-                    id: next_entry_id.fetch_add(1, SeqCst),
-                    kind: EntryKind::PendingDir,
-                    path: child_path.clone(),
-                    inode: child_inode,
-                    mtime: child_mtime,
-                    is_symlink: child_is_symlink,
-                    is_ignored,
-                });
+                child_entry.is_ignored = is_ignored;
+                new_entries.push(child_entry);
                 new_jobs.push(ScanJob {
                     abs_path: child_abs_path,
                     path: child_path,
@@ -2227,16 +2279,8 @@ impl BackgroundScanner {
                     scan_queue: job.scan_queue.clone(),
                 });
             } else {
-                let is_ignored = ignore_stack.is_path_ignored(&child_path, false);
-                new_entries.push(Entry {
-                    id: next_entry_id.fetch_add(1, SeqCst),
-                    kind: EntryKind::File(char_bag_for_path(root_char_bag, &child_path)),
-                    path: child_path,
-                    inode: child_inode,
-                    mtime: child_mtime,
-                    is_symlink: child_is_symlink,
-                    is_ignored,
-                });
+                child_entry.is_ignored = ignore_stack.is_path_ignored(&child_path, false);
+                new_entries.push(child_entry);
             };
         }
 
@@ -2516,11 +2560,11 @@ impl WorktreeHandle for ModelHandle<Worktree> {
         let root_path = cx.read(|cx| self.read(cx).abs_path.clone());
         let tree = self.clone();
         async move {
-            fs::write(root_path.join(filename), "").unwrap();
+            std::fs::write(root_path.join(filename), "").unwrap();
             tree.condition(&cx, |tree, _| tree.entry_for_path(filename).is_some())
                 .await;
 
-            fs::remove_file(root_path.join(filename)).unwrap();
+            std::fs::remove_file(root_path.join(filename)).unwrap();
             tree.condition(&cx, |tree, _| tree.entry_for_path(filename).is_none())
                 .await;
 
@@ -2911,7 +2955,7 @@ mod tests {
         });
         save.await.unwrap();
 
-        let new_text = fs::read_to_string(dir.path().join("file1")).unwrap();
+        let new_text = std::fs::read_to_string(dir.path().join("file1")).unwrap();
         assert_eq!(new_text, buffer.read_with(&cx, |buffer, _| buffer.text()));
     }
 
@@ -2938,7 +2982,7 @@ mod tests {
         });
         save.await.unwrap();
 
-        let new_text = fs::read_to_string(file_path).unwrap();
+        let new_text = std::fs::read_to_string(file_path).unwrap();
         assert_eq!(new_text, buffer.read_with(&cx, |buffer, _| buffer.text()));
     }
 
@@ -3117,8 +3161,8 @@ mod tests {
             assert_eq!(ignored.is_ignored(), true);
         });
 
-        fs::write(dir.path().join("tracked-dir/tracked-file2"), "").unwrap();
-        fs::write(dir.path().join("ignored-dir/ignored-file2"), "").unwrap();
+        std::fs::write(dir.path().join("tracked-dir/tracked-file2"), "").unwrap();
+        std::fs::write(dir.path().join("ignored-dir/ignored-file2"), "").unwrap();
         tree.flush_fs_events(&cx).await;
         cx.read(|cx| {
             let tree = tree.read(cx);
@@ -3246,10 +3290,10 @@ mod tests {
 
             if rng.gen() {
                 log::info!("Creating dir {:?}", new_path.strip_prefix(root_path)?);
-                fs::create_dir(&new_path)?;
+                std::fs::create_dir(&new_path)?;
             } else {
                 log::info!("Creating file {:?}", new_path.strip_prefix(root_path)?);
-                fs::write(&new_path, "")?;
+                std::fs::write(&new_path, "")?;
             }
             record_event(new_path);
         } else if rng.gen_bool(0.05) {
@@ -3283,7 +3327,7 @@ mod tests {
                 ignore_path.strip_prefix(&root_path)?,
                 ignore_contents
             );
-            fs::write(&ignore_path, ignore_contents).unwrap();
+            std::fs::write(&ignore_path, ignore_contents).unwrap();
             record_event(ignore_path);
         } else {
             let old_path = {
@@ -3303,7 +3347,7 @@ mod tests {
                 let overwrite_existing_dir =
                     !old_path.starts_with(&new_path_parent) && rng.gen_bool(0.3);
                 let new_path = if overwrite_existing_dir {
-                    fs::remove_dir_all(&new_path_parent).ok();
+                    std::fs::remove_dir_all(&new_path_parent).ok();
                     new_path_parent.to_path_buf()
                 } else {
                     new_path_parent.join(gen_name(rng))
@@ -3319,14 +3363,14 @@ mod tests {
                     },
                     new_path.strip_prefix(&root_path)?
                 );
-                fs::rename(&old_path, &new_path)?;
+                std::fs::rename(&old_path, &new_path)?;
                 record_event(old_path.clone());
                 record_event(new_path);
             } else if old_path.is_dir() {
                 let (dirs, files) = read_dir_recursive(old_path.clone());
 
                 log::info!("Deleting dir {:?}", old_path.strip_prefix(&root_path)?);
-                fs::remove_dir_all(&old_path).unwrap();
+                std::fs::remove_dir_all(&old_path).unwrap();
                 for file in files {
                     record_event(file);
                 }
@@ -3335,7 +3379,7 @@ mod tests {
                 }
             } else {
                 log::info!("Deleting file {:?}", old_path.strip_prefix(&root_path)?);
-                fs::remove_file(old_path).unwrap();
+                std::fs::remove_file(old_path).unwrap();
                 record_event(old_path.clone());
             }
         }
@@ -3344,7 +3388,7 @@ mod tests {
     }
 
     fn read_dir_recursive(path: PathBuf) -> (Vec<PathBuf>, Vec<PathBuf>) {
-        let child_entries = fs::read_dir(&path).unwrap();
+        let child_entries = std::fs::read_dir(&path).unwrap();
         let mut dirs = vec![path];
         let mut files = Vec::new();
         for child_entry in child_entries {