Revert "Move git status updates to a background thread (#24307)" (#24415)

Created by Cole Miller and Conrad Irwin

This reverts commit 980ce5fbf2d0de7e954c32dd982268d3b58dfccc.

Release Notes:

- N/A

---------

Co-authored-by: Conrad Irwin <conrad.irwin@gmail.com>

Change summary

crates/worktree/src/worktree.rs       | 535 ++++++++++++++--------------
crates/worktree/src/worktree_tests.rs |   2 --
2 files changed, 270 insertions(+), 267 deletions(-)
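
At its core, the revert swaps a deferred, task-based status path for the original synchronous one: the reverted change had wrapped BackgroundScannerState in Arc<Mutex<...>> so each repository's git status scan could be spawned onto the executor (tracked in the repository_scans task map and signalled over a oneshot channel), while this revert restores a plain Mutex and runs the update inline on the scanner. The sketch below only illustrates those two ownership patterns, using std::thread and std::sync::mpsc as stand-ins for gpui's executor, Task, and oneshot channel; ScannerState, scheduled_update, and synchronous_update are made-up names, not the worktree crate's API.

    use std::collections::HashMap;
    use std::path::PathBuf;
    use std::sync::{mpsc, Arc, Mutex};
    use std::thread::{self, JoinHandle};

    // Simplified stand-in for BackgroundScannerState: just enough state to
    // show who owns the lock in each pattern.
    #[derive(Default)]
    struct ScannerState {
        statuses: HashMap<PathBuf, String>,
    }

    // Pattern being reverted: state is shared via Arc<Mutex<_>> so the status
    // computation can run on a background thread; the JoinHandle plays the
    // role of the repository_scans task, and the channel the oneshot signal.
    fn scheduled_update(
        state: Arc<Mutex<ScannerState>>,
        repo: PathBuf,
    ) -> (JoinHandle<()>, mpsc::Receiver<()>) {
        let (done_tx, done_rx) = mpsc::channel();
        let handle = thread::spawn(move || {
            let status = format!("status of {}", repo.display()); // pretend git call
            state.lock().unwrap().statuses.insert(repo, status);
            done_tx.send(()).ok(); // completion signal, like the oneshot sender
        });
        (handle, done_rx)
    }

    // Pattern restored by the revert: the scanner owns the Mutex directly and
    // computes statuses inline, so the result is visible as soon as it returns.
    fn synchronous_update(state: &Mutex<ScannerState>, repo: PathBuf) {
        let status = format!("status of {}", repo.display()); // pretend git call
        state.lock().unwrap().statuses.insert(repo, status);
    }

    fn main() {
        let shared = Arc::new(Mutex::new(ScannerState::default()));
        let (task, done) = scheduled_update(shared.clone(), PathBuf::from("repo-a"));
        done.recv().ok(); // must wait for the background update before reading
        task.join().ok();

        let local = Mutex::new(ScannerState::default());
        synchronous_update(&local, PathBuf::from("repo-b")); // visible immediately

        assert_eq!(shared.lock().unwrap().statuses.len(), 1);
        assert_eq!(local.lock().unwrap().statuses.len(), 1);
    }

In the diff below this shows up as state: Mutex<BackgroundScannerState> replacing state: Arc<Mutex<BackgroundScannerState>>, the removal of the repository_scans task map, and send_status_update becoming a method again rather than the free function send_status_update_inner.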

Detailed changes

crates/worktree/src/worktree.rs

@@ -13,7 +13,6 @@ use futures::{
         mpsc::{self, UnboundedSender},
         oneshot,
     },
-    future::join_all,
     select_biased,
     task::Poll,
     FutureExt as _, Stream, StreamExt,
@@ -466,7 +465,6 @@ struct BackgroundScannerState {
     changed_paths: Vec<Arc<Path>>,
     prev_snapshot: Snapshot,
     git_hosting_provider_registry: Option<Arc<GitHostingProviderRegistry>>,
-    repository_scans: HashMap<Arc<Path>, Task<()>>,
 }
 
 #[derive(Debug, Clone)]
@@ -1355,7 +1353,7 @@ impl LocalWorktree {
                     scan_requests_rx,
                     path_prefixes_to_scan_rx,
                     next_entry_id,
-                    state: Arc::new(Mutex::new(BackgroundScannerState {
+                    state: Mutex::new(BackgroundScannerState {
                         prev_snapshot: snapshot.snapshot.clone(),
                         snapshot,
                         scanned_dirs: Default::default(),
@@ -1363,9 +1361,8 @@ impl LocalWorktree {
                         paths_to_scan: Default::default(),
                         removed_entries: Default::default(),
                         changed_paths: Default::default(),
-                        repository_scans: HashMap::default(),
                         git_hosting_provider_registry,
-                    })),
+                    }),
                     phase: BackgroundScannerPhase::InitialScan,
                     share_private_files,
                     settings,
@@ -4111,7 +4108,7 @@ impl<'a> sum_tree::Dimension<'a, EntrySummary> for PathKey {
 }
 
 struct BackgroundScanner {
-    state: Arc<Mutex<BackgroundScannerState>>,
+    state: Mutex<BackgroundScannerState>,
     fs: Arc<dyn Fs>,
     fs_case_sensitive: bool,
     status_updates_tx: UnboundedSender<ScanState>,
@@ -4125,7 +4122,7 @@ struct BackgroundScanner {
     share_private_files: bool,
 }
 
-#[derive(Copy, Clone, PartialEq)]
+#[derive(PartialEq)]
 enum BackgroundScannerPhase {
     InitialScan,
     EventsReceivedDuringInitialScan,
@@ -4134,6 +4131,8 @@ enum BackgroundScannerPhase {
 
 impl BackgroundScanner {
     async fn run(&mut self, mut fs_events_rx: Pin<Box<dyn Send + Stream<Item = Vec<PathEvent>>>>) {
+        use futures::FutureExt as _;
+
         // If the worktree root does not contain a git repository, then find
         // the git repository in an ancestor directory. Find any gitignore files
         // in ancestor directories.
@@ -4444,33 +4443,22 @@ impl BackgroundScanner {
         self.update_ignore_statuses(scan_job_tx).await;
         self.scan_dirs(false, scan_job_rx).await;
 
-        let status_update = if !dot_git_abs_paths.is_empty() {
-            Some(self.schedule_git_repositories_update(dot_git_abs_paths))
-        } else {
-            None
-        };
+        if !dot_git_abs_paths.is_empty() {
+            self.update_git_repositories(dot_git_abs_paths).await;
+        }
 
-        let phase = self.phase;
-        let status_update_tx = self.status_updates_tx.clone();
-        let state = self.state.clone();
-        self.executor
-            .spawn(async move {
-                if let Some(status_update) = status_update {
-                    status_update.await;
-                }
+        {
+            let mut state = self.state.lock();
+            state.snapshot.completed_scan_id = state.snapshot.scan_id;
+            for (_, entry) in mem::take(&mut state.removed_entries) {
+                state.scanned_dirs.remove(&entry.id);
+            }
+        }
 
-                {
-                    let mut state = state.lock();
-                    state.snapshot.completed_scan_id = state.snapshot.scan_id;
-                    for (_, entry) in mem::take(&mut state.removed_entries) {
-                        state.scanned_dirs.remove(&entry.id);
-                    }
-                    #[cfg(test)]
-                    state.snapshot.check_git_invariants();
-                }
-                send_status_update_inner(phase, state, status_update_tx, false, SmallVec::new());
-            })
-            .detach();
+        #[cfg(test)]
+        self.state.lock().snapshot.check_git_invariants();
+
+        self.send_status_update(false, SmallVec::new());
     }
 
     async fn forcibly_load_paths(&self, paths: &[Arc<Path>]) -> bool {
@@ -4504,6 +4492,8 @@ impl BackgroundScanner {
         enable_progress_updates: bool,
         scan_jobs_rx: channel::Receiver<ScanJob>,
     ) {
+        use futures::FutureExt as _;
+
         if self
             .status_updates_tx
             .unbounded_send(ScanState::Started)
@@ -4571,13 +4561,24 @@ impl BackgroundScanner {
     }
 
     fn send_status_update(&self, scanning: bool, barrier: SmallVec<[barrier::Sender; 1]>) -> bool {
-        send_status_update_inner(
-            self.phase,
-            self.state.clone(),
-            self.status_updates_tx.clone(),
-            scanning,
-            barrier,
-        )
+        let mut state = self.state.lock();
+        if state.changed_paths.is_empty() && scanning {
+            return true;
+        }
+
+        let new_snapshot = state.snapshot.clone();
+        let old_snapshot = mem::replace(&mut state.prev_snapshot, new_snapshot.snapshot.clone());
+        let changes = self.build_change_set(&old_snapshot, &new_snapshot, &state.changed_paths);
+        state.changed_paths.clear();
+
+        self.status_updates_tx
+            .unbounded_send(ScanState::Updated {
+                snapshot: new_snapshot,
+                changes,
+                scanning,
+                barrier,
+            })
+            .is_ok()
     }
 
     async fn scan_dir(&self, job: &ScanJob) -> Result<()> {
@@ -4633,7 +4634,9 @@ impl BackgroundScanner {
                 );
 
                 if let Some(local_repo) = repo {
-                    let _ = self.schedule_git_statuses_update(local_repo);
+                    self.update_git_statuses(UpdateGitStatusesJob {
+                        local_repository: local_repo,
+                    });
                 }
             } else if child_name == *GITIGNORE {
                 match build_gitignore(&child_abs_path, self.fs.as_ref()).await {
@@ -4990,6 +4993,8 @@ impl BackgroundScanner {
     }
 
     async fn update_ignore_statuses(&self, scan_job_tx: Sender<ScanJob>) {
+        use futures::FutureExt as _;
+
         let mut ignores_to_update = Vec::new();
         let (ignore_queue_tx, ignore_queue_rx) = channel::unbounded();
         let prev_snapshot;
@@ -5139,10 +5144,10 @@ impl BackgroundScanner {
         state.snapshot.entries_by_id.edit(entries_by_id_edits, &());
     }
 
-    fn schedule_git_repositories_update(&self, dot_git_paths: Vec<PathBuf>) -> Task<()> {
+    async fn update_git_repositories(&self, dot_git_paths: Vec<PathBuf>) {
         log::debug!("reloading repositories: {dot_git_paths:?}");
 
-        let mut repos_to_update = Vec::new();
+        let mut repo_updates = Vec::new();
         {
             let mut state = self.state.lock();
             let scan_id = state.snapshot.scan_id;
@@ -5205,7 +5210,7 @@ impl BackgroundScanner {
                     }
                 };
 
-                repos_to_update.push(local_repository);
+                repo_updates.push(UpdateGitStatusesJob { local_repository });
             }
 
             // Remove any git repositories whose .git entry no longer exists.
@@ -5236,109 +5241,238 @@ impl BackgroundScanner {
             });
         }
 
-        let mut status_updates = Vec::new();
-        for local_repository in repos_to_update {
-            status_updates.push(self.schedule_git_statuses_update(local_repository));
-        }
-        self.executor.spawn(async move {
-            let _updates_finished: Vec<Result<(), oneshot::Canceled>> =
-                join_all(status_updates).await;
-        })
+        let (mut updates_done_tx, mut updates_done_rx) = barrier::channel();
+        self.executor
+            .scoped(|scope| {
+                scope.spawn(async {
+                    for repo_update in repo_updates {
+                        self.update_git_statuses(repo_update);
+                    }
+                    updates_done_tx.blocking_send(()).ok();
+                });
+
+                scope.spawn(async {
+                    loop {
+                        select_biased! {
+                            // Process any path refresh requests before moving on to process
+                            // the queue of git statuses.
+                            request = self.next_scan_request().fuse() => {
+                                let Ok(request) = request else { break };
+                                if !self.process_scan_request(request, true).await {
+                                    return;
+                                }
+                            }
+                            _ = updates_done_rx.recv().fuse() =>  break,
+                        }
+                    }
+                });
+            })
+            .await;
     }
 
     /// Update the git statuses for a given batch of entries.
-    fn schedule_git_statuses_update(
-        &self,
-        local_repository: LocalRepositoryEntry,
-    ) -> oneshot::Receiver<()> {
-        let repository_path = local_repository.work_directory.path.clone();
-        let state = self.state.clone();
-        let (tx, rx) = oneshot::channel();
+    fn update_git_statuses(&self, job: UpdateGitStatusesJob) {
+        log::trace!(
+            "updating git statuses for repo {:?}",
+            job.local_repository.work_directory.path
+        );
+        let t0 = Instant::now();
 
-        self.state.lock().repository_scans.insert(
-            repository_path.clone(),
-            self.executor.spawn(async move {
-                log::trace!("updating git statuses for repo {repository_path:?}",);
-                let t0 = Instant::now();
-
-                let Some(statuses) = local_repository
-                    .repo()
-                    .status(&[git::WORK_DIRECTORY_REPO_PATH.clone()])
-                    .log_err()
-                else {
-                    return;
-                };
-                log::trace!(
-                    "computed git statuses for repo {:?} in {:?}",
-                    repository_path,
-                    t0.elapsed()
-                );
+        let Some(statuses) = job
+            .local_repository
+            .repo()
+            .status(&[git::WORK_DIRECTORY_REPO_PATH.clone()])
+            .log_err()
+        else {
+            return;
+        };
+        log::trace!(
+            "computed git statuses for repo {:?} in {:?}",
+            job.local_repository.work_directory.path,
+            t0.elapsed()
+        );
 
-                let t0 = Instant::now();
-                let mut changed_paths = Vec::new();
-                let snapshot = state.lock().snapshot.snapshot.clone();
+        let t0 = Instant::now();
+        let mut changed_paths = Vec::new();
+        let snapshot = self.state.lock().snapshot.snapshot.clone();
+
+        let Some(mut repository) =
+            snapshot.repository(job.local_repository.work_directory.path_key())
+        else {
+            log::error!("Got an UpdateGitStatusesJob for a repository that isn't in the snapshot");
+            debug_assert!(false);
+            return;
+        };
 
-                let Some(mut repository) =
-                    snapshot.repository(local_repository.work_directory.path_key())
-                else {
-                    log::error!(
-                        "Tried to update git statuses for a repository that isn't in the snapshot"
-                    );
-                    debug_assert!(false);
-                    return;
-                };
+        let merge_head_shas = job.local_repository.repo().merge_head_shas();
+        if merge_head_shas != job.local_repository.current_merge_head_shas {
+            mem::take(&mut repository.current_merge_conflicts);
+        }
 
-                let merge_head_shas = local_repository.repo().merge_head_shas();
-                if merge_head_shas != local_repository.current_merge_head_shas {
-                    mem::take(&mut repository.current_merge_conflicts);
-                }
+        let mut new_entries_by_path = SumTree::new(&());
+        for (repo_path, status) in statuses.entries.iter() {
+            let project_path = repository.work_directory.unrelativize(repo_path);
 
-                let mut new_entries_by_path = SumTree::new(&());
-                for (repo_path, status) in statuses.entries.iter() {
-                    let project_path = repository.work_directory.unrelativize(repo_path);
+            new_entries_by_path.insert_or_replace(
+                StatusEntry {
+                    repo_path: repo_path.clone(),
+                    status: *status,
+                },
+                &(),
+            );
+            if status.is_conflicted() {
+                repository.current_merge_conflicts.insert(repo_path.clone());
+            }
 
-                    new_entries_by_path.insert_or_replace(
-                        StatusEntry {
-                            repo_path: repo_path.clone(),
-                            status: *status,
-                        },
-                        &(),
-                    );
+            if let Some(path) = project_path {
+                changed_paths.push(path);
+            }
+        }
 
-                    if let Some(path) = project_path {
-                        changed_paths.push(path);
-                    }
-                }
+        repository.statuses_by_path = new_entries_by_path;
+        let mut state = self.state.lock();
+        state
+            .snapshot
+            .repositories
+            .insert_or_replace(repository, &());
 
-                repository.statuses_by_path = new_entries_by_path;
-                let mut state = state.lock();
-                state
-                    .snapshot
-                    .repositories
-                    .insert_or_replace(repository, &());
-                state.snapshot.git_repositories.update(
-                    &local_repository.work_directory_id,
-                    |entry| {
-                        entry.current_merge_head_shas = merge_head_shas;
-                    },
-                );
+        state
+            .snapshot
+            .git_repositories
+            .update(&job.local_repository.work_directory_id, |entry| {
+                entry.current_merge_head_shas = merge_head_shas;
+            });
 
-                util::extend_sorted(
-                    &mut state.changed_paths,
-                    changed_paths,
-                    usize::MAX,
-                    Ord::cmp,
-                );
+        util::extend_sorted(
+            &mut state.changed_paths,
+            changed_paths,
+            usize::MAX,
+            Ord::cmp,
+        );
 
-                log::trace!(
-                    "applied git status updates for repo {:?} in {:?}",
-                    repository_path,
-                    t0.elapsed(),
-                );
-                tx.send(()).ok();
-            }),
+        log::trace!(
+            "applied git status updates for repo {:?} in {:?}",
+            job.local_repository.work_directory.path,
+            t0.elapsed(),
         );
-        rx
+    }
+
+    fn build_change_set(
+        &self,
+        old_snapshot: &Snapshot,
+        new_snapshot: &Snapshot,
+        event_paths: &[Arc<Path>],
+    ) -> UpdatedEntriesSet {
+        use BackgroundScannerPhase::*;
+        use PathChange::{Added, AddedOrUpdated, Loaded, Removed, Updated};
+
+        // Identify which paths have changed. Use the known set of changed
+        // parent paths to optimize the search.
+        let mut changes = Vec::new();
+        let mut old_paths = old_snapshot.entries_by_path.cursor::<PathKey>(&());
+        let mut new_paths = new_snapshot.entries_by_path.cursor::<PathKey>(&());
+        let mut last_newly_loaded_dir_path = None;
+        old_paths.next(&());
+        new_paths.next(&());
+        for path in event_paths {
+            let path = PathKey(path.clone());
+            if old_paths.item().map_or(false, |e| e.path < path.0) {
+                old_paths.seek_forward(&path, Bias::Left, &());
+            }
+            if new_paths.item().map_or(false, |e| e.path < path.0) {
+                new_paths.seek_forward(&path, Bias::Left, &());
+            }
+            loop {
+                match (old_paths.item(), new_paths.item()) {
+                    (Some(old_entry), Some(new_entry)) => {
+                        if old_entry.path > path.0
+                            && new_entry.path > path.0
+                            && !old_entry.path.starts_with(&path.0)
+                            && !new_entry.path.starts_with(&path.0)
+                        {
+                            break;
+                        }
+
+                        match Ord::cmp(&old_entry.path, &new_entry.path) {
+                            Ordering::Less => {
+                                changes.push((old_entry.path.clone(), old_entry.id, Removed));
+                                old_paths.next(&());
+                            }
+                            Ordering::Equal => {
+                                if self.phase == EventsReceivedDuringInitialScan {
+                                    if old_entry.id != new_entry.id {
+                                        changes.push((
+                                            old_entry.path.clone(),
+                                            old_entry.id,
+                                            Removed,
+                                        ));
+                                    }
+                                    // If the worktree was not fully initialized when this event was generated,
+                                    // we can't know whether this entry was added during the scan or whether
+                                    // it was merely updated.
+                                    changes.push((
+                                        new_entry.path.clone(),
+                                        new_entry.id,
+                                        AddedOrUpdated,
+                                    ));
+                                } else if old_entry.id != new_entry.id {
+                                    changes.push((old_entry.path.clone(), old_entry.id, Removed));
+                                    changes.push((new_entry.path.clone(), new_entry.id, Added));
+                                } else if old_entry != new_entry {
+                                    if old_entry.kind.is_unloaded() {
+                                        last_newly_loaded_dir_path = Some(&new_entry.path);
+                                        changes.push((
+                                            new_entry.path.clone(),
+                                            new_entry.id,
+                                            Loaded,
+                                        ));
+                                    } else {
+                                        changes.push((
+                                            new_entry.path.clone(),
+                                            new_entry.id,
+                                            Updated,
+                                        ));
+                                    }
+                                }
+                                old_paths.next(&());
+                                new_paths.next(&());
+                            }
+                            Ordering::Greater => {
+                                let is_newly_loaded = self.phase == InitialScan
+                                    || last_newly_loaded_dir_path
+                                        .as_ref()
+                                        .map_or(false, |dir| new_entry.path.starts_with(dir));
+                                changes.push((
+                                    new_entry.path.clone(),
+                                    new_entry.id,
+                                    if is_newly_loaded { Loaded } else { Added },
+                                ));
+                                new_paths.next(&());
+                            }
+                        }
+                    }
+                    (Some(old_entry), None) => {
+                        changes.push((old_entry.path.clone(), old_entry.id, Removed));
+                        old_paths.next(&());
+                    }
+                    (None, Some(new_entry)) => {
+                        let is_newly_loaded = self.phase == InitialScan
+                            || last_newly_loaded_dir_path
+                                .as_ref()
+                                .map_or(false, |dir| new_entry.path.starts_with(dir));
+                        changes.push((
+                            new_entry.path.clone(),
+                            new_entry.id,
+                            if is_newly_loaded { Loaded } else { Added },
+                        ));
+                        new_paths.next(&());
+                    }
+                    (None, None) => break,
+                }
+            }
+        }
+
+        changes.into()
     }
 
     async fn progress_timer(&self, running: bool) {
@@ -5368,139 +5502,6 @@ impl BackgroundScanner {
     }
 }
 
-fn send_status_update_inner(
-    phase: BackgroundScannerPhase,
-    state: Arc<Mutex<BackgroundScannerState>>,
-    status_updates_tx: UnboundedSender<ScanState>,
-    scanning: bool,
-    barrier: SmallVec<[barrier::Sender; 1]>,
-) -> bool {
-    let mut state = state.lock();
-    if state.changed_paths.is_empty() && scanning {
-        return true;
-    }
-
-    let new_snapshot = state.snapshot.clone();
-    let old_snapshot = mem::replace(&mut state.prev_snapshot, new_snapshot.snapshot.clone());
-    let changes = build_diff(phase, &old_snapshot, &new_snapshot, &state.changed_paths);
-    state.changed_paths.clear();
-
-    status_updates_tx
-        .unbounded_send(ScanState::Updated {
-            snapshot: new_snapshot,
-            changes,
-            scanning,
-            barrier,
-        })
-        .is_ok()
-}
-
-fn build_diff(
-    phase: BackgroundScannerPhase,
-    old_snapshot: &Snapshot,
-    new_snapshot: &Snapshot,
-    event_paths: &[Arc<Path>],
-) -> UpdatedEntriesSet {
-    use BackgroundScannerPhase::*;
-    use PathChange::{Added, AddedOrUpdated, Loaded, Removed, Updated};
-
-    // Identify which paths have changed. Use the known set of changed
-    // parent paths to optimize the search.
-    let mut changes = Vec::new();
-    let mut old_paths = old_snapshot.entries_by_path.cursor::<PathKey>(&());
-    let mut new_paths = new_snapshot.entries_by_path.cursor::<PathKey>(&());
-    let mut last_newly_loaded_dir_path = None;
-    old_paths.next(&());
-    new_paths.next(&());
-    for path in event_paths {
-        let path = PathKey(path.clone());
-        if old_paths.item().map_or(false, |e| e.path < path.0) {
-            old_paths.seek_forward(&path, Bias::Left, &());
-        }
-        if new_paths.item().map_or(false, |e| e.path < path.0) {
-            new_paths.seek_forward(&path, Bias::Left, &());
-        }
-        loop {
-            match (old_paths.item(), new_paths.item()) {
-                (Some(old_entry), Some(new_entry)) => {
-                    if old_entry.path > path.0
-                        && new_entry.path > path.0
-                        && !old_entry.path.starts_with(&path.0)
-                        && !new_entry.path.starts_with(&path.0)
-                    {
-                        break;
-                    }
-
-                    match Ord::cmp(&old_entry.path, &new_entry.path) {
-                        Ordering::Less => {
-                            changes.push((old_entry.path.clone(), old_entry.id, Removed));
-                            old_paths.next(&());
-                        }
-                        Ordering::Equal => {
-                            if phase == EventsReceivedDuringInitialScan {
-                                if old_entry.id != new_entry.id {
-                                    changes.push((old_entry.path.clone(), old_entry.id, Removed));
-                                }
-                                // If the worktree was not fully initialized when this event was generated,
-                                // we can't know whether this entry was added during the scan or whether
-                                // it was merely updated.
-                                changes.push((
-                                    new_entry.path.clone(),
-                                    new_entry.id,
-                                    AddedOrUpdated,
-                                ));
-                            } else if old_entry.id != new_entry.id {
-                                changes.push((old_entry.path.clone(), old_entry.id, Removed));
-                                changes.push((new_entry.path.clone(), new_entry.id, Added));
-                            } else if old_entry != new_entry {
-                                if old_entry.kind.is_unloaded() {
-                                    last_newly_loaded_dir_path = Some(&new_entry.path);
-                                    changes.push((new_entry.path.clone(), new_entry.id, Loaded));
-                                } else {
-                                    changes.push((new_entry.path.clone(), new_entry.id, Updated));
-                                }
-                            }
-                            old_paths.next(&());
-                            new_paths.next(&());
-                        }
-                        Ordering::Greater => {
-                            let is_newly_loaded = phase == InitialScan
-                                || last_newly_loaded_dir_path
-                                    .as_ref()
-                                    .map_or(false, |dir| new_entry.path.starts_with(dir));
-                            changes.push((
-                                new_entry.path.clone(),
-                                new_entry.id,
-                                if is_newly_loaded { Loaded } else { Added },
-                            ));
-                            new_paths.next(&());
-                        }
-                    }
-                }
-                (Some(old_entry), None) => {
-                    changes.push((old_entry.path.clone(), old_entry.id, Removed));
-                    old_paths.next(&());
-                }
-                (None, Some(new_entry)) => {
-                    let is_newly_loaded = phase == InitialScan
-                        || last_newly_loaded_dir_path
-                            .as_ref()
-                            .map_or(false, |dir| new_entry.path.starts_with(dir));
-                    changes.push((
-                        new_entry.path.clone(),
-                        new_entry.id,
-                        if is_newly_loaded { Loaded } else { Added },
-                    ));
-                    new_paths.next(&());
-                }
-                (None, None) => break,
-            }
-        }
-    }
-
-    changes.into()
-}
-
 fn swap_to_front(child_paths: &mut Vec<PathBuf>, file: &OsStr) {
     let position = child_paths
         .iter()
@@ -5563,6 +5564,10 @@ struct UpdateIgnoreStatusJob {
     scan_queue: Sender<ScanJob>,
 }
 
+struct UpdateGitStatusesJob {
+    local_repository: LocalRepositoryEntry,
+}
+
 pub trait WorktreeModelHandle {
     #[cfg(any(test, feature = "test-support"))]
     fn flush_fs_events<'a>(
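
The restored update_git_repositories above still has to stay responsive while it recomputes statuses, so it runs two scoped tasks: one drains the queued UpdateGitStatusesJobs and then signals a barrier, while the other keeps servicing path refresh requests via select_biased! until that signal arrives or the request channel closes. The sketch below approximates that control flow with the futures crate alone; println! stands in for update_git_statuses and process_scan_request, and the mpsc/oneshot pair stands in for next_scan_request() and the barrier channel, so none of these names are the scanner's actual types.

    use futures::{
        channel::{mpsc, oneshot},
        executor::block_on,
        join, select_biased, FutureExt as _, StreamExt as _,
    };

    fn main() {
        // Queued repository updates and a "scan request" channel, standing in
        // for repo_updates and the scanner's next_scan_request() stream.
        let repo_updates = vec!["repo-a", "repo-b"];
        let (scan_req_tx, mut scan_req_rx) = mpsc::unbounded::<&str>();
        let (done_tx, mut done_rx) = oneshot::channel::<()>();

        // Pretend a path refresh request arrives while statuses are refreshing.
        scan_req_tx.unbounded_send("refresh src/").unwrap();
        drop(scan_req_tx);

        // First task: walk the queued repositories, then signal completion,
        // like the scope task that calls update_git_statuses in a loop.
        let updater = async move {
            for repo in repo_updates {
                println!("updating git statuses for {repo}");
            }
            done_tx.send(()).ok();
        };

        // Second task: keep answering scan requests, preferring them over the
        // completion signal, until either source is exhausted.
        let responder = async move {
            loop {
                select_biased! {
                    request = scan_req_rx.next().fuse() => match request {
                        Some(request) => println!("processed scan request: {request}"),
                        None => break, // request channel closed
                    },
                    _ = (&mut done_rx).fuse() => break, // all repositories updated
                }
            }
        };

        block_on(async move { join!(updater, responder) });
    }

Listing the request-servicing branch first in select_biased! is what gives refresh requests priority over simply waiting for the status pass to finish, matching the comment in the hunk above.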

crates/worktree/src/worktree_tests.rs

@@ -24,7 +24,6 @@ use std::{
     mem,
     path::{Path, PathBuf},
     sync::Arc,
-    time::Duration,
 };
 use util::{test::TempTree, ResultExt};
 
@@ -1505,7 +1504,6 @@ async fn test_bump_mtime_of_git_repo_workdir(cx: &mut TestAppContext) {
         &[(Path::new("b/c.txt"), StatusCode::Modified.index())],
     );
     cx.executor().run_until_parked();
-    cx.executor().advance_clock(Duration::from_secs(1));
 
     let snapshot = tree.read_with(cx, |tree, _| tree.snapshot());
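
The advance_clock(Duration::from_secs(1)) call removed above was only needed while statuses were computed on a separately spawned task; with the revert they are recomputed inline during the scan, so the test relies on run_until_parked() alone before reading the snapshot. The std-only fragment below illustrates that difference in test synchronization; it is unrelated to gpui's test executor and the names are made up.

    use std::sync::{Arc, Mutex};
    use std::thread;

    fn main() {
        // Deferred variant: the status only appears once the spawned worker has
        // run, so a test must synchronize first (the role the extra clock
        // advance played for the background-task version).
        let deferred = Arc::new(Mutex::new(None::<&str>));
        let worker = {
            let deferred = Arc::clone(&deferred);
            thread::spawn(move || *deferred.lock().unwrap() = Some("modified"))
        };
        worker.join().unwrap(); // explicit wait before asserting
        assert_eq!(*deferred.lock().unwrap(), Some("modified"));

        // Inline variant: the status is visible as soon as the update returns,
        // so no extra waiting is needed beyond letting the scan itself finish.
        let inline = Mutex::new(None::<&str>);
        *inline.lock().unwrap() = Some("modified");
        assert_eq!(*inline.lock().unwrap(), Some("modified"));
    }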