Move git status updates to a background thread #2 (#24722)

Kirill Bulatov created

Follow-up of https://github.com/zed-industries/zed/pull/24307

Brings back the PR and fixes the issue where the git status was not
propagated if it was computed too slowly.
Now, the git repo update:
* waits in the background for all `scan_dir` repo status updates and
triggers another status update send afterwards (see the sketch after this list)
* ensures that the update sent is reported correctly (`scanning = true`)
if either the FS scan or the status scan is still running
* during the worktree's git status updates, bumps `status_scan_id` to
ensure the repo update is reported to all subscribers
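
As an illustration of the pattern above, here is a minimal, self-contained sketch (not the PR's actual code; it assumes only the `futures` crate): each scheduled repository scan bumps a shared `AtomicU32` and hands back a `oneshot::Receiver`, intermediate updates report `scanning = true` while the counter is non-zero, and a background wait over `join_all` precedes the final, non-scanning update.

```rust
use std::sync::{
    atomic::{AtomicU32, Ordering},
    Arc,
};
use std::thread;
use std::time::Duration;

use futures::{channel::oneshot, executor::block_on, future::join_all};

/// Hypothetical stand-in for scheduling one repository's git status scan.
/// The caller gets a receiver it can await; the shared counter tracks how
/// many scans are still in flight.
fn schedule_status_scan(
    scans_running: &Arc<AtomicU32>,
    repo: &'static str,
) -> oneshot::Receiver<()> {
    let (tx, rx) = oneshot::channel();
    scans_running.fetch_add(1, Ordering::Release);
    thread::spawn(move || {
        // Stand-in for computing `git status` for this repository.
        thread::sleep(Duration::from_millis(50));
        println!("computed statuses for {repo}");
        tx.send(()).ok();
    });
    rx
}

fn main() {
    let scans_running = Arc::new(AtomicU32::new(0));
    let scans = vec![
        schedule_status_scan(&scans_running, "repo-a"),
        schedule_status_scan(&scans_running, "repo-b"),
    ];

    // While any scan is still running, updates go out with `scanning = true`,
    // so subscribers know a more complete update is coming.
    let scanning = scans_running.load(Ordering::Acquire) > 0;
    println!("intermediate update, scanning = {scanning}");

    // Wait for every scheduled scan, then send the final update with
    // `scanning = false`.
    block_on(async {
        let finished = join_all(scans).await;
        scans_running.fetch_sub(finished.len() as u32, Ordering::Release);
    });
    let scanning = scans_running.load(Ordering::Acquire) > 0;
    println!("final update, scanning = {scanning}");
}
```

In the PR itself the counter lives in the `FsScanned` struct returned by `scan_dirs`, and the task that awaits the per-repo receivers also bumps `status_scan_id` on the affected repository entries so every subscriber observes the final update.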

Release Notes:

- Improved the project panel's speed in large projects

Change summary

crates/collab/.env.toml               |   1 
crates/workspace/src/workspace.rs     |   1 
crates/worktree/src/worktree.rs       | 674 +++++++++++++++-------------
crates/worktree/src/worktree_tests.rs |   2 
4 files changed, 363 insertions(+), 315 deletions(-)

Detailed changes

crates/collab/.env.toml 🔗

@@ -18,6 +18,7 @@ SEED_PATH = "crates/collab/seed.default.json"
 LLM_DATABASE_URL = "postgres://postgres@localhost/zed_llm"
 LLM_DATABASE_MAX_CONNECTIONS = 5
 LLM_API_SECRET = "llm-secret"
+OPENAI_API_KEY = "llm-secret"
 
 # SLACK_PANICS_WEBHOOK = ""
 

crates/workspace/src/workspace.rs 🔗

@@ -1770,6 +1770,7 @@ impl Workspace {
         self.project.read(cx).visible_worktrees(cx)
     }
 
+    #[cfg(any(test, feature = "test-support"))]
     pub fn worktree_scans_complete(&self, cx: &App) -> impl Future<Output = ()> + 'static {
         let futures = self
             .worktrees(cx)

crates/worktree/src/worktree.rs 🔗

@@ -13,6 +13,7 @@ use futures::{
         mpsc::{self, UnboundedSender},
         oneshot,
     },
+    future::join_all,
     select_biased,
     task::Poll,
     FutureExt as _, Stream, StreamExt,
@@ -59,7 +60,7 @@ use std::{
     path::{Path, PathBuf},
     pin::Pin,
     sync::{
-        atomic::{AtomicUsize, Ordering::SeqCst},
+        atomic::{self, AtomicU32, AtomicUsize, Ordering::SeqCst},
         Arc,
     },
     time::{Duration, Instant},
@@ -559,6 +560,7 @@ struct BackgroundScannerState {
     changed_paths: Vec<Arc<Path>>,
     prev_snapshot: Snapshot,
     git_hosting_provider_registry: Option<Arc<GitHostingProviderRegistry>>,
+    repository_scans: HashMap<PathKey, Task<()>>,
 }
 
 #[derive(Debug, Clone)]
@@ -623,6 +625,7 @@ impl DerefMut for LocalSnapshot {
     }
 }
 
+#[derive(Debug)]
 enum ScanState {
     Started,
     Updated {
@@ -1445,7 +1448,7 @@ impl LocalWorktree {
                     scan_requests_rx,
                     path_prefixes_to_scan_rx,
                     next_entry_id,
-                    state: Mutex::new(BackgroundScannerState {
+                    state: Arc::new(Mutex::new(BackgroundScannerState {
                         prev_snapshot: snapshot.snapshot.clone(),
                         snapshot,
                         scanned_dirs: Default::default(),
@@ -1453,8 +1456,9 @@ impl LocalWorktree {
                         paths_to_scan: Default::default(),
                         removed_entries: Default::default(),
                         changed_paths: Default::default(),
+                        repository_scans: HashMap::default(),
                         git_hosting_provider_registry,
-                    }),
+                    })),
                     phase: BackgroundScannerPhase::InitialScan,
                     share_private_files,
                     settings,
@@ -2227,14 +2231,12 @@ impl LocalWorktree {
         let _maintain_remote_snapshot = cx.background_executor().spawn(async move {
             let mut is_first = true;
             while let Some((snapshot, entry_changes, repo_changes)) = snapshots_rx.next().await {
-                let update;
-                if is_first {
-                    update = snapshot.build_initial_update(project_id, worktree_id);
+                let update = if is_first {
                     is_first = false;
+                    snapshot.build_initial_update(project_id, worktree_id)
                 } else {
-                    update =
-                        snapshot.build_update(project_id, worktree_id, entry_changes, repo_changes);
-                }
+                    snapshot.build_update(project_id, worktree_id, entry_changes, repo_changes)
+                };
 
                 for update in proto::split_worktree_update(update) {
                     let _ = resume_updates_rx.try_recv();
@@ -2604,7 +2606,7 @@ impl Snapshot {
         mut update: proto::UpdateWorktree,
         always_included_paths: &PathMatcher,
     ) -> Result<()> {
-        log::trace!(
+        log::debug!(
             "applying remote worktree update. {} entries updated, {} removed",
             update.updated_entries.len(),
             update.removed_entries.len()
@@ -4163,6 +4165,11 @@ struct PathEntry {
     scan_id: usize,
 }
 
+#[derive(Debug, Default)]
+struct FsScanned {
+    status_scans: Arc<AtomicU32>,
+}
+
 impl sum_tree::Item for PathEntry {
     type Summary = PathEntrySummary;
 
@@ -4226,7 +4233,7 @@ impl<'a> sum_tree::Dimension<'a, EntrySummary> for PathKey {
 }
 
 struct BackgroundScanner {
-    state: Mutex<BackgroundScannerState>,
+    state: Arc<Mutex<BackgroundScannerState>>,
     fs: Arc<dyn Fs>,
     fs_case_sensitive: bool,
     status_updates_tx: UnboundedSender<ScanState>,
@@ -4240,7 +4247,7 @@ struct BackgroundScanner {
     share_private_files: bool,
 }
 
-#[derive(PartialEq)]
+#[derive(Copy, Clone, PartialEq)]
 enum BackgroundScannerPhase {
     InitialScan,
     EventsReceivedDuringInitialScan,
@@ -4249,8 +4256,6 @@ enum BackgroundScannerPhase {
 
 impl BackgroundScanner {
     async fn run(&mut self, mut fs_events_rx: Pin<Box<dyn Send + Stream<Item = Vec<PathEvent>>>>) {
-        use futures::FutureExt as _;
-
         // If the worktree root does not contain a git repository, then find
         // the git repository in an ancestor directory. Find any gitignore files
         // in ancestor directories.
@@ -4328,13 +4333,14 @@ impl BackgroundScanner {
 
         // Perform an initial scan of the directory.
         drop(scan_job_tx);
-        self.scan_dirs(true, scan_job_rx).await;
+        let scans_running = self.scan_dirs(true, scan_job_rx).await;
         {
             let mut state = self.state.lock();
             state.snapshot.completed_scan_id = state.snapshot.scan_id;
         }
 
-        self.send_status_update(false, SmallVec::new());
+        let scanning = scans_running.status_scans.load(atomic::Ordering::Acquire) > 0;
+        self.send_status_update(scanning, SmallVec::new());
 
         // Process any any FS events that occurred while performing the initial scan.
         // For these events, update events cannot be as precise, because we didn't
@@ -4360,7 +4366,8 @@ impl BackgroundScanner {
                 // these before handling changes reported by the filesystem.
                 request = self.next_scan_request().fuse() => {
                     let Ok(request) = request else { break };
-                    if !self.process_scan_request(request, false).await {
+                    let scanning = scans_running.status_scans.load(atomic::Ordering::Acquire) > 0;
+                    if !self.process_scan_request(request, scanning).await {
                         return;
                     }
                 }
@@ -4382,7 +4389,8 @@ impl BackgroundScanner {
                             self.process_events(vec![abs_path]).await;
                         }
                     }
-                    self.send_status_update(false, request.done);
+                    let scanning = scans_running.status_scans.load(atomic::Ordering::Acquire) > 0;
+                    self.send_status_update(scanning, request.done);
                 }
 
                 paths = fs_events_rx.next().fuse() => {
@@ -4575,24 +4583,36 @@ impl BackgroundScanner {
         .await;
 
         self.update_ignore_statuses(scan_job_tx).await;
-        self.scan_dirs(false, scan_job_rx).await;
+        let scans_running = self.scan_dirs(false, scan_job_rx).await;
 
-        if !dot_git_abs_paths.is_empty() {
-            self.update_git_repositories(dot_git_abs_paths).await;
-        }
-
-        {
-            let mut state = self.state.lock();
-            state.snapshot.completed_scan_id = state.snapshot.scan_id;
-            for (_, entry) in mem::take(&mut state.removed_entries) {
-                state.scanned_dirs.remove(&entry.id);
-            }
-        }
+        let status_update = if !dot_git_abs_paths.is_empty() {
+            Some(self.update_git_repositories(dot_git_abs_paths))
+        } else {
+            None
+        };
 
-        #[cfg(test)]
-        self.state.lock().snapshot.check_git_invariants();
+        let phase = self.phase;
+        let status_update_tx = self.status_updates_tx.clone();
+        let state = self.state.clone();
+        self.executor
+            .spawn(async move {
+                if let Some(status_update) = status_update {
+                    status_update.await;
+                }
 
-        self.send_status_update(false, SmallVec::new());
+                {
+                    let mut state = state.lock();
+                    state.snapshot.completed_scan_id = state.snapshot.scan_id;
+                    for (_, entry) in mem::take(&mut state.removed_entries) {
+                        state.scanned_dirs.remove(&entry.id);
+                    }
+                    #[cfg(test)]
+                    state.snapshot.check_git_invariants();
+                }
+                let scanning = scans_running.status_scans.load(atomic::Ordering::Acquire) > 0;
+                send_status_update_inner(phase, state, status_update_tx, scanning, SmallVec::new());
+            })
+            .detach();
     }
 
     async fn forcibly_load_paths(&self, paths: &[Arc<Path>]) -> bool {
@@ -4614,8 +4634,9 @@ impl BackgroundScanner {
             }
             drop(scan_job_tx);
         }
+        let scans_running = Arc::new(AtomicU32::new(0));
         while let Ok(job) = scan_job_rx.recv().await {
-            self.scan_dir(&job).await.log_err();
+            self.scan_dir(&scans_running, &job).await.log_err();
         }
 
         !mem::take(&mut self.state.lock().paths_to_scan).is_empty()
@@ -4625,17 +4646,16 @@ impl BackgroundScanner {
         &self,
         enable_progress_updates: bool,
         scan_jobs_rx: channel::Receiver<ScanJob>,
-    ) {
-        use futures::FutureExt as _;
-
+    ) -> FsScanned {
         if self
             .status_updates_tx
             .unbounded_send(ScanState::Started)
             .is_err()
         {
-            return;
+            return FsScanned::default();
         }
 
+        let scans_running = Arc::new(AtomicU32::new(1));
         let progress_update_count = AtomicUsize::new(0);
         self.executor
             .scoped(|scope| {
@@ -4680,7 +4700,7 @@ impl BackgroundScanner {
                                 // Recursively load directories from the file system.
                                 job = scan_jobs_rx.recv().fuse() => {
                                     let Ok(job) = job else { break };
-                                    if let Err(err) = self.scan_dir(&job).await {
+                                    if let Err(err) = self.scan_dir(&scans_running, &job).await {
                                         if job.path.as_ref() != Path::new("") {
                                             log::error!("error scanning directory {:?}: {}", job.abs_path, err);
                                         }
@@ -4688,34 +4708,28 @@ impl BackgroundScanner {
                                 }
                             }
                         }
-                    })
+                    });
                 }
             })
             .await;
-    }
 
-    fn send_status_update(&self, scanning: bool, barrier: SmallVec<[barrier::Sender; 1]>) -> bool {
-        let mut state = self.state.lock();
-        if state.changed_paths.is_empty() && scanning {
-            return true;
+        scans_running.fetch_sub(1, atomic::Ordering::Release);
+        FsScanned {
+            status_scans: scans_running,
         }
+    }
 
-        let new_snapshot = state.snapshot.clone();
-        let old_snapshot = mem::replace(&mut state.prev_snapshot, new_snapshot.snapshot.clone());
-        let changes = self.build_change_set(&old_snapshot, &new_snapshot, &state.changed_paths);
-        state.changed_paths.clear();
-
-        self.status_updates_tx
-            .unbounded_send(ScanState::Updated {
-                snapshot: new_snapshot,
-                changes,
-                scanning,
-                barrier,
-            })
-            .is_ok()
+    fn send_status_update(&self, scanning: bool, barrier: SmallVec<[barrier::Sender; 1]>) -> bool {
+        send_status_update_inner(
+            self.phase,
+            self.state.clone(),
+            self.status_updates_tx.clone(),
+            scanning,
+            barrier,
+        )
     }
 
-    async fn scan_dir(&self, job: &ScanJob) -> Result<()> {
+    async fn scan_dir(&self, scans_running: &Arc<AtomicU32>, job: &ScanJob) -> Result<()> {
         let root_abs_path;
         let root_char_bag;
         {
@@ -4755,22 +4769,25 @@ impl BackgroundScanner {
         swap_to_front(&mut child_paths, *GITIGNORE);
         swap_to_front(&mut child_paths, *DOT_GIT);
 
+        let mut git_status_update_jobs = Vec::new();
         for child_abs_path in child_paths {
             let child_abs_path: Arc<Path> = child_abs_path.into();
             let child_name = child_abs_path.file_name().unwrap();
             let child_path: Arc<Path> = job.path.join(child_name).into();
 
             if child_name == *DOT_GIT {
-                let repo = self.state.lock().insert_git_repository(
-                    child_path.clone(),
-                    self.fs.as_ref(),
-                    self.watcher.as_ref(),
-                );
-
-                if let Some(local_repo) = repo {
-                    self.update_git_repository(UpdateGitRepoJob {
-                        local_repository: local_repo,
-                    });
+                {
+                    let mut state = self.state.lock();
+                    let repo = state.insert_git_repository(
+                        child_path.clone(),
+                        self.fs.as_ref(),
+                        self.watcher.as_ref(),
+                    );
+                    if let Some(local_repo) = repo {
+                        scans_running.fetch_add(1, atomic::Ordering::Release);
+                        git_status_update_jobs
+                            .push(self.schedule_git_statuses_update(&mut state, local_repo));
+                    }
                 }
             } else if child_name == *GITIGNORE {
                 match build_gitignore(&child_abs_path, self.fs.as_ref()).await {
@@ -4887,6 +4904,32 @@ impl BackgroundScanner {
             new_entries.push(child_entry);
         }
 
+        let task_state = self.state.clone();
+        let phase = self.phase;
+        let status_updates_tx = self.status_updates_tx.clone();
+        let scans_running = scans_running.clone();
+        self.executor
+            .spawn(async move {
+                if !git_status_update_jobs.is_empty() {
+                    let status_updates = join_all(git_status_update_jobs).await;
+                    let status_updated = status_updates
+                        .iter()
+                        .any(|update_result| update_result.is_ok());
+                    scans_running.fetch_sub(status_updates.len() as u32, atomic::Ordering::Release);
+                    if status_updated {
+                        let scanning = scans_running.load(atomic::Ordering::Acquire) > 0;
+                        send_status_update_inner(
+                            phase,
+                            task_state,
+                            status_updates_tx,
+                            scanning,
+                            SmallVec::new(),
+                        );
+                    }
+                }
+            })
+            .detach();
+
         let mut state = self.state.lock();
 
         // Identify any subdirectories that should not be scanned.
@@ -5127,8 +5170,6 @@ impl BackgroundScanner {
     }
 
     async fn update_ignore_statuses(&self, scan_job_tx: Sender<ScanJob>) {
-        use futures::FutureExt as _;
-
         let mut ignores_to_update = Vec::new();
         let (ignore_queue_tx, ignore_queue_rx) = channel::unbounded();
         let prev_snapshot;
@@ -5278,10 +5319,10 @@ impl BackgroundScanner {
         state.snapshot.entries_by_id.edit(entries_by_id_edits, &());
     }
 
-    async fn update_git_repositories(&self, dot_git_paths: Vec<PathBuf>) {
+    fn update_git_repositories(&self, dot_git_paths: Vec<PathBuf>) -> Task<()> {
         log::debug!("reloading repositories: {dot_git_paths:?}");
 
-        let mut repo_updates = Vec::new();
+        let mut status_updates = Vec::new();
         {
             let mut state = self.state.lock();
             let scan_id = state.snapshot.scan_id;
@@ -5305,7 +5346,7 @@ impl BackgroundScanner {
                     None => {
                         let Ok(relative) = dot_git_dir.strip_prefix(state.snapshot.abs_path())
                         else {
-                            return;
+                            return Task::ready(());
                         };
                         match state.insert_git_repository(
                             relative.into(),
@@ -5334,7 +5375,8 @@ impl BackgroundScanner {
                     }
                 };
 
-                repo_updates.push(UpdateGitRepoJob { local_repository });
+                status_updates
+                    .push(self.schedule_git_statuses_update(&mut state, local_repository));
             }
 
             // Remove any git repositories whose .git entry no longer exists.
@@ -5365,257 +5407,109 @@ impl BackgroundScanner {
             });
         }
 
-        let (mut updates_done_tx, mut updates_done_rx) = barrier::channel();
-        self.executor
-            .scoped(|scope| {
-                scope.spawn(async {
-                    for repo_update in repo_updates {
-                        self.update_git_repository(repo_update);
-                    }
-                    updates_done_tx.blocking_send(()).ok();
-                });
-
-                scope.spawn(async {
-                    loop {
-                        select_biased! {
-                            // Process any path refresh requests before moving on to process
-                            // the queue of git statuses.
-                            request = self.next_scan_request().fuse() => {
-                                let Ok(request) = request else { break };
-                                if !self.process_scan_request(request, true).await {
-                                    return;
-                                }
-                            }
-                            _ = updates_done_rx.recv().fuse() =>  break,
-                        }
-                    }
-                });
-            })
-            .await;
-    }
-
-    fn update_branches(&self, job: &UpdateGitRepoJob) -> Result<()> {
-        let branches = job.local_repository.repo().branches()?;
-        let snapshot = self.state.lock().snapshot.snapshot.clone();
-
-        let mut repository = snapshot
-            .repository(job.local_repository.work_directory.path_key())
-            .context("Missing repository")?;
-
-        repository.branch = branches.into_iter().find(|branch| branch.is_head);
-
-        let mut state = self.state.lock();
-        state
-            .snapshot
-            .repositories
-            .insert_or_replace(repository, &());
-
-        Ok(())
+        self.executor.spawn(async move {
+            let _updates_finished: Vec<Result<(), oneshot::Canceled>> =
+                join_all(status_updates).await;
+        })
     }
 
-    fn update_statuses(&self, job: &UpdateGitRepoJob) -> Result<()> {
-        log::trace!(
-            "updating git statuses for repo {:?}",
-            job.local_repository.work_directory.display_name()
-        );
-        let t0 = Instant::now();
-
-        let statuses = job
-            .local_repository
-            .repo()
-            .status(&[git::WORK_DIRECTORY_REPO_PATH.clone()])?;
+    /// Update the git statuses for a given batch of entries.
+    fn schedule_git_statuses_update(
+        &self,
+        state: &mut BackgroundScannerState,
+        mut local_repository: LocalRepositoryEntry,
+    ) -> oneshot::Receiver<()> {
+        let repository_name = local_repository.work_directory.display_name();
+        let path_key = local_repository.work_directory.path_key();
 
-        log::trace!(
-            "computed git statuses for repo {:?} in {:?}",
-            job.local_repository.work_directory.display_name(),
-            t0.elapsed()
-        );
+        let job_state = self.state.clone();
+        let (tx, rx) = oneshot::channel();
 
-        let t0 = Instant::now();
-        let mut changed_paths = Vec::new();
-        let snapshot = self.state.lock().snapshot.snapshot.clone();
+        state.repository_scans.insert(
+            path_key.clone(),
+            self.executor.spawn(async move {
+                update_branches(&job_state, &mut local_repository).log_err();
+                log::trace!("updating git statuses for repo {repository_name}",);
+                let t0 = Instant::now();
+
+                let Some(statuses) = local_repository
+                    .repo()
+                    .status(&[git::WORK_DIRECTORY_REPO_PATH.clone()])
+                    .log_err()
+                else {
+                    return;
+                };
 
-        let mut repository = snapshot
-            .repository(job.local_repository.work_directory.path_key())
-            .context("Got an UpdateGitStatusesJob for a repository that isn't in the snapshot")?;
+                log::trace!(
+                    "computed git statuses for repo {repository_name} in {:?}",
+                    t0.elapsed()
+                );
 
-        let merge_head_shas = job.local_repository.repo().merge_head_shas();
-        if merge_head_shas != job.local_repository.current_merge_head_shas {
-            mem::take(&mut repository.current_merge_conflicts);
-        }
+                let t0 = Instant::now();
+                let mut changed_paths = Vec::new();
+                let snapshot = job_state.lock().snapshot.snapshot.clone();
 
-        let mut new_entries_by_path = SumTree::new(&());
-        for (repo_path, status) in statuses.entries.iter() {
-            let project_path = repository.work_directory.unrelativize(repo_path);
+                let Some(mut repository) = snapshot
+                    .repository(path_key)
+                    .context(
+                        "Tried to update git statuses for a repository that isn't in the snapshot",
+                    )
+                    .log_err()
+                else {
+                    return;
+                };
 
-            new_entries_by_path.insert_or_replace(
-                StatusEntry {
-                    repo_path: repo_path.clone(),
-                    status: *status,
-                },
-                &(),
-            );
-            if status.is_conflicted() {
-                repository.current_merge_conflicts.insert(repo_path.clone());
-            }
+                let merge_head_shas = local_repository.repo().merge_head_shas();
+                if merge_head_shas != local_repository.current_merge_head_shas {
+                    mem::take(&mut repository.current_merge_conflicts);
+                }
 
-            if let Some(path) = project_path {
-                changed_paths.push(path);
-            }
-        }
+                let mut new_entries_by_path = SumTree::new(&());
+                for (repo_path, status) in statuses.entries.iter() {
+                    let project_path = repository.work_directory.unrelativize(repo_path);
 
-        repository.statuses_by_path = new_entries_by_path;
+                    new_entries_by_path.insert_or_replace(
+                        StatusEntry {
+                            repo_path: repo_path.clone(),
+                            status: *status,
+                        },
+                        &(),
+                    );
 
-        let mut state = self.state.lock();
-        state
-            .snapshot
-            .repositories
-            .insert_or_replace(repository, &());
+                    if let Some(path) = project_path {
+                        changed_paths.push(path);
+                    }
+                }
 
-        state
-            .snapshot
-            .git_repositories
-            .update(&job.local_repository.work_directory_id, |entry| {
-                entry.current_merge_head_shas = merge_head_shas;
-            });
+                repository.statuses_by_path = new_entries_by_path;
+                let mut state = job_state.lock();
+                state
+                    .snapshot
+                    .repositories
+                    .insert_or_replace(repository, &());
+                state.snapshot.git_repositories.update(
+                    &local_repository.work_directory_id,
+                    |entry| {
+                        entry.current_merge_head_shas = merge_head_shas;
+                        entry.status_scan_id += 1;
+                    },
+                );
 
-        util::extend_sorted(
-            &mut state.changed_paths,
-            changed_paths,
-            usize::MAX,
-            Ord::cmp,
-        );
+                util::extend_sorted(
+                    &mut state.changed_paths,
+                    changed_paths,
+                    usize::MAX,
+                    Ord::cmp,
+                );
 
-        log::trace!(
-            "applied git status updates for repo {:?} in {:?}",
-            job.local_repository.work_directory.display_name(),
-            t0.elapsed(),
+                log::trace!(
+                    "applied git status updates for repo {repository_name} in {:?}",
+                    t0.elapsed(),
+                );
+                tx.send(()).ok();
+            }),
         );
-        Ok(())
-    }
-
-    /// Update the git statuses for a given batch of entries.
-    fn update_git_repository(&self, job: UpdateGitRepoJob) {
-        self.update_branches(&job).log_err();
-        self.update_statuses(&job).log_err();
-    }
-
-    fn build_change_set(
-        &self,
-        old_snapshot: &Snapshot,
-        new_snapshot: &Snapshot,
-        event_paths: &[Arc<Path>],
-    ) -> UpdatedEntriesSet {
-        use BackgroundScannerPhase::*;
-        use PathChange::{Added, AddedOrUpdated, Loaded, Removed, Updated};
-
-        // Identify which paths have changed. Use the known set of changed
-        // parent paths to optimize the search.
-        let mut changes = Vec::new();
-        let mut old_paths = old_snapshot.entries_by_path.cursor::<PathKey>(&());
-        let mut new_paths = new_snapshot.entries_by_path.cursor::<PathKey>(&());
-        let mut last_newly_loaded_dir_path = None;
-        old_paths.next(&());
-        new_paths.next(&());
-        for path in event_paths {
-            let path = PathKey(path.clone());
-            if old_paths.item().map_or(false, |e| e.path < path.0) {
-                old_paths.seek_forward(&path, Bias::Left, &());
-            }
-            if new_paths.item().map_or(false, |e| e.path < path.0) {
-                new_paths.seek_forward(&path, Bias::Left, &());
-            }
-            loop {
-                match (old_paths.item(), new_paths.item()) {
-                    (Some(old_entry), Some(new_entry)) => {
-                        if old_entry.path > path.0
-                            && new_entry.path > path.0
-                            && !old_entry.path.starts_with(&path.0)
-                            && !new_entry.path.starts_with(&path.0)
-                        {
-                            break;
-                        }
-
-                        match Ord::cmp(&old_entry.path, &new_entry.path) {
-                            Ordering::Less => {
-                                changes.push((old_entry.path.clone(), old_entry.id, Removed));
-                                old_paths.next(&());
-                            }
-                            Ordering::Equal => {
-                                if self.phase == EventsReceivedDuringInitialScan {
-                                    if old_entry.id != new_entry.id {
-                                        changes.push((
-                                            old_entry.path.clone(),
-                                            old_entry.id,
-                                            Removed,
-                                        ));
-                                    }
-                                    // If the worktree was not fully initialized when this event was generated,
-                                    // we can't know whether this entry was added during the scan or whether
-                                    // it was merely updated.
-                                    changes.push((
-                                        new_entry.path.clone(),
-                                        new_entry.id,
-                                        AddedOrUpdated,
-                                    ));
-                                } else if old_entry.id != new_entry.id {
-                                    changes.push((old_entry.path.clone(), old_entry.id, Removed));
-                                    changes.push((new_entry.path.clone(), new_entry.id, Added));
-                                } else if old_entry != new_entry {
-                                    if old_entry.kind.is_unloaded() {
-                                        last_newly_loaded_dir_path = Some(&new_entry.path);
-                                        changes.push((
-                                            new_entry.path.clone(),
-                                            new_entry.id,
-                                            Loaded,
-                                        ));
-                                    } else {
-                                        changes.push((
-                                            new_entry.path.clone(),
-                                            new_entry.id,
-                                            Updated,
-                                        ));
-                                    }
-                                }
-                                old_paths.next(&());
-                                new_paths.next(&());
-                            }
-                            Ordering::Greater => {
-                                let is_newly_loaded = self.phase == InitialScan
-                                    || last_newly_loaded_dir_path
-                                        .as_ref()
-                                        .map_or(false, |dir| new_entry.path.starts_with(dir));
-                                changes.push((
-                                    new_entry.path.clone(),
-                                    new_entry.id,
-                                    if is_newly_loaded { Loaded } else { Added },
-                                ));
-                                new_paths.next(&());
-                            }
-                        }
-                    }
-                    (Some(old_entry), None) => {
-                        changes.push((old_entry.path.clone(), old_entry.id, Removed));
-                        old_paths.next(&());
-                    }
-                    (None, Some(new_entry)) => {
-                        let is_newly_loaded = self.phase == InitialScan
-                            || last_newly_loaded_dir_path
-                                .as_ref()
-                                .map_or(false, |dir| new_entry.path.starts_with(dir));
-                        changes.push((
-                            new_entry.path.clone(),
-                            new_entry.id,
-                            if is_newly_loaded { Loaded } else { Added },
-                        ));
-                        new_paths.next(&());
-                    }
-                    (None, None) => break,
-                }
-            }
-        }
-
-        changes.into()
+        rx
     }
 
     async fn progress_timer(&self, running: bool) {
@@ -5645,6 +5539,159 @@ impl BackgroundScanner {
     }
 }
 
+fn send_status_update_inner(
+    phase: BackgroundScannerPhase,
+    state: Arc<Mutex<BackgroundScannerState>>,
+    status_updates_tx: UnboundedSender<ScanState>,
+    scanning: bool,
+    barrier: SmallVec<[barrier::Sender; 1]>,
+) -> bool {
+    let mut state = state.lock();
+    if state.changed_paths.is_empty() && scanning {
+        return true;
+    }
+
+    let new_snapshot = state.snapshot.clone();
+    let old_snapshot = mem::replace(&mut state.prev_snapshot, new_snapshot.snapshot.clone());
+    let changes = build_diff(phase, &old_snapshot, &new_snapshot, &state.changed_paths);
+    state.changed_paths.clear();
+
+    status_updates_tx
+        .unbounded_send(ScanState::Updated {
+            snapshot: new_snapshot,
+            changes,
+            scanning,
+            barrier,
+        })
+        .is_ok()
+}
+
+fn update_branches(
+    state: &Mutex<BackgroundScannerState>,
+    repository: &mut LocalRepositoryEntry,
+) -> Result<()> {
+    let branches = repository.repo().branches()?;
+    let snapshot = state.lock().snapshot.snapshot.clone();
+    let mut repository = snapshot
+        .repository(repository.work_directory.path_key())
+        .context("Missing repository")?;
+    repository.branch = branches.into_iter().find(|branch| branch.is_head);
+
+    let mut state = state.lock();
+    state
+        .snapshot
+        .repositories
+        .insert_or_replace(repository, &());
+
+    Ok(())
+}
+
+fn build_diff(
+    phase: BackgroundScannerPhase,
+    old_snapshot: &Snapshot,
+    new_snapshot: &Snapshot,
+    event_paths: &[Arc<Path>],
+) -> UpdatedEntriesSet {
+    use BackgroundScannerPhase::*;
+    use PathChange::{Added, AddedOrUpdated, Loaded, Removed, Updated};
+
+    // Identify which paths have changed. Use the known set of changed
+    // parent paths to optimize the search.
+    let mut changes = Vec::new();
+    let mut old_paths = old_snapshot.entries_by_path.cursor::<PathKey>(&());
+    let mut new_paths = new_snapshot.entries_by_path.cursor::<PathKey>(&());
+    let mut last_newly_loaded_dir_path = None;
+    old_paths.next(&());
+    new_paths.next(&());
+    for path in event_paths {
+        let path = PathKey(path.clone());
+        if old_paths.item().map_or(false, |e| e.path < path.0) {
+            old_paths.seek_forward(&path, Bias::Left, &());
+        }
+        if new_paths.item().map_or(false, |e| e.path < path.0) {
+            new_paths.seek_forward(&path, Bias::Left, &());
+        }
+        loop {
+            match (old_paths.item(), new_paths.item()) {
+                (Some(old_entry), Some(new_entry)) => {
+                    if old_entry.path > path.0
+                        && new_entry.path > path.0
+                        && !old_entry.path.starts_with(&path.0)
+                        && !new_entry.path.starts_with(&path.0)
+                    {
+                        break;
+                    }
+
+                    match Ord::cmp(&old_entry.path, &new_entry.path) {
+                        Ordering::Less => {
+                            changes.push((old_entry.path.clone(), old_entry.id, Removed));
+                            old_paths.next(&());
+                        }
+                        Ordering::Equal => {
+                            if phase == EventsReceivedDuringInitialScan {
+                                if old_entry.id != new_entry.id {
+                                    changes.push((old_entry.path.clone(), old_entry.id, Removed));
+                                }
+                                // If the worktree was not fully initialized when this event was generated,
+                                // we can't know whether this entry was added during the scan or whether
+                                // it was merely updated.
+                                changes.push((
+                                    new_entry.path.clone(),
+                                    new_entry.id,
+                                    AddedOrUpdated,
+                                ));
+                            } else if old_entry.id != new_entry.id {
+                                changes.push((old_entry.path.clone(), old_entry.id, Removed));
+                                changes.push((new_entry.path.clone(), new_entry.id, Added));
+                            } else if old_entry != new_entry {
+                                if old_entry.kind.is_unloaded() {
+                                    last_newly_loaded_dir_path = Some(&new_entry.path);
+                                    changes.push((new_entry.path.clone(), new_entry.id, Loaded));
+                                } else {
+                                    changes.push((new_entry.path.clone(), new_entry.id, Updated));
+                                }
+                            }
+                            old_paths.next(&());
+                            new_paths.next(&());
+                        }
+                        Ordering::Greater => {
+                            let is_newly_loaded = phase == InitialScan
+                                || last_newly_loaded_dir_path
+                                    .as_ref()
+                                    .map_or(false, |dir| new_entry.path.starts_with(dir));
+                            changes.push((
+                                new_entry.path.clone(),
+                                new_entry.id,
+                                if is_newly_loaded { Loaded } else { Added },
+                            ));
+                            new_paths.next(&());
+                        }
+                    }
+                }
+                (Some(old_entry), None) => {
+                    changes.push((old_entry.path.clone(), old_entry.id, Removed));
+                    old_paths.next(&());
+                }
+                (None, Some(new_entry)) => {
+                    let is_newly_loaded = phase == InitialScan
+                        || last_newly_loaded_dir_path
+                            .as_ref()
+                            .map_or(false, |dir| new_entry.path.starts_with(dir));
+                    changes.push((
+                        new_entry.path.clone(),
+                        new_entry.id,
+                        if is_newly_loaded { Loaded } else { Added },
+                    ));
+                    new_paths.next(&());
+                }
+                (None, None) => break,
+            }
+        }
+    }
+
+    changes.into()
+}
+
 fn swap_to_front(child_paths: &mut Vec<PathBuf>, file: &OsStr) {
     let position = child_paths
         .iter()
@@ -5691,6 +5738,7 @@ impl RepoPaths {
     }
 }
 
+#[derive(Debug)]
 struct ScanJob {
     abs_path: Arc<Path>,
     path: Arc<Path>,
@@ -5707,10 +5755,6 @@ struct UpdateIgnoreStatusJob {
     scan_queue: Sender<ScanJob>,
 }
 
-struct UpdateGitRepoJob {
-    local_repository: LocalRepositoryEntry,
-}
-
 pub trait WorktreeModelHandle {
     #[cfg(any(test, feature = "test-support"))]
     fn flush_fs_events<'a>(

crates/worktree/src/worktree_tests.rs 🔗

@@ -24,6 +24,7 @@ use std::{
     mem,
     path::{Path, PathBuf},
     sync::Arc,
+    time::Duration,
 };
 use util::{test::TempTree, ResultExt};
 
@@ -1504,6 +1505,7 @@ async fn test_bump_mtime_of_git_repo_workdir(cx: &mut TestAppContext) {
         &[(Path::new("b/c.txt"), StatusCode::Modified.index())],
     );
     cx.executor().run_until_parked();
+    cx.executor().advance_clock(Duration::from_secs(1));
 
     let snapshot = tree.read_with(cx, |tree, _| tree.snapshot());