Start on refactoring `BackgroundScanner` to be generic over fs

Antonio Scandurra created

Change summary

zed/src/worktree.rs | 204 +++++++++++++++++++++++++++++++++-------------
1 file changed, 146 insertions(+), 58 deletions(-)

Detailed changes

zed/src/worktree.rs 🔗

@@ -22,6 +22,7 @@ use gpui::{
 use lazy_static::lazy_static;
 use parking_lot::Mutex;
 use postage::{
+    broadcast,
     prelude::{Sink, Stream},
     watch,
 };
@@ -153,7 +154,7 @@ struct InMemoryEntry {
 struct InMemoryFsState {
     entries: BTreeMap<PathBuf, InMemoryEntry>,
     next_inode: u64,
-    events_tx: watch::Sender<()>,
+    events_tx: broadcast::Sender<fsevent::Event>,
 }
 
 impl InMemoryFsState {
@@ -168,16 +169,26 @@ impl InMemoryFsState {
             Err(anyhow!("invalid "))
         }
     }
+
+    async fn emit_event(&mut self, path: &Path) {
+        let _ = self
+            .events_tx
+            .send(fsevent::Event {
+                event_id: 0,
+                flags: fsevent::StreamFlags::empty(),
+                path: path.to_path_buf(),
+            })
+            .await;
+    }
 }
 
 pub struct InMemoryFs {
     state: RwLock<InMemoryFsState>,
-    events_rx: watch::Receiver<()>,
 }
 
 impl InMemoryFs {
     pub fn new() -> Self {
-        let (events_tx, events_rx) = watch::channel();
+        let (events_tx, _) = broadcast::channel(2048);
         let mut entries = BTreeMap::new();
         entries.insert(
             Path::new("/").to_path_buf(),
@@ -195,7 +206,6 @@ impl InMemoryFs {
                 next_inode: 1,
                 events_tx,
             }),
-            events_rx,
         }
     }
 
@@ -215,8 +225,33 @@ impl InMemoryFs {
                 content: None,
             },
         );
+        state.emit_event(path).await;
         Ok(())
     }
+
+    pub async fn remove(&self, path: &Path) -> Result<()> {
+        let mut state = self.state.write().await;
+        state.validate_path(path)?;
+
+        let mut paths = Vec::new();
+        state.entries.retain(|path, _| {
+            if path.starts_with(path) {
+                paths.push(path.to_path_buf());
+                false
+            } else {
+                true
+            }
+        });
+        for path in paths {
+            state.emit_event(&path).await;
+        }
+
+        Ok(())
+    }
+
+    pub async fn events(&self) -> broadcast::Receiver<fsevent::Event> {
+        self.state.read().await.events_tx.subscribe()
+    }
 }
 
 #[async_trait::async_trait]
@@ -267,6 +302,7 @@ impl Fs for InMemoryFs {
             } else {
                 entry.content = Some(text.chunks().collect());
                 entry.mtime = SystemTime::now();
+                state.emit_event(path).await;
                 Ok(())
             }
         } else {
@@ -282,6 +318,7 @@ impl Fs for InMemoryFs {
                     content: Some(text.chunks().collect()),
                 },
             );
+            state.emit_event(path).await;
             Ok(())
         }
     }
@@ -291,7 +328,7 @@ impl Fs for InMemoryFs {
 enum ScanState {
     Idle,
     Scanning,
-    Err(Arc<io::Error>),
+    Err(Arc<anyhow::Error>),
 }
 
 pub enum Worktree {
@@ -331,7 +368,7 @@ impl Worktree {
         cx: &mut ModelContext<Worktree>,
     ) -> Self {
         let fs = Arc::new(OsFs);
-        let (mut tree, scan_states_tx) = LocalWorktree::new(path, languages, fs, cx);
+        let (mut tree, scan_states_tx) = LocalWorktree::new(path, languages, fs.clone(), cx);
         let (event_stream, event_stream_handle) = fsevent::EventStream::new(
             &[tree.snapshot.abs_path.as_ref()],
             Duration::from_millis(100),
@@ -339,7 +376,7 @@ impl Worktree {
         let background_snapshot = tree.background_snapshot.clone();
         let id = tree.id;
         std::thread::spawn(move || {
-            let scanner = BackgroundScanner::new(background_snapshot, scan_states_tx, id);
+            let scanner = BackgroundScanner::new(fs, background_snapshot, scan_states_tx, id);
             scanner.run(event_stream);
         });
         tree._event_stream_handle = Some(event_stream_handle);
@@ -356,7 +393,14 @@ impl Worktree {
         let (tree, scan_states_tx) = LocalWorktree::new(path, languages, fs.clone(), cx);
         let background_snapshot = tree.background_snapshot.clone();
         let id = tree.id;
-        cx.background().spawn(async move {}).detach();
+        let fs = fs.clone();
+        cx.background()
+            .spawn(async move {
+                let events_rx = fs.events().await;
+                let scanner = BackgroundScanner::new(fs, background_snapshot, scan_states_tx, id);
+                scanner.run_test(events_rx).await;
+            })
+            .detach();
         Worktree::Local(tree)
     }
 
@@ -1933,14 +1977,21 @@ impl<'a> sum_tree::Dimension<'a, EntrySummary> for VisibleFileCount {
 }
 
 struct BackgroundScanner {
+    fs: Arc<dyn Fs>,
     snapshot: Arc<Mutex<Snapshot>>,
     notify: Sender<ScanState>,
     thread_pool: scoped_pool::Pool,
 }
 
 impl BackgroundScanner {
-    fn new(snapshot: Arc<Mutex<Snapshot>>, notify: Sender<ScanState>, worktree_id: usize) -> Self {
+    fn new(
+        fs: Arc<dyn Fs>,
+        snapshot: Arc<Mutex<Snapshot>>,
+        notify: Sender<ScanState>,
+        worktree_id: usize,
+    ) -> Self {
         Self {
+            fs,
             snapshot,
             notify,
             thread_pool: scoped_pool::Pool::new(16, format!("worktree-{}-scanner", worktree_id)),
@@ -1960,7 +2011,7 @@ impl BackgroundScanner {
             return;
         }
 
-        if let Err(err) = self.scan_dirs() {
+        if let Err(err) = smol::block_on(self.scan_dirs()) {
             if smol::block_on(self.notify.send(ScanState::Err(Arc::new(err)))).is_err() {
                 return;
             }
@@ -1975,7 +2026,7 @@ impl BackgroundScanner {
                 return false;
             }
 
-            if !self.process_events(events) {
+            if !smol::block_on(self.process_events(events)) {
                 return false;
             }
 
@@ -1987,46 +2038,82 @@ impl BackgroundScanner {
         });
     }
 
-    fn scan_dirs(&mut self) -> io::Result<()> {
-        self.snapshot.lock().scan_id += 1;
+    #[cfg(any(test, feature = "test-support"))]
+    async fn run_test(mut self, mut events_rx: broadcast::Receiver<fsevent::Event>) {
+        if self.notify.send(ScanState::Scanning).await.is_err() {
+            return;
+        }
+
+        if let Err(err) = self.scan_dirs().await {
+            if self
+                .notify
+                .send(ScanState::Err(Arc::new(err)))
+                .await
+                .is_err()
+            {
+                return;
+            }
+        }
+
+        if self.notify.send(ScanState::Idle).await.is_err() {
+            return;
+        }
+
+        while let Some(event) = events_rx.recv().await {
+            let mut events = vec![event];
+            while let Ok(event) = events_rx.try_recv() {
+                events.push(event);
+            }
+
+            if self.notify.send(ScanState::Scanning).await.is_err() {
+                break;
+            }
+
+            if self.process_events(events).await {
+                break;
+            }
+
+            if self.notify.send(ScanState::Idle).await.is_err() {
+                break;
+            }
+        }
+    }
+
+    async fn scan_dirs(&mut self) -> Result<()> {
+        let next_entry_id;
+        {
+            let mut snapshot = self.snapshot.lock();
+            snapshot.scan_id += 1;
+            next_entry_id = snapshot.next_entry_id.clone();
+        }
 
         let path: Arc<Path> = Arc::from(Path::new(""));
         let abs_path = self.abs_path();
-        let metadata = fs::metadata(&abs_path)?;
-        let inode = metadata.ino();
-        let is_symlink = fs::symlink_metadata(&abs_path)?.file_type().is_symlink();
-        let is_dir = metadata.file_type().is_dir();
-        let mtime = metadata.modified()?;
 
         // After determining whether the root entry is a file or a directory, populate the
         // snapshot's "root name", which will be used for the purpose of fuzzy matching.
         let mut root_name = abs_path
             .file_name()
             .map_or(String::new(), |f| f.to_string_lossy().to_string());
+        let root_char_bag = root_name.chars().map(|c| c.to_ascii_lowercase()).collect();
+        let entry = self
+            .fs
+            .entry(root_char_bag, &next_entry_id, path.clone(), &abs_path)
+            .await?
+            .ok_or_else(|| anyhow!("root entry does not exist"))?;
+        let is_dir = entry.is_dir();
         if is_dir {
             root_name.push('/');
         }
 
-        let root_char_bag = root_name.chars().map(|c| c.to_ascii_lowercase()).collect();
-        let next_entry_id;
         {
             let mut snapshot = self.snapshot.lock();
             snapshot.root_name = root_name;
             snapshot.root_char_bag = root_char_bag;
-            next_entry_id = snapshot.next_entry_id.clone();
         }
 
+        self.snapshot.lock().insert_entry(entry);
         if is_dir {
-            self.snapshot.lock().insert_entry(Entry {
-                id: next_entry_id.fetch_add(1, SeqCst),
-                kind: EntryKind::PendingDir,
-                path: path.clone(),
-                inode,
-                mtime,
-                is_symlink,
-                is_ignored: false,
-            });
-
             let (tx, rx) = crossbeam_channel::unbounded();
             tx.send(ScanJob {
                 abs_path: abs_path.to_path_buf(),
@@ -2041,31 +2128,23 @@ impl BackgroundScanner {
                 for _ in 0..self.thread_pool.thread_count() {
                     pool.execute(|| {
                         while let Ok(job) = rx.recv() {
-                            if let Err(err) =
-                                self.scan_dir(root_char_bag, next_entry_id.clone(), &job)
-                            {
+                            if let Err(err) = smol::block_on(self.scan_dir(
+                                root_char_bag,
+                                next_entry_id.clone(),
+                                &job,
+                            )) {
                                 log::error!("error scanning {:?}: {}", job.abs_path, err);
                             }
                         }
                     });
                 }
             });
-        } else {
-            self.snapshot.lock().insert_entry(Entry {
-                id: next_entry_id.fetch_add(1, SeqCst),
-                kind: EntryKind::File(char_bag_for_path(root_char_bag, &path)),
-                path,
-                inode,
-                mtime,
-                is_symlink,
-                is_ignored: false,
-            });
         }
 
         Ok(())
     }
 
-    fn scan_dir(
+    async fn scan_dir(
         &self,
         root_char_bag: CharBag,
         next_entry_id: Arc<AtomicUsize>,
@@ -2164,7 +2243,7 @@ impl BackgroundScanner {
         Ok(())
     }
 
-    fn process_events(&mut self, mut events: Vec<fsevent::Event>) -> bool {
+    async fn process_events(&mut self, mut events: Vec<fsevent::Event>) -> bool {
         let mut snapshot = self.snapshot();
         snapshot.scan_id += 1;
 
@@ -2207,12 +2286,16 @@ impl BackgroundScanner {
                 }
             };
 
-            match smol::block_on(OsFs.entry(
-                snapshot.root_char_bag,
-                &next_entry_id,
-                path.clone(),
-                &event.path,
-            )) {
+            match self
+                .fs
+                .entry(
+                    snapshot.root_char_bag,
+                    &next_entry_id,
+                    path.clone(),
+                    &event.path,
+                )
+                .await
+            {
                 Ok(Some(mut fs_entry)) => {
                     let is_dir = fs_entry.is_dir();
                     let ignore_stack = snapshot.ignore_stack_for_path(&path, is_dir);
@@ -2245,8 +2328,11 @@ impl BackgroundScanner {
             for _ in 0..self.thread_pool.thread_count() {
                 pool.execute(|| {
                     while let Ok(job) = scan_queue_rx.recv() {
-                        if let Err(err) = self.scan_dir(root_char_bag, next_entry_id.clone(), &job)
-                        {
+                        if let Err(err) = smol::block_on(self.scan_dir(
+                            root_char_bag,
+                            next_entry_id.clone(),
+                            &job,
+                        )) {
                             log::error!("error scanning {:?}: {}", job.abs_path, err);
                         }
                     }
@@ -3061,6 +3147,7 @@ mod tests {
 
             let (notify_tx, _notify_rx) = smol::channel::unbounded();
             let mut scanner = BackgroundScanner::new(
+                Arc::new(OsFs),
                 Arc::new(Mutex::new(Snapshot {
                     id: 0,
                     scan_id: 0,
@@ -3076,7 +3163,7 @@ mod tests {
                 notify_tx,
                 0,
             );
-            scanner.scan_dirs().unwrap();
+            smol::block_on(scanner.scan_dirs()).unwrap();
             scanner.snapshot().check_invariants();
 
             let mut events = Vec::new();
@@ -3086,7 +3173,7 @@ mod tests {
                     let len = rng.gen_range(0..=events.len());
                     let to_deliver = events.drain(0..len).collect::<Vec<_>>();
                     log::info!("Delivering events: {:#?}", to_deliver);
-                    scanner.process_events(to_deliver);
+                    smol::block_on(scanner.process_events(to_deliver));
                     scanner.snapshot().check_invariants();
                 } else {
                     events.extend(randomly_mutate_tree(root_dir.path(), 0.6, &mut rng).unwrap());
@@ -3094,11 +3181,12 @@ mod tests {
                 }
             }
             log::info!("Quiescing: {:#?}", events);
-            scanner.process_events(events);
+            smol::block_on(scanner.process_events(events));
             scanner.snapshot().check_invariants();
 
             let (notify_tx, _notify_rx) = smol::channel::unbounded();
             let mut new_scanner = BackgroundScanner::new(
+                scanner.fs.clone(),
                 Arc::new(Mutex::new(Snapshot {
                     id: 0,
                     scan_id: 0,
@@ -3114,7 +3202,7 @@ mod tests {
                 notify_tx,
                 1,
             );
-            new_scanner.scan_dirs().unwrap();
+            smol::block_on(new_scanner.scan_dirs()).unwrap();
             assert_eq!(scanner.snapshot().to_vec(), new_scanner.snapshot().to_vec());
         }
     }