Always bump scan_id when refreshing an entry

Max Brunsfeld created

The scan_id needs to be bumped even if a scan is already in progress,
so that worktree updates can detect that entries have changed. This
means that the worktree's completed_scan_id may increase by more than
one at the end of a scan.

Change summary

crates/project/src/worktree.rs | 130 ++++++++++++++++++++---------------
1 file changed, 75 insertions(+), 55 deletions(-)

Detailed changes

crates/project/src/worktree.rs 🔗

@@ -95,7 +95,17 @@ pub struct Snapshot {
     root_char_bag: CharBag,
     entries_by_path: SumTree<Entry>,
     entries_by_id: SumTree<PathEntry>,
+
+    /// A number that increases every time the worktree begins scanning
+    /// a set of paths from the filesystem. This scanning could be caused
+    /// by some operation performed on the worktree, such as reading or
+    /// writing a file, or by an event reported by the filesystem.
     scan_id: usize,
+
+    /// The latest scan id that has completed, and whose preceding scans
+    /// have all completed. The current `scan_id` could be more than one
+    /// greater than the `completed_scan_id` if operations are performed
+    /// on the worktree while it is processing a file-system event.
     completed_scan_id: usize,
 }
 
@@ -2168,6 +2178,7 @@ impl BackgroundScanner {
         }
         {
             let mut snapshot = self.snapshot.lock();
+            snapshot.scan_id += 1;
             ignore_stack = snapshot.ignore_stack_for_abs_path(&root_abs_path, true);
             if ignore_stack.is_all() {
                 if let Some(mut root_entry) = snapshot.root_entry().cloned() {
@@ -2189,6 +2200,10 @@ impl BackgroundScanner {
         .unwrap();
         drop(scan_job_tx);
         self.scan_dirs(true, scan_job_rx).await;
+        {
+            let mut snapshot = self.snapshot.lock();
+            snapshot.completed_scan_id = snapshot.scan_id;
+        }
         self.send_status_update(false, None);
 
         // Process any any FS events that occurred while performing the initial scan.
@@ -2200,7 +2215,6 @@ impl BackgroundScanner {
                 paths.extend(more_events.into_iter().map(|e| e.path));
             }
             self.process_events(paths).await;
-            self.send_status_update(false, None);
         }
 
         self.finished_initial_scan = true;
@@ -2212,9 +2226,8 @@ impl BackgroundScanner {
                 // these before handling changes reported by the filesystem.
                 request = self.refresh_requests_rx.recv().fuse() => {
                     let Ok((paths, barrier)) = request else { break };
-                    self.reload_entries_for_paths(paths, None).await;
-                    if !self.send_status_update(false, Some(barrier)) {
-                        break;
+                    if !self.process_refresh_request(paths, barrier).await {
+                        return;
                     }
                 }
 
@@ -2225,15 +2238,17 @@ impl BackgroundScanner {
                         paths.extend(more_events.into_iter().map(|e| e.path));
                     }
                     self.process_events(paths).await;
-                    self.send_status_update(false, None);
                 }
             }
         }
     }
 
-    async fn process_events(&mut self, paths: Vec<PathBuf>) {
-        use futures::FutureExt as _;
+    async fn process_refresh_request(&self, paths: Vec<PathBuf>, barrier: barrier::Sender) -> bool {
+        self.reload_entries_for_paths(paths, None).await;
+        self.send_status_update(false, Some(barrier))
+    }
 
+    async fn process_events(&mut self, paths: Vec<PathBuf>) {
         let (scan_job_tx, scan_job_rx) = channel::unbounded();
         if let Some(mut paths) = self
             .reload_entries_for_paths(paths, Some(scan_job_tx.clone()))
@@ -2245,35 +2260,7 @@ impl BackgroundScanner {
         drop(scan_job_tx);
         self.scan_dirs(false, scan_job_rx).await;
 
-        let (ignore_queue_tx, ignore_queue_rx) = channel::unbounded();
-        let snapshot = self.update_ignore_statuses(ignore_queue_tx);
-        self.executor
-            .scoped(|scope| {
-                for _ in 0..self.executor.num_cpus() {
-                    scope.spawn(async {
-                        loop {
-                            select_biased! {
-                                // Process any path refresh requests before moving on to process
-                                // the queue of ignore statuses.
-                                request = self.refresh_requests_rx.recv().fuse() => {
-                                    let Ok((paths, barrier)) = request else { break };
-                                    self.reload_entries_for_paths(paths, None).await;
-                                    if !self.send_status_update(false, Some(barrier)) {
-                                        return;
-                                    }
-                                }
-
-                                // Recursively process directories whose ignores have changed.
-                                job = ignore_queue_rx.recv().fuse() => {
-                                    let Ok(job) = job else { break };
-                                    self.update_ignore_status(job, &snapshot).await;
-                                }
-                            }
-                        }
-                    });
-                }
-            })
-            .await;
+        self.update_ignore_statuses().await;
 
         let mut snapshot = self.snapshot.lock();
         let mut git_repositories = mem::take(&mut snapshot.git_repositories);
@@ -2281,6 +2268,9 @@ impl BackgroundScanner {
         snapshot.git_repositories = git_repositories;
         snapshot.removed_entry_ids.clear();
         snapshot.completed_scan_id = snapshot.scan_id;
+        drop(snapshot);
+
+        self.send_status_update(false, None);
     }
 
     async fn scan_dirs(
@@ -2313,8 +2303,7 @@ impl BackgroundScanner {
                                 // the scan queue, so that user operations are prioritized.
                                 request = self.refresh_requests_rx.recv().fuse() => {
                                     let Ok((paths, barrier)) = request else { break };
-                                    self.reload_entries_for_paths(paths, None).await;
-                                    if !self.send_status_update(false, Some(barrier)) {
+                                    if !self.process_refresh_request(paths, barrier).await {
                                         return;
                                     }
                                 }
@@ -2521,12 +2510,10 @@ impl BackgroundScanner {
         .await;
 
         let mut snapshot = self.snapshot.lock();
-
-        if snapshot.completed_scan_id == snapshot.scan_id {
-            snapshot.scan_id += 1;
-            if !doing_recursive_update {
-                snapshot.completed_scan_id = snapshot.scan_id;
-            }
+        let is_idle = snapshot.completed_scan_id == snapshot.scan_id;
+        snapshot.scan_id += 1;
+        if is_idle && !doing_recursive_update {
+            snapshot.completed_scan_id = snapshot.scan_id;
         }
 
         // Remove any entries for paths that no longer exist or are being recursively
@@ -2596,16 +2583,17 @@ impl BackgroundScanner {
         Some(event_paths)
     }
 
-    fn update_ignore_statuses(
-        &self,
-        ignore_queue_tx: Sender<UpdateIgnoreStatusJob>,
-    ) -> LocalSnapshot {
+    async fn update_ignore_statuses(&self) {
+        use futures::FutureExt as _;
+
         let mut snapshot = self.snapshot.lock().clone();
         let mut ignores_to_update = Vec::new();
         let mut ignores_to_delete = Vec::new();
         for (parent_abs_path, (_, scan_id)) in &snapshot.ignores_by_parent_abs_path {
             if let Ok(parent_path) = parent_abs_path.strip_prefix(&snapshot.abs_path) {
-                if *scan_id == snapshot.scan_id && snapshot.entry_for_path(parent_path).is_some() {
+                if *scan_id > snapshot.completed_scan_id
+                    && snapshot.entry_for_path(parent_path).is_some()
+                {
                     ignores_to_update.push(parent_abs_path.clone());
                 }
 
@@ -2624,6 +2612,7 @@ impl BackgroundScanner {
                 .remove(&parent_abs_path);
         }
 
+        let (ignore_queue_tx, ignore_queue_rx) = channel::unbounded();
         ignores_to_update.sort_unstable();
         let mut ignores_to_update = ignores_to_update.into_iter().peekable();
         while let Some(parent_abs_path) = ignores_to_update.next() {
@@ -2642,8 +2631,34 @@ impl BackgroundScanner {
             }))
             .unwrap();
         }
+        drop(ignore_queue_tx);
+
+        self.executor
+            .scoped(|scope| {
+                for _ in 0..self.executor.num_cpus() {
+                    scope.spawn(async {
+                        loop {
+                            select_biased! {
+                                // Process any path refresh requests before moving on to process
+                                // the queue of ignore statuses.
+                                request = self.refresh_requests_rx.recv().fuse() => {
+                                    let Ok((paths, barrier)) = request else { break };
+                                    if !self.process_refresh_request(paths, barrier).await {
+                                        return;
+                                    }
+                                }
 
-        snapshot
+                                // Recursively process directories whose ignores have changed.
+                                job = ignore_queue_rx.recv().fuse() => {
+                                    let Ok(job) = job else { break };
+                                    self.update_ignore_status(job, &snapshot).await;
+                                }
+                            }
+                        }
+                    });
+                }
+            })
+            .await;
     }
 
     async fn update_ignore_status(&self, job: UpdateIgnoreStatusJob, snapshot: &LocalSnapshot) {
@@ -3054,12 +3069,11 @@ mod tests {
     use fs::repository::FakeGitRepository;
     use fs::{FakeFs, RealFs};
     use gpui::{executor::Deterministic, TestAppContext};
+    use pretty_assertions::assert_eq;
     use rand::prelude::*;
     use serde_json::json;
     use std::{env, fmt::Write};
-    use util::http::FakeHttpClient;
-
-    use util::test::temp_tree;
+    use util::{http::FakeHttpClient, test::temp_tree};
 
     #[gpui::test]
     async fn test_traversal(cx: &mut TestAppContext) {
@@ -3461,7 +3475,7 @@ mod tests {
     }
 
     #[gpui::test(iterations = 30)]
-    async fn test_create_directory(cx: &mut TestAppContext) {
+    async fn test_create_directory_during_initial_scan(cx: &mut TestAppContext) {
         let client = cx.read(|cx| Client::new(FakeHttpClient::with_404_response(), cx));
 
         let fs = FakeFs::new(cx.background());
@@ -3486,6 +3500,8 @@ mod tests {
         .await
         .unwrap();
 
+        let mut snapshot1 = tree.update(cx, |tree, _| tree.as_local().unwrap().snapshot());
+
         let entry = tree
             .update(cx, |tree, cx| {
                 tree.as_local_mut()
@@ -3497,10 +3513,14 @@ mod tests {
         assert!(entry.is_dir());
 
         cx.foreground().run_until_parked();
-
         tree.read_with(cx, |tree, _| {
             assert_eq!(tree.entry_for_path("a/e").unwrap().kind, EntryKind::Dir);
         });
+
+        let snapshot2 = tree.update(cx, |tree, _| tree.as_local().unwrap().snapshot());
+        let update = snapshot2.build_update(&snapshot1, 0, 0, true);
+        snapshot1.apply_remote_update(update).unwrap();
+        assert_eq!(snapshot1.to_vec(true), snapshot2.to_vec(true),);
     }
 
     #[gpui::test(iterations = 100)]