Move git status updates to a background thread (#24307)

Kirill Bulatov created

Part of https://github.com/zed-industries/zed/issues/24099
Part of https://github.com/zed-industries/zed/issues/23025

Git status checks & updates are still slow for such repos, but those are
now not blocking FS entry population and rescans.

Release Notes:

- Improved project panel's speed in large projects

Change summary

crates/worktree/src/worktree.rs       | 513 ++++++++++++++--------------
crates/worktree/src/worktree_tests.rs |   2 
2 files changed, 258 insertions(+), 257 deletions(-)

Detailed changes

crates/worktree/src/worktree.rs 🔗

@@ -13,6 +13,7 @@ use futures::{
         mpsc::{self, UnboundedSender},
         oneshot,
     },
+    future::join_all,
     select_biased,
     task::Poll,
     FutureExt as _, Stream, StreamExt,
@@ -450,6 +451,7 @@ struct BackgroundScannerState {
     changed_paths: Vec<Arc<Path>>,
     prev_snapshot: Snapshot,
     git_hosting_provider_registry: Option<Arc<GitHostingProviderRegistry>>,
+    repository_scans: HashMap<Arc<Path>, Task<()>>,
 }
 
 #[derive(Debug, Clone)]
@@ -1336,7 +1338,7 @@ impl LocalWorktree {
                     scan_requests_rx,
                     path_prefixes_to_scan_rx,
                     next_entry_id,
-                    state: Mutex::new(BackgroundScannerState {
+                    state: Arc::new(Mutex::new(BackgroundScannerState {
                         prev_snapshot: snapshot.snapshot.clone(),
                         snapshot,
                         scanned_dirs: Default::default(),
@@ -1344,8 +1346,9 @@ impl LocalWorktree {
                         paths_to_scan: Default::default(),
                         removed_entries: Default::default(),
                         changed_paths: Default::default(),
+                        repository_scans: HashMap::default(),
                         git_hosting_provider_registry,
-                    }),
+                    })),
                     phase: BackgroundScannerPhase::InitialScan,
                     share_private_files,
                     settings,
@@ -4083,7 +4086,7 @@ impl<'a> sum_tree::Dimension<'a, EntrySummary> for PathKey {
 }
 
 struct BackgroundScanner {
-    state: Mutex<BackgroundScannerState>,
+    state: Arc<Mutex<BackgroundScannerState>>,
     fs: Arc<dyn Fs>,
     fs_case_sensitive: bool,
     status_updates_tx: UnboundedSender<ScanState>,
@@ -4097,7 +4100,7 @@ struct BackgroundScanner {
     share_private_files: bool,
 }
 
-#[derive(PartialEq)]
+#[derive(Copy, Clone, PartialEq)]
 enum BackgroundScannerPhase {
     InitialScan,
     EventsReceivedDuringInitialScan,
@@ -4106,8 +4109,6 @@ enum BackgroundScannerPhase {
 
 impl BackgroundScanner {
     async fn run(&mut self, mut fs_events_rx: Pin<Box<dyn Send + Stream<Item = Vec<PathEvent>>>>) {
-        use futures::FutureExt as _;
-
         // If the worktree root does not contain a git repository, then find
         // the git repository in an ancestor directory. Find any gitignore files
         // in ancestor directories.
@@ -4418,22 +4419,33 @@ impl BackgroundScanner {
         self.update_ignore_statuses(scan_job_tx).await;
         self.scan_dirs(false, scan_job_rx).await;
 
-        if !dot_git_abs_paths.is_empty() {
-            self.update_git_repositories(dot_git_abs_paths).await;
-        }
-
-        {
-            let mut state = self.state.lock();
-            state.snapshot.completed_scan_id = state.snapshot.scan_id;
-            for (_, entry) in mem::take(&mut state.removed_entries) {
-                state.scanned_dirs.remove(&entry.id);
-            }
-        }
+        let status_update = if !dot_git_abs_paths.is_empty() {
+            Some(self.schedule_git_repositories_update(dot_git_abs_paths))
+        } else {
+            None
+        };
 
-        #[cfg(test)]
-        self.state.lock().snapshot.check_git_invariants();
+        let phase = self.phase;
+        let status_update_tx = self.status_updates_tx.clone();
+        let state = self.state.clone();
+        self.executor
+            .spawn(async move {
+                if let Some(status_update) = status_update {
+                    status_update.await;
+                }
 
-        self.send_status_update(false, SmallVec::new());
+                {
+                    let mut state = state.lock();
+                    state.snapshot.completed_scan_id = state.snapshot.scan_id;
+                    for (_, entry) in mem::take(&mut state.removed_entries) {
+                        state.scanned_dirs.remove(&entry.id);
+                    }
+                    #[cfg(test)]
+                    state.snapshot.check_git_invariants();
+                }
+                send_status_update_inner(phase, state, status_update_tx, false, SmallVec::new());
+            })
+            .detach();
     }
 
     async fn forcibly_load_paths(&self, paths: &[Arc<Path>]) -> bool {
@@ -4467,8 +4479,6 @@ impl BackgroundScanner {
         enable_progress_updates: bool,
         scan_jobs_rx: channel::Receiver<ScanJob>,
     ) {
-        use futures::FutureExt as _;
-
         if self
             .status_updates_tx
             .unbounded_send(ScanState::Started)
@@ -4536,24 +4546,13 @@ impl BackgroundScanner {
     }
 
     fn send_status_update(&self, scanning: bool, barrier: SmallVec<[barrier::Sender; 1]>) -> bool {
-        let mut state = self.state.lock();
-        if state.changed_paths.is_empty() && scanning {
-            return true;
-        }
-
-        let new_snapshot = state.snapshot.clone();
-        let old_snapshot = mem::replace(&mut state.prev_snapshot, new_snapshot.snapshot.clone());
-        let changes = self.build_change_set(&old_snapshot, &new_snapshot, &state.changed_paths);
-        state.changed_paths.clear();
-
-        self.status_updates_tx
-            .unbounded_send(ScanState::Updated {
-                snapshot: new_snapshot,
-                changes,
-                scanning,
-                barrier,
-            })
-            .is_ok()
+        send_status_update_inner(
+            self.phase,
+            self.state.clone(),
+            self.status_updates_tx.clone(),
+            scanning,
+            barrier,
+        )
     }
 
     async fn scan_dir(&self, job: &ScanJob) -> Result<()> {
@@ -4609,9 +4608,7 @@ impl BackgroundScanner {
                 );
 
                 if let Some(local_repo) = repo {
-                    self.update_git_statuses(UpdateGitStatusesJob {
-                        local_repository: local_repo,
-                    });
+                    let _ = self.schedule_git_statuses_update(local_repo);
                 }
             } else if child_name == *GITIGNORE {
                 match build_gitignore(&child_abs_path, self.fs.as_ref()).await {
@@ -4968,8 +4965,6 @@ impl BackgroundScanner {
     }
 
     async fn update_ignore_statuses(&self, scan_job_tx: Sender<ScanJob>) {
-        use futures::FutureExt as _;
-
         let mut ignores_to_update = Vec::new();
         let (ignore_queue_tx, ignore_queue_rx) = channel::unbounded();
         let prev_snapshot;
@@ -5119,10 +5114,10 @@ impl BackgroundScanner {
         state.snapshot.entries_by_id.edit(entries_by_id_edits, &());
     }
 
-    async fn update_git_repositories(&self, dot_git_paths: Vec<PathBuf>) {
+    fn schedule_git_repositories_update(&self, dot_git_paths: Vec<PathBuf>) -> Task<()> {
         log::debug!("reloading repositories: {dot_git_paths:?}");
 
-        let mut repo_updates = Vec::new();
+        let mut repos_to_update = Vec::new();
         {
             let mut state = self.state.lock();
             let scan_id = state.snapshot.scan_id;
@@ -5182,7 +5177,7 @@ impl BackgroundScanner {
                     }
                 };
 
-                repo_updates.push(UpdateGitStatusesJob { local_repository });
+                repos_to_update.push(local_repository);
             }
 
             // Remove any git repositories whose .git entry no longer exists.
@@ -5213,223 +5208,98 @@ impl BackgroundScanner {
             });
         }
 
-        let (mut updates_done_tx, mut updates_done_rx) = barrier::channel();
-        self.executor
-            .scoped(|scope| {
-                scope.spawn(async {
-                    for repo_update in repo_updates {
-                        self.update_git_statuses(repo_update);
-                    }
-                    updates_done_tx.blocking_send(()).ok();
-                });
-
-                scope.spawn(async {
-                    loop {
-                        select_biased! {
-                            // Process any path refresh requests before moving on to process
-                            // the queue of git statuses.
-                            request = self.next_scan_request().fuse() => {
-                                let Ok(request) = request else { break };
-                                if !self.process_scan_request(request, true).await {
-                                    return;
-                                }
-                            }
-                            _ = updates_done_rx.recv().fuse() =>  break,
-                        }
-                    }
-                });
-            })
-            .await;
+        let mut status_updates = Vec::new();
+        for local_repository in repos_to_update {
+            status_updates.push(self.schedule_git_statuses_update(local_repository));
+        }
+        self.executor.spawn(async move {
+            let _updates_finished: Vec<Result<(), oneshot::Canceled>> =
+                join_all(status_updates).await;
+        })
     }
 
     /// Update the git statuses for a given batch of entries.
-    fn update_git_statuses(&self, job: UpdateGitStatusesJob) {
-        log::trace!(
-            "updating git statuses for repo {:?}",
-            job.local_repository.work_directory.path
-        );
-        let t0 = Instant::now();
-
-        let Some(statuses) = job
-            .local_repository
-            .repo()
-            .status(&[git::WORK_DIRECTORY_REPO_PATH.clone()])
-            .log_err()
-        else {
-            return;
-        };
-        log::trace!(
-            "computed git statuses for repo {:?} in {:?}",
-            job.local_repository.work_directory.path,
-            t0.elapsed()
-        );
-
-        let t0 = Instant::now();
-        let mut changed_paths = Vec::new();
-        let snapshot = self.state.lock().snapshot.snapshot.clone();
-
-        let Some(mut repository) =
-            snapshot.repository(job.local_repository.work_directory.path_key())
-        else {
-            log::error!("Got an UpdateGitStatusesJob for a repository that isn't in the snapshot");
-            debug_assert!(false);
-            return;
-        };
-
-        let mut new_entries_by_path = SumTree::new(&());
-        for (repo_path, status) in statuses.entries.iter() {
-            let project_path = repository.work_directory.unrelativize(repo_path);
-
-            new_entries_by_path.insert_or_replace(
-                StatusEntry {
-                    repo_path: repo_path.clone(),
-                    status: *status,
-                },
-                &(),
-            );
+    fn schedule_git_statuses_update(
+        &self,
+        local_repository: LocalRepositoryEntry,
+    ) -> oneshot::Receiver<()> {
+        let repository_path = local_repository.work_directory.path.clone();
+        let state = self.state.clone();
+        let (tx, rx) = oneshot::channel();
 
-            if let Some(path) = project_path {
-                changed_paths.push(path);
-            }
-        }
+        self.state.lock().repository_scans.insert(
+            repository_path.clone(),
+            self.executor.spawn(async move {
+                log::trace!("updating git statuses for repo {repository_path:?}",);
+                let t0 = Instant::now();
+
+                let Some(statuses) = local_repository
+                    .repo()
+                    .status(&[git::WORK_DIRECTORY_REPO_PATH.clone()])
+                    .log_err()
+                else {
+                    return;
+                };
+                log::trace!(
+                    "computed git statuses for repo {:?} in {:?}",
+                    repository_path,
+                    t0.elapsed()
+                );
 
-        repository.statuses_by_path = new_entries_by_path;
-        let mut state = self.state.lock();
-        state
-            .snapshot
-            .repositories
-            .insert_or_replace(repository, &());
+                let t0 = Instant::now();
+                let mut changed_paths = Vec::new();
+                let snapshot = state.lock().snapshot.snapshot.clone();
 
-        util::extend_sorted(
-            &mut state.changed_paths,
-            changed_paths,
-            usize::MAX,
-            Ord::cmp,
-        );
+                let Some(mut repository) =
+                    snapshot.repository(local_repository.work_directory.path_key())
+                else {
+                    log::error!(
+                        "Tried to update git statuses for a repository that isn't in the snapshot"
+                    );
+                    debug_assert!(false);
+                    return;
+                };
 
-        log::trace!(
-            "applied git status updates for repo {:?} in {:?}",
-            job.local_repository.work_directory.path,
-            t0.elapsed(),
-        );
-    }
+                let mut new_entries_by_path = SumTree::new(&());
+                for (repo_path, status) in statuses.entries.iter() {
+                    let project_path = repository.work_directory.unrelativize(repo_path);
 
-    fn build_change_set(
-        &self,
-        old_snapshot: &Snapshot,
-        new_snapshot: &Snapshot,
-        event_paths: &[Arc<Path>],
-    ) -> UpdatedEntriesSet {
-        use BackgroundScannerPhase::*;
-        use PathChange::{Added, AddedOrUpdated, Loaded, Removed, Updated};
-
-        // Identify which paths have changed. Use the known set of changed
-        // parent paths to optimize the search.
-        let mut changes = Vec::new();
-        let mut old_paths = old_snapshot.entries_by_path.cursor::<PathKey>(&());
-        let mut new_paths = new_snapshot.entries_by_path.cursor::<PathKey>(&());
-        let mut last_newly_loaded_dir_path = None;
-        old_paths.next(&());
-        new_paths.next(&());
-        for path in event_paths {
-            let path = PathKey(path.clone());
-            if old_paths.item().map_or(false, |e| e.path < path.0) {
-                old_paths.seek_forward(&path, Bias::Left, &());
-            }
-            if new_paths.item().map_or(false, |e| e.path < path.0) {
-                new_paths.seek_forward(&path, Bias::Left, &());
-            }
-            loop {
-                match (old_paths.item(), new_paths.item()) {
-                    (Some(old_entry), Some(new_entry)) => {
-                        if old_entry.path > path.0
-                            && new_entry.path > path.0
-                            && !old_entry.path.starts_with(&path.0)
-                            && !new_entry.path.starts_with(&path.0)
-                        {
-                            break;
-                        }
+                    new_entries_by_path.insert_or_replace(
+                        StatusEntry {
+                            repo_path: repo_path.clone(),
+                            status: *status,
+                        },
+                        &(),
+                    );
 
-                        match Ord::cmp(&old_entry.path, &new_entry.path) {
-                            Ordering::Less => {
-                                changes.push((old_entry.path.clone(), old_entry.id, Removed));
-                                old_paths.next(&());
-                            }
-                            Ordering::Equal => {
-                                if self.phase == EventsReceivedDuringInitialScan {
-                                    if old_entry.id != new_entry.id {
-                                        changes.push((
-                                            old_entry.path.clone(),
-                                            old_entry.id,
-                                            Removed,
-                                        ));
-                                    }
-                                    // If the worktree was not fully initialized when this event was generated,
-                                    // we can't know whether this entry was added during the scan or whether
-                                    // it was merely updated.
-                                    changes.push((
-                                        new_entry.path.clone(),
-                                        new_entry.id,
-                                        AddedOrUpdated,
-                                    ));
-                                } else if old_entry.id != new_entry.id {
-                                    changes.push((old_entry.path.clone(), old_entry.id, Removed));
-                                    changes.push((new_entry.path.clone(), new_entry.id, Added));
-                                } else if old_entry != new_entry {
-                                    if old_entry.kind.is_unloaded() {
-                                        last_newly_loaded_dir_path = Some(&new_entry.path);
-                                        changes.push((
-                                            new_entry.path.clone(),
-                                            new_entry.id,
-                                            Loaded,
-                                        ));
-                                    } else {
-                                        changes.push((
-                                            new_entry.path.clone(),
-                                            new_entry.id,
-                                            Updated,
-                                        ));
-                                    }
-                                }
-                                old_paths.next(&());
-                                new_paths.next(&());
-                            }
-                            Ordering::Greater => {
-                                let is_newly_loaded = self.phase == InitialScan
-                                    || last_newly_loaded_dir_path
-                                        .as_ref()
-                                        .map_or(false, |dir| new_entry.path.starts_with(dir));
-                                changes.push((
-                                    new_entry.path.clone(),
-                                    new_entry.id,
-                                    if is_newly_loaded { Loaded } else { Added },
-                                ));
-                                new_paths.next(&());
-                            }
-                        }
-                    }
-                    (Some(old_entry), None) => {
-                        changes.push((old_entry.path.clone(), old_entry.id, Removed));
-                        old_paths.next(&());
-                    }
-                    (None, Some(new_entry)) => {
-                        let is_newly_loaded = self.phase == InitialScan
-                            || last_newly_loaded_dir_path
-                                .as_ref()
-                                .map_or(false, |dir| new_entry.path.starts_with(dir));
-                        changes.push((
-                            new_entry.path.clone(),
-                            new_entry.id,
-                            if is_newly_loaded { Loaded } else { Added },
-                        ));
-                        new_paths.next(&());
+                    if let Some(path) = project_path {
+                        changed_paths.push(path);
                     }
-                    (None, None) => break,
                 }
-            }
-        }
 
-        changes.into()
+                repository.statuses_by_path = new_entries_by_path;
+                let mut state = state.lock();
+                state
+                    .snapshot
+                    .repositories
+                    .insert_or_replace(repository, &());
+
+                util::extend_sorted(
+                    &mut state.changed_paths,
+                    changed_paths,
+                    usize::MAX,
+                    Ord::cmp,
+                );
+
+                log::trace!(
+                    "applied git status updates for repo {:?} in {:?}",
+                    repository_path,
+                    t0.elapsed(),
+                );
+                tx.send(()).ok();
+            }),
+        );
+        rx
     }
 
     async fn progress_timer(&self, running: bool) {
@@ -5459,6 +5329,139 @@ impl BackgroundScanner {
     }
 }
 
+fn send_status_update_inner(
+    phase: BackgroundScannerPhase,
+    state: Arc<Mutex<BackgroundScannerState>>,
+    status_updates_tx: UnboundedSender<ScanState>,
+    scanning: bool,
+    barrier: SmallVec<[barrier::Sender; 1]>,
+) -> bool {
+    let mut state = state.lock();
+    if state.changed_paths.is_empty() && scanning {
+        return true;
+    }
+
+    let new_snapshot = state.snapshot.clone();
+    let old_snapshot = mem::replace(&mut state.prev_snapshot, new_snapshot.snapshot.clone());
+    let changes = build_change_set(phase, &old_snapshot, &new_snapshot, &state.changed_paths);
+    state.changed_paths.clear();
+
+    status_updates_tx
+        .unbounded_send(ScanState::Updated {
+            snapshot: new_snapshot,
+            changes,
+            scanning,
+            barrier,
+        })
+        .is_ok()
+}
+
+fn build_change_set(
+    phase: BackgroundScannerPhase,
+    old_snapshot: &Snapshot,
+    new_snapshot: &Snapshot,
+    event_paths: &[Arc<Path>],
+) -> UpdatedEntriesSet {
+    use BackgroundScannerPhase::*;
+    use PathChange::{Added, AddedOrUpdated, Loaded, Removed, Updated};
+
+    // Identify which paths have changed. Use the known set of changed
+    // parent paths to optimize the search.
+    let mut changes = Vec::new();
+    let mut old_paths = old_snapshot.entries_by_path.cursor::<PathKey>(&());
+    let mut new_paths = new_snapshot.entries_by_path.cursor::<PathKey>(&());
+    let mut last_newly_loaded_dir_path = None;
+    old_paths.next(&());
+    new_paths.next(&());
+    for path in event_paths {
+        let path = PathKey(path.clone());
+        if old_paths.item().map_or(false, |e| e.path < path.0) {
+            old_paths.seek_forward(&path, Bias::Left, &());
+        }
+        if new_paths.item().map_or(false, |e| e.path < path.0) {
+            new_paths.seek_forward(&path, Bias::Left, &());
+        }
+        loop {
+            match (old_paths.item(), new_paths.item()) {
+                (Some(old_entry), Some(new_entry)) => {
+                    if old_entry.path > path.0
+                        && new_entry.path > path.0
+                        && !old_entry.path.starts_with(&path.0)
+                        && !new_entry.path.starts_with(&path.0)
+                    {
+                        break;
+                    }
+
+                    match Ord::cmp(&old_entry.path, &new_entry.path) {
+                        Ordering::Less => {
+                            changes.push((old_entry.path.clone(), old_entry.id, Removed));
+                            old_paths.next(&());
+                        }
+                        Ordering::Equal => {
+                            if phase == EventsReceivedDuringInitialScan {
+                                if old_entry.id != new_entry.id {
+                                    changes.push((old_entry.path.clone(), old_entry.id, Removed));
+                                }
+                                // If the worktree was not fully initialized when this event was generated,
+                                // we can't know whether this entry was added during the scan or whether
+                                // it was merely updated.
+                                changes.push((
+                                    new_entry.path.clone(),
+                                    new_entry.id,
+                                    AddedOrUpdated,
+                                ));
+                            } else if old_entry.id != new_entry.id {
+                                changes.push((old_entry.path.clone(), old_entry.id, Removed));
+                                changes.push((new_entry.path.clone(), new_entry.id, Added));
+                            } else if old_entry != new_entry {
+                                if old_entry.kind.is_unloaded() {
+                                    last_newly_loaded_dir_path = Some(&new_entry.path);
+                                    changes.push((new_entry.path.clone(), new_entry.id, Loaded));
+                                } else {
+                                    changes.push((new_entry.path.clone(), new_entry.id, Updated));
+                                }
+                            }
+                            old_paths.next(&());
+                            new_paths.next(&());
+                        }
+                        Ordering::Greater => {
+                            let is_newly_loaded = phase == InitialScan
+                                || last_newly_loaded_dir_path
+                                    .as_ref()
+                                    .map_or(false, |dir| new_entry.path.starts_with(dir));
+                            changes.push((
+                                new_entry.path.clone(),
+                                new_entry.id,
+                                if is_newly_loaded { Loaded } else { Added },
+                            ));
+                            new_paths.next(&());
+                        }
+                    }
+                }
+                (Some(old_entry), None) => {
+                    changes.push((old_entry.path.clone(), old_entry.id, Removed));
+                    old_paths.next(&());
+                }
+                (None, Some(new_entry)) => {
+                    let is_newly_loaded = phase == InitialScan
+                        || last_newly_loaded_dir_path
+                            .as_ref()
+                            .map_or(false, |dir| new_entry.path.starts_with(dir));
+                    changes.push((
+                        new_entry.path.clone(),
+                        new_entry.id,
+                        if is_newly_loaded { Loaded } else { Added },
+                    ));
+                    new_paths.next(&());
+                }
+                (None, None) => break,
+            }
+        }
+    }
+
+    changes.into()
+}
+
 fn swap_to_front(child_paths: &mut Vec<PathBuf>, file: &OsStr) {
     let position = child_paths
         .iter()
@@ -5521,10 +5524,6 @@ struct UpdateIgnoreStatusJob {
     scan_queue: Sender<ScanJob>,
 }
 
-struct UpdateGitStatusesJob {
-    local_repository: LocalRepositoryEntry,
-}
-
 pub trait WorktreeModelHandle {
     #[cfg(any(test, feature = "test-support"))]
     fn flush_fs_events<'a>(

crates/worktree/src/worktree_tests.rs 🔗

@@ -24,6 +24,7 @@ use std::{
     mem,
     path::{Path, PathBuf},
     sync::Arc,
+    time::Duration,
 };
 use util::{test::TempTree, ResultExt};
 
@@ -1504,6 +1505,7 @@ async fn test_bump_mtime_of_git_repo_workdir(cx: &mut TestAppContext) {
         &[(Path::new("b/c.txt"), StatusCode::Modified.index())],
     );
     cx.executor().run_until_parked();
+    cx.executor().advance_clock(Duration::from_secs(1));
 
     let snapshot = tree.read_with(cx, |tree, _| tree.snapshot());