Simplify and improve concurrency of git status updates (#12513)

Max Brunsfeld created

The quest for responsiveness in large git repos continues. This is a
follow-up to pull request #12444 (https://github.com/zed-industries/zed/pull/12444).

Release Notes:

- N/A

Change summary

crates/worktree/src/worktree.rs | 177 +++++++++++++++-------------------
1 file changed, 76 insertions(+), 101 deletions(-)

Detailed changes

crates/worktree/src/worktree.rs 🔗

@@ -78,8 +78,6 @@ pub const FS_WATCH_LATENCY: Duration = Duration::from_millis(100);
 #[cfg(not(feature = "test-support"))]
 pub const FS_WATCH_LATENCY: Duration = Duration::from_millis(100);
 
-const GIT_STATUS_UPDATE_BATCH_SIZE: usize = 1024;
-
 #[derive(Copy, Clone, PartialEq, Eq, Debug, Hash, PartialOrd, Ord)]
 pub struct WorktreeId(usize);
 
@@ -4293,7 +4291,7 @@ impl BackgroundScanner {
     async fn update_git_repositories(&self, dot_git_paths: Vec<PathBuf>) {
         log::debug!("reloading repositories: {dot_git_paths:?}");
 
-        let (update_job_tx, update_job_rx) = channel::unbounded();
+        let mut repo_updates = Vec::new();
         {
             let mut state = self.state.lock();
             let scan_id = state.snapshot.scan_id;
@@ -4308,7 +4306,7 @@ impl BackgroundScanner {
                                 .then(|| (*entry_id, repo.clone()))
                         });
 
-                let (work_dir, repository) = match existing_repository_entry {
+                let (work_directory, repository) = match existing_repository_entry {
                     None => {
                         match state.build_git_repository(dot_git_dir.into(), self.fs.as_ref()) {
                             Some(output) => output,
@@ -4327,7 +4325,6 @@ impl BackgroundScanner {
                             continue;
                         };
 
-                        log::info!("reload git repository {dot_git_dir:?}");
                         let repo = &repository.repo_ptr;
                         let branch = repo.branch_name();
                         repo.reload_index();
@@ -4345,41 +4342,16 @@ impl BackgroundScanner {
                     }
                 };
 
-                let statuses = repository
-                    .statuses(Path::new(""))
-                    .log_err()
-                    .unwrap_or_default();
-                let entries = state.snapshot.entries_by_path.clone();
-                let location_in_repo = state
-                    .snapshot
-                    .repository_entries
-                    .get(&work_dir)
-                    .and_then(|repo| repo.location_in_repo.clone());
-                let mut files =
-                    state
+                repo_updates.push(UpdateGitStatusesJob {
+                    location_in_repo: state
                         .snapshot
-                        .traverse_from_path(true, false, false, work_dir.0.as_ref());
-                let mut start_path = work_dir.0.clone();
-                while start_path.starts_with(&work_dir.0) {
-                    files.advance_by(GIT_STATUS_UPDATE_BATCH_SIZE);
-                    let end_path = files.entry().map(|e| e.path.clone());
-                    smol::block_on(update_job_tx.send(UpdateGitStatusesJob {
-                        start_path: start_path.clone(),
-                        end_path: end_path.clone(),
-                        entries: entries.clone(),
-                        location_in_repo: location_in_repo.clone(),
-                        containing_repository: ScanJobContainingRepository {
-                            work_directory: work_dir.clone(),
-                            statuses: statuses.clone(),
-                        },
-                    }))
-                    .unwrap();
-                    if let Some(end_path) = end_path {
-                        start_path = end_path;
-                    } else {
-                        break;
-                    }
-                }
+                        .repository_entries
+                        .get(&work_directory)
+                        .and_then(|repo| repo.location_in_repo.clone())
+                        .clone(),
+                    work_directory,
+                    repository,
+                });
             }
 
             // Remove any git repositories whose .git entry no longer exists.
@@ -4414,87 +4386,92 @@ impl BackgroundScanner {
                 .repository_entries
                 .retain(|_, entry| ids_to_preserve.contains(&entry.work_directory.0));
         }
-        drop(update_job_tx);
 
+        let (mut updates_done_tx, mut updates_done_rx) = barrier::channel();
         self.executor
             .scoped(|scope| {
-                for _ in 0..self.executor.num_cpus() {
-                    scope.spawn(async {
-                        loop {
-                            select_biased! {
-                                // Process any path refresh requests before moving on to process
-                                // the queue of git statuses.
-                                request = self.scan_requests_rx.recv().fuse() => {
-                                    let Ok(request) = request else { break };
-                                    if !self.process_scan_request(request, true).await {
-                                        return;
-                                    }
-                                }
+                scope.spawn(async {
+                    for repo_update in repo_updates {
+                        self.update_git_statuses(repo_update);
+                    }
+                    updates_done_tx.blocking_send(()).ok();
+                });
 
-                                // Process git status updates in batches.
-                                job = update_job_rx.recv().fuse() => {
-                                    let Ok(job) = job else { break };
-                                    self.update_git_statuses(job);
+                scope.spawn(async {
+                    loop {
+                        select_biased! {
+                            // Process any path refresh requests before moving on to process
+                            // the queue of git statuses.
+                            request = self.scan_requests_rx.recv().fuse() => {
+                                let Ok(request) = request else { break };
+                                if !self.process_scan_request(request, true).await {
+                                    return;
                                 }
                             }
+                            _ = updates_done_rx.recv().fuse() =>  break,
                         }
-                    });
-                }
+                    }
+                });
             })
             .await;
     }
 
     /// Update the git statuses for a given batch of entries.
     fn update_git_statuses(&self, job: UpdateGitStatusesJob) {
-        // Determine which entries in this batch have changed their git status.
+        log::trace!("updating git statuses for repo {:?}", job.work_directory.0);
         let t0 = Instant::now();
-        let mut edits = Vec::new();
-        for entry in Traversal::new(&job.entries, true, false, false, &job.start_path) {
-            if job
-                .end_path
-                .as_ref()
-                .map_or(false, |end| &entry.path >= end)
-            {
+        let Some(statuses) = job.repository.statuses(Path::new("")).log_err() else {
+            return;
+        };
+        log::trace!(
+            "computed git statuses for repo {:?} in {:?}",
+            job.work_directory.0,
+            t0.elapsed()
+        );
+
+        let t0 = Instant::now();
+        let mut changes = Vec::new();
+        let snapshot = self.state.lock().snapshot.snapshot.clone();
+        for file in snapshot.traverse_from_path(true, false, false, job.work_directory.0.as_ref()) {
+            let Ok(repo_path) = file.path.strip_prefix(&job.work_directory.0) else {
                 break;
-            }
-            let Ok(repo_path) = entry
-                .path
-                .strip_prefix(&job.containing_repository.work_directory)
-            else {
-                continue;
             };
-            let repo_path = RepoPath(if let Some(location) = &job.location_in_repo {
-                location.join(repo_path)
+            let git_status = if let Some(location) = &job.location_in_repo {
+                statuses.get(&location.join(repo_path))
             } else {
-                repo_path.to_path_buf()
-            });
-            let git_status = job.containing_repository.statuses.get(&repo_path);
-            if entry.git_status != git_status {
-                let mut entry = entry.clone();
+                statuses.get(&repo_path)
+            };
+            if file.git_status != git_status {
+                let mut entry = file.clone();
                 entry.git_status = git_status;
-                edits.push(Edit::Insert(entry));
+                changes.push((entry.path, git_status));
             }
         }
 
-        // Apply the git status changes.
-        if edits.len() > 0 {
-            let mut state = self.state.lock();
-            let path_changes = edits.iter().map(|edit| {
-                if let Edit::Insert(entry) = edit {
-                    entry.path.clone()
-                } else {
-                    unreachable!()
-                }
-            });
-            util::extend_sorted(&mut state.changed_paths, path_changes, usize::MAX, Ord::cmp);
-            state.snapshot.entries_by_path.edit(edits, &());
-        }
+        let mut state = self.state.lock();
+        let edits = changes
+            .iter()
+            .filter_map(|(path, git_status)| {
+                let entry = state.snapshot.entry_for_path(path)?.clone();
+                Some(Edit::Insert(Entry {
+                    git_status: *git_status,
+                    ..entry.clone()
+                }))
+            })
+            .collect();
 
+        // Apply the git status changes.
+        util::extend_sorted(
+            &mut state.changed_paths,
+            changes.iter().map(|p| p.0.clone()),
+            usize::MAX,
+            Ord::cmp,
+        );
+        state.snapshot.entries_by_path.edit(edits, &());
         log::trace!(
-            "refreshed git status of entries starting with {} in {:?}",
-            // entries.len(),
-            job.start_path.display(),
-            t0.elapsed()
+            "applied git status updates for repo {:?} in {:?}",
+            job.work_directory.0,
+            t0.elapsed(),
         );
     }
 
@@ -4664,11 +4641,9 @@ struct UpdateIgnoreStatusJob {
 }
 
 struct UpdateGitStatusesJob {
-    entries: SumTree<Entry>,
-    start_path: Arc<Path>,
-    end_path: Option<Arc<Path>>,
-    containing_repository: ScanJobContainingRepository,
+    work_directory: RepositoryWorkDirectory,
     location_in_repo: Option<Arc<Path>>,
+    repository: Arc<dyn GitRepository>,
 }
 
 pub trait WorktreeModelHandle {