:art: Don't store path changes statefully on the background scanner

Max Brunsfeld created

Change summary

crates/project/src/worktree.rs | 115 ++++++++++++++++-------------------
1 file changed, 54 insertions(+), 61 deletions(-)

Detailed changes

crates/project/src/worktree.rs 🔗

@@ -1,12 +1,12 @@
-use super::{ignore::IgnoreStack, DiagnosticSummary};
-use crate::{copy_recursive, ProjectEntryId, RemoveOptions};
+use crate::{
+    copy_recursive, ignore::IgnoreStack, DiagnosticSummary, ProjectEntryId, RemoveOptions,
+};
 use ::ignore::gitignore::{Gitignore, GitignoreBuilder};
 use anyhow::{anyhow, Context, Result};
 use client::{proto, Client};
 use clock::ReplicaId;
 use collections::{HashMap, VecDeque};
-use fs::LineEnding;
-use fs::{repository::GitRepository, Fs};
+use fs::{repository::GitRepository, Fs, LineEnding};
 use futures::{
     channel::{
         mpsc::{self, UnboundedReceiver, UnboundedSender},
@@ -20,17 +20,16 @@ use gpui::{
     executor, AppContext, AsyncAppContext, Entity, ModelContext, ModelHandle, MutableAppContext,
     Task,
 };
-use language::File as _;
 use language::{
     proto::{
         deserialize_fingerprint, deserialize_version, serialize_fingerprint, serialize_line_ending,
         serialize_version,
     },
-    Buffer, DiagnosticEntry, PointUtf16, Rope, RopeFingerprint, Unclipped,
+    Buffer, DiagnosticEntry, File as _, PointUtf16, Rope, RopeFingerprint, Unclipped,
 };
 use parking_lot::Mutex;
-use postage::barrier;
 use postage::{
+    barrier,
     prelude::{Sink as _, Stream as _},
     watch,
 };
@@ -50,8 +49,7 @@ use std::{
     time::{Duration, SystemTime},
 };
 use sum_tree::{Bias, Edit, SeekTarget, SumTree, TreeMap, TreeSet};
-use util::paths::HOME;
-use util::{ResultExt, TryFutureExt};
+use util::{paths::HOME, ResultExt, TryFutureExt};
 
 #[derive(Copy, Clone, PartialEq, Eq, Debug, Hash, PartialOrd, Ord)]
 pub struct WorktreeId(usize);
@@ -2141,7 +2139,6 @@ impl<'a> sum_tree::Dimension<'a, EntrySummary> for PathKey {
 struct BackgroundScanner {
     fs: Arc<dyn Fs>,
     snapshot: Mutex<LocalSnapshot>,
-    changes: HashMap<Arc<Path>, PathChange>,
     notify: UnboundedSender<ScanState>,
     executor: Arc<executor::Background>,
 }
@@ -2158,7 +2155,6 @@ impl BackgroundScanner {
             snapshot: Mutex::new(snapshot),
             notify,
             executor,
-            changes: Default::default(),
         }
     }
 
@@ -2167,7 +2163,7 @@ impl BackgroundScanner {
     }
 
     async fn run(
-        mut self,
+        self,
         events_rx: impl Stream<Item = Vec<fsevent::Event>>,
         mut changed_paths: UnboundedReceiver<(Vec<PathBuf>, barrier::Sender)>,
     ) {
@@ -2312,32 +2308,31 @@ impl BackgroundScanner {
             while let Poll::Ready(Some(additional_events)) = futures::poll!(events_rx.next()) {
                 events.extend(additional_events);
             }
+            let abs_paths = events.into_iter().map(|e| e.path).collect();
             if self.notify.unbounded_send(ScanState::Updating).is_err() {
                 return;
             }
-            if !self
-                .process_events(events.into_iter().map(|e| e.path).collect(), true)
-                .await
-            {
-                return;
-            }
-            if self
-                .notify
-                .unbounded_send(ScanState::Updated {
-                    snapshot: self.snapshot.lock().clone(),
-                    changes: mem::take(&mut self.changes),
-                    barrier: None,
-                })
-                .is_err()
-            {
+            if let Some(changes) = self.process_events(abs_paths, true).await {
+                if self
+                    .notify
+                    .unbounded_send(ScanState::Updated {
+                        snapshot: self.snapshot.lock().clone(),
+                        changes,
+                        barrier: None,
+                    })
+                    .is_err()
+                {
+                    return;
+                }
+            } else {
                 return;
             }
         }
 
         // Continue processing events until the worktree is dropped.
         loop {
-            let abs_paths;
             let barrier;
+            let abs_paths;
             select_biased! {
                 request = changed_paths.next().fuse() => {
                     let Some((paths, b)) = request else { break };
@@ -2354,18 +2349,19 @@ impl BackgroundScanner {
             if self.notify.unbounded_send(ScanState::Updating).is_err() {
                 return;
             }
-            if !self.process_events(abs_paths, false).await {
-                return;
-            }
-            if self
-                .notify
-                .unbounded_send(ScanState::Updated {
-                    snapshot: self.snapshot.lock().clone(),
-                    changes: mem::take(&mut self.changes),
-                    barrier,
-                })
-                .is_err()
-            {
+            if let Some(changes) = self.process_events(abs_paths, false).await {
+                if self
+                    .notify
+                    .unbounded_send(ScanState::Updated {
+                        snapshot: self.snapshot.lock().clone(),
+                        changes,
+                        barrier,
+                    })
+                    .is_err()
+                {
+                    return;
+                }
+            } else {
                 return;
             }
         }
@@ -2505,10 +2501,10 @@ impl BackgroundScanner {
     }
 
     async fn process_events(
-        &mut self,
+        &self,
         abs_paths: Vec<PathBuf>,
         received_before_initialized: bool,
-    ) -> bool {
+    ) -> Option<HashMap<Arc<Path>, PathChange>> {
         let (scan_queue_tx, scan_queue_rx) = channel::unbounded();
 
         let prev_snapshot = {
@@ -2517,14 +2513,9 @@ impl BackgroundScanner {
             snapshot.clone()
         };
 
-        let event_paths = if let Some(event_paths) = self
+        let event_paths = self
             .update_entries_for_paths(abs_paths, Some(scan_queue_tx))
-            .await
-        {
-            event_paths
-        } else {
-            return false;
-        };
+            .await?;
 
         // Scan any directories that were created as part of this event batch.
         self.executor
@@ -2553,13 +2544,13 @@ impl BackgroundScanner {
 
         self.update_ignore_statuses().await;
         self.update_git_repositories();
-        self.build_change_set(
+        let changes = self.build_change_set(
             prev_snapshot.snapshot,
             event_paths,
             received_before_initialized,
         );
         self.snapshot.lock().scan_completed();
-        true
+        Some(changes)
     }
 
     async fn update_entries_for_paths(
@@ -2763,17 +2754,18 @@ impl BackgroundScanner {
     }
 
     fn build_change_set(
-        &mut self,
+        &self,
         old_snapshot: Snapshot,
         event_paths: Vec<Arc<Path>>,
         received_before_initialized: bool,
-    ) {
+    ) -> HashMap<Arc<Path>, PathChange> {
+        use PathChange::{Added, AddedOrUpdated, Removed, Updated};
+
         let new_snapshot = self.snapshot.lock();
+        let mut changes = HashMap::default();
         let mut old_paths = old_snapshot.entries_by_path.cursor::<PathKey>();
         let mut new_paths = new_snapshot.entries_by_path.cursor::<PathKey>();
 
-        use PathChange::{Added, AddedOrUpdated, Removed, Updated};
-
         for path in event_paths {
             let path = PathKey(path);
             old_paths.seek(&path, Bias::Left, &());
@@ -2792,7 +2784,7 @@ impl BackgroundScanner {
 
                         match Ord::cmp(&old_entry.path, &new_entry.path) {
                             Ordering::Less => {
-                                self.changes.insert(old_entry.path.clone(), Removed);
+                                changes.insert(old_entry.path.clone(), Removed);
                                 old_paths.next(&());
                             }
                             Ordering::Equal => {
@@ -2800,31 +2792,32 @@ impl BackgroundScanner {
                                     // If the worktree was not fully initialized when this event was generated,
                                     // we can't know whether this entry was added during the scan or whether
                                     // it was merely updated.
-                                    self.changes.insert(old_entry.path.clone(), AddedOrUpdated);
+                                    changes.insert(old_entry.path.clone(), AddedOrUpdated);
                                 } else if old_entry.mtime != new_entry.mtime {
-                                    self.changes.insert(old_entry.path.clone(), Updated);
+                                    changes.insert(old_entry.path.clone(), Updated);
                                 }
                                 old_paths.next(&());
                                 new_paths.next(&());
                             }
                             Ordering::Greater => {
-                                self.changes.insert(new_entry.path.clone(), Added);
+                                changes.insert(new_entry.path.clone(), Added);
                                 new_paths.next(&());
                             }
                         }
                     }
                     (Some(old_entry), None) => {
-                        self.changes.insert(old_entry.path.clone(), Removed);
+                        changes.insert(old_entry.path.clone(), Removed);
                         old_paths.next(&());
                     }
                     (None, Some(new_entry)) => {
-                        self.changes.insert(new_entry.path.clone(), Added);
+                        changes.insert(new_entry.path.clone(), Added);
                         new_paths.next(&());
                     }
                     (None, None) => break,
                 }
             }
         }
+        changes
     }
 }