diff --git a/crates/worktree/src/worktree.rs b/crates/worktree/src/worktree.rs index 915e4e9954a723361800de27e5faaef05b0c0e1c..c2e0a1551e717c9ff9ed7a7b515334d3a9acaea8 100644 --- a/crates/worktree/src/worktree.rs +++ b/crates/worktree/src/worktree.rs @@ -13,7 +13,6 @@ use futures::{ mpsc::{self, UnboundedSender}, oneshot, }, - future::join_all, select_biased, task::Poll, FutureExt as _, Stream, StreamExt, @@ -466,7 +465,6 @@ struct BackgroundScannerState { changed_paths: Vec>, prev_snapshot: Snapshot, git_hosting_provider_registry: Option>, - repository_scans: HashMap, Task<()>>, } #[derive(Debug, Clone)] @@ -1355,7 +1353,7 @@ impl LocalWorktree { scan_requests_rx, path_prefixes_to_scan_rx, next_entry_id, - state: Arc::new(Mutex::new(BackgroundScannerState { + state: Mutex::new(BackgroundScannerState { prev_snapshot: snapshot.snapshot.clone(), snapshot, scanned_dirs: Default::default(), @@ -1363,9 +1361,8 @@ impl LocalWorktree { paths_to_scan: Default::default(), removed_entries: Default::default(), changed_paths: Default::default(), - repository_scans: HashMap::default(), git_hosting_provider_registry, - })), + }), phase: BackgroundScannerPhase::InitialScan, share_private_files, settings, @@ -4111,7 +4108,7 @@ impl<'a> sum_tree::Dimension<'a, EntrySummary> for PathKey { } struct BackgroundScanner { - state: Arc>, + state: Mutex, fs: Arc, fs_case_sensitive: bool, status_updates_tx: UnboundedSender, @@ -4125,7 +4122,7 @@ struct BackgroundScanner { share_private_files: bool, } -#[derive(Copy, Clone, PartialEq)] +#[derive(PartialEq)] enum BackgroundScannerPhase { InitialScan, EventsReceivedDuringInitialScan, @@ -4134,6 +4131,8 @@ enum BackgroundScannerPhase { impl BackgroundScanner { async fn run(&mut self, mut fs_events_rx: Pin>>>) { + use futures::FutureExt as _; + // If the worktree root does not contain a git repository, then find // the git repository in an ancestor directory. Find any gitignore files // in ancestor directories. @@ -4444,33 +4443,22 @@ impl BackgroundScanner { self.update_ignore_statuses(scan_job_tx).await; self.scan_dirs(false, scan_job_rx).await; - let status_update = if !dot_git_abs_paths.is_empty() { - Some(self.schedule_git_repositories_update(dot_git_abs_paths)) - } else { - None - }; + if !dot_git_abs_paths.is_empty() { + self.update_git_repositories(dot_git_abs_paths).await; + } - let phase = self.phase; - let status_update_tx = self.status_updates_tx.clone(); - let state = self.state.clone(); - self.executor - .spawn(async move { - if let Some(status_update) = status_update { - status_update.await; - } + { + let mut state = self.state.lock(); + state.snapshot.completed_scan_id = state.snapshot.scan_id; + for (_, entry) in mem::take(&mut state.removed_entries) { + state.scanned_dirs.remove(&entry.id); + } + } - { - let mut state = state.lock(); - state.snapshot.completed_scan_id = state.snapshot.scan_id; - for (_, entry) in mem::take(&mut state.removed_entries) { - state.scanned_dirs.remove(&entry.id); - } - #[cfg(test)] - state.snapshot.check_git_invariants(); - } - send_status_update_inner(phase, state, status_update_tx, false, SmallVec::new()); - }) - .detach(); + #[cfg(test)] + self.state.lock().snapshot.check_git_invariants(); + + self.send_status_update(false, SmallVec::new()); } async fn forcibly_load_paths(&self, paths: &[Arc]) -> bool { @@ -4504,6 +4492,8 @@ impl BackgroundScanner { enable_progress_updates: bool, scan_jobs_rx: channel::Receiver, ) { + use futures::FutureExt as _; + if self .status_updates_tx .unbounded_send(ScanState::Started) @@ -4571,13 +4561,24 @@ impl BackgroundScanner { } fn send_status_update(&self, scanning: bool, barrier: SmallVec<[barrier::Sender; 1]>) -> bool { - send_status_update_inner( - self.phase, - self.state.clone(), - self.status_updates_tx.clone(), - scanning, - barrier, - ) + let mut state = self.state.lock(); + if state.changed_paths.is_empty() && scanning { + return true; + } + + let new_snapshot = state.snapshot.clone(); + let old_snapshot = mem::replace(&mut state.prev_snapshot, new_snapshot.snapshot.clone()); + let changes = self.build_change_set(&old_snapshot, &new_snapshot, &state.changed_paths); + state.changed_paths.clear(); + + self.status_updates_tx + .unbounded_send(ScanState::Updated { + snapshot: new_snapshot, + changes, + scanning, + barrier, + }) + .is_ok() } async fn scan_dir(&self, job: &ScanJob) -> Result<()> { @@ -4633,7 +4634,9 @@ impl BackgroundScanner { ); if let Some(local_repo) = repo { - let _ = self.schedule_git_statuses_update(local_repo); + self.update_git_statuses(UpdateGitStatusesJob { + local_repository: local_repo, + }); } } else if child_name == *GITIGNORE { match build_gitignore(&child_abs_path, self.fs.as_ref()).await { @@ -4990,6 +4993,8 @@ impl BackgroundScanner { } async fn update_ignore_statuses(&self, scan_job_tx: Sender) { + use futures::FutureExt as _; + let mut ignores_to_update = Vec::new(); let (ignore_queue_tx, ignore_queue_rx) = channel::unbounded(); let prev_snapshot; @@ -5139,10 +5144,10 @@ impl BackgroundScanner { state.snapshot.entries_by_id.edit(entries_by_id_edits, &()); } - fn schedule_git_repositories_update(&self, dot_git_paths: Vec) -> Task<()> { + async fn update_git_repositories(&self, dot_git_paths: Vec) { log::debug!("reloading repositories: {dot_git_paths:?}"); - let mut repos_to_update = Vec::new(); + let mut repo_updates = Vec::new(); { let mut state = self.state.lock(); let scan_id = state.snapshot.scan_id; @@ -5205,7 +5210,7 @@ impl BackgroundScanner { } }; - repos_to_update.push(local_repository); + repo_updates.push(UpdateGitStatusesJob { local_repository }); } // Remove any git repositories whose .git entry no longer exists. @@ -5236,109 +5241,238 @@ impl BackgroundScanner { }); } - let mut status_updates = Vec::new(); - for local_repository in repos_to_update { - status_updates.push(self.schedule_git_statuses_update(local_repository)); - } - self.executor.spawn(async move { - let _updates_finished: Vec> = - join_all(status_updates).await; - }) + let (mut updates_done_tx, mut updates_done_rx) = barrier::channel(); + self.executor + .scoped(|scope| { + scope.spawn(async { + for repo_update in repo_updates { + self.update_git_statuses(repo_update); + } + updates_done_tx.blocking_send(()).ok(); + }); + + scope.spawn(async { + loop { + select_biased! { + // Process any path refresh requests before moving on to process + // the queue of git statuses. + request = self.next_scan_request().fuse() => { + let Ok(request) = request else { break }; + if !self.process_scan_request(request, true).await { + return; + } + } + _ = updates_done_rx.recv().fuse() => break, + } + } + }); + }) + .await; } /// Update the git statuses for a given batch of entries. - fn schedule_git_statuses_update( - &self, - local_repository: LocalRepositoryEntry, - ) -> oneshot::Receiver<()> { - let repository_path = local_repository.work_directory.path.clone(); - let state = self.state.clone(); - let (tx, rx) = oneshot::channel(); + fn update_git_statuses(&self, job: UpdateGitStatusesJob) { + log::trace!( + "updating git statuses for repo {:?}", + job.local_repository.work_directory.path + ); + let t0 = Instant::now(); - self.state.lock().repository_scans.insert( - repository_path.clone(), - self.executor.spawn(async move { - log::trace!("updating git statuses for repo {repository_path:?}",); - let t0 = Instant::now(); - - let Some(statuses) = local_repository - .repo() - .status(&[git::WORK_DIRECTORY_REPO_PATH.clone()]) - .log_err() - else { - return; - }; - log::trace!( - "computed git statuses for repo {:?} in {:?}", - repository_path, - t0.elapsed() - ); + let Some(statuses) = job + .local_repository + .repo() + .status(&[git::WORK_DIRECTORY_REPO_PATH.clone()]) + .log_err() + else { + return; + }; + log::trace!( + "computed git statuses for repo {:?} in {:?}", + job.local_repository.work_directory.path, + t0.elapsed() + ); - let t0 = Instant::now(); - let mut changed_paths = Vec::new(); - let snapshot = state.lock().snapshot.snapshot.clone(); + let t0 = Instant::now(); + let mut changed_paths = Vec::new(); + let snapshot = self.state.lock().snapshot.snapshot.clone(); + + let Some(mut repository) = + snapshot.repository(job.local_repository.work_directory.path_key()) + else { + log::error!("Got an UpdateGitStatusesJob for a repository that isn't in the snapshot"); + debug_assert!(false); + return; + }; - let Some(mut repository) = - snapshot.repository(local_repository.work_directory.path_key()) - else { - log::error!( - "Tried to update git statuses for a repository that isn't in the snapshot" - ); - debug_assert!(false); - return; - }; + let merge_head_shas = job.local_repository.repo().merge_head_shas(); + if merge_head_shas != job.local_repository.current_merge_head_shas { + mem::take(&mut repository.current_merge_conflicts); + } - let merge_head_shas = local_repository.repo().merge_head_shas(); - if merge_head_shas != local_repository.current_merge_head_shas { - mem::take(&mut repository.current_merge_conflicts); - } + let mut new_entries_by_path = SumTree::new(&()); + for (repo_path, status) in statuses.entries.iter() { + let project_path = repository.work_directory.unrelativize(repo_path); - let mut new_entries_by_path = SumTree::new(&()); - for (repo_path, status) in statuses.entries.iter() { - let project_path = repository.work_directory.unrelativize(repo_path); + new_entries_by_path.insert_or_replace( + StatusEntry { + repo_path: repo_path.clone(), + status: *status, + }, + &(), + ); + if status.is_conflicted() { + repository.current_merge_conflicts.insert(repo_path.clone()); + } - new_entries_by_path.insert_or_replace( - StatusEntry { - repo_path: repo_path.clone(), - status: *status, - }, - &(), - ); + if let Some(path) = project_path { + changed_paths.push(path); + } + } - if let Some(path) = project_path { - changed_paths.push(path); - } - } + repository.statuses_by_path = new_entries_by_path; + let mut state = self.state.lock(); + state + .snapshot + .repositories + .insert_or_replace(repository, &()); - repository.statuses_by_path = new_entries_by_path; - let mut state = state.lock(); - state - .snapshot - .repositories - .insert_or_replace(repository, &()); - state.snapshot.git_repositories.update( - &local_repository.work_directory_id, - |entry| { - entry.current_merge_head_shas = merge_head_shas; - }, - ); + state + .snapshot + .git_repositories + .update(&job.local_repository.work_directory_id, |entry| { + entry.current_merge_head_shas = merge_head_shas; + }); - util::extend_sorted( - &mut state.changed_paths, - changed_paths, - usize::MAX, - Ord::cmp, - ); + util::extend_sorted( + &mut state.changed_paths, + changed_paths, + usize::MAX, + Ord::cmp, + ); - log::trace!( - "applied git status updates for repo {:?} in {:?}", - repository_path, - t0.elapsed(), - ); - tx.send(()).ok(); - }), + log::trace!( + "applied git status updates for repo {:?} in {:?}", + job.local_repository.work_directory.path, + t0.elapsed(), ); - rx + } + + fn build_change_set( + &self, + old_snapshot: &Snapshot, + new_snapshot: &Snapshot, + event_paths: &[Arc], + ) -> UpdatedEntriesSet { + use BackgroundScannerPhase::*; + use PathChange::{Added, AddedOrUpdated, Loaded, Removed, Updated}; + + // Identify which paths have changed. Use the known set of changed + // parent paths to optimize the search. + let mut changes = Vec::new(); + let mut old_paths = old_snapshot.entries_by_path.cursor::(&()); + let mut new_paths = new_snapshot.entries_by_path.cursor::(&()); + let mut last_newly_loaded_dir_path = None; + old_paths.next(&()); + new_paths.next(&()); + for path in event_paths { + let path = PathKey(path.clone()); + if old_paths.item().map_or(false, |e| e.path < path.0) { + old_paths.seek_forward(&path, Bias::Left, &()); + } + if new_paths.item().map_or(false, |e| e.path < path.0) { + new_paths.seek_forward(&path, Bias::Left, &()); + } + loop { + match (old_paths.item(), new_paths.item()) { + (Some(old_entry), Some(new_entry)) => { + if old_entry.path > path.0 + && new_entry.path > path.0 + && !old_entry.path.starts_with(&path.0) + && !new_entry.path.starts_with(&path.0) + { + break; + } + + match Ord::cmp(&old_entry.path, &new_entry.path) { + Ordering::Less => { + changes.push((old_entry.path.clone(), old_entry.id, Removed)); + old_paths.next(&()); + } + Ordering::Equal => { + if self.phase == EventsReceivedDuringInitialScan { + if old_entry.id != new_entry.id { + changes.push(( + old_entry.path.clone(), + old_entry.id, + Removed, + )); + } + // If the worktree was not fully initialized when this event was generated, + // we can't know whether this entry was added during the scan or whether + // it was merely updated. + changes.push(( + new_entry.path.clone(), + new_entry.id, + AddedOrUpdated, + )); + } else if old_entry.id != new_entry.id { + changes.push((old_entry.path.clone(), old_entry.id, Removed)); + changes.push((new_entry.path.clone(), new_entry.id, Added)); + } else if old_entry != new_entry { + if old_entry.kind.is_unloaded() { + last_newly_loaded_dir_path = Some(&new_entry.path); + changes.push(( + new_entry.path.clone(), + new_entry.id, + Loaded, + )); + } else { + changes.push(( + new_entry.path.clone(), + new_entry.id, + Updated, + )); + } + } + old_paths.next(&()); + new_paths.next(&()); + } + Ordering::Greater => { + let is_newly_loaded = self.phase == InitialScan + || last_newly_loaded_dir_path + .as_ref() + .map_or(false, |dir| new_entry.path.starts_with(dir)); + changes.push(( + new_entry.path.clone(), + new_entry.id, + if is_newly_loaded { Loaded } else { Added }, + )); + new_paths.next(&()); + } + } + } + (Some(old_entry), None) => { + changes.push((old_entry.path.clone(), old_entry.id, Removed)); + old_paths.next(&()); + } + (None, Some(new_entry)) => { + let is_newly_loaded = self.phase == InitialScan + || last_newly_loaded_dir_path + .as_ref() + .map_or(false, |dir| new_entry.path.starts_with(dir)); + changes.push(( + new_entry.path.clone(), + new_entry.id, + if is_newly_loaded { Loaded } else { Added }, + )); + new_paths.next(&()); + } + (None, None) => break, + } + } + } + + changes.into() } async fn progress_timer(&self, running: bool) { @@ -5368,139 +5502,6 @@ impl BackgroundScanner { } } -fn send_status_update_inner( - phase: BackgroundScannerPhase, - state: Arc>, - status_updates_tx: UnboundedSender, - scanning: bool, - barrier: SmallVec<[barrier::Sender; 1]>, -) -> bool { - let mut state = state.lock(); - if state.changed_paths.is_empty() && scanning { - return true; - } - - let new_snapshot = state.snapshot.clone(); - let old_snapshot = mem::replace(&mut state.prev_snapshot, new_snapshot.snapshot.clone()); - let changes = build_diff(phase, &old_snapshot, &new_snapshot, &state.changed_paths); - state.changed_paths.clear(); - - status_updates_tx - .unbounded_send(ScanState::Updated { - snapshot: new_snapshot, - changes, - scanning, - barrier, - }) - .is_ok() -} - -fn build_diff( - phase: BackgroundScannerPhase, - old_snapshot: &Snapshot, - new_snapshot: &Snapshot, - event_paths: &[Arc], -) -> UpdatedEntriesSet { - use BackgroundScannerPhase::*; - use PathChange::{Added, AddedOrUpdated, Loaded, Removed, Updated}; - - // Identify which paths have changed. Use the known set of changed - // parent paths to optimize the search. - let mut changes = Vec::new(); - let mut old_paths = old_snapshot.entries_by_path.cursor::(&()); - let mut new_paths = new_snapshot.entries_by_path.cursor::(&()); - let mut last_newly_loaded_dir_path = None; - old_paths.next(&()); - new_paths.next(&()); - for path in event_paths { - let path = PathKey(path.clone()); - if old_paths.item().map_or(false, |e| e.path < path.0) { - old_paths.seek_forward(&path, Bias::Left, &()); - } - if new_paths.item().map_or(false, |e| e.path < path.0) { - new_paths.seek_forward(&path, Bias::Left, &()); - } - loop { - match (old_paths.item(), new_paths.item()) { - (Some(old_entry), Some(new_entry)) => { - if old_entry.path > path.0 - && new_entry.path > path.0 - && !old_entry.path.starts_with(&path.0) - && !new_entry.path.starts_with(&path.0) - { - break; - } - - match Ord::cmp(&old_entry.path, &new_entry.path) { - Ordering::Less => { - changes.push((old_entry.path.clone(), old_entry.id, Removed)); - old_paths.next(&()); - } - Ordering::Equal => { - if phase == EventsReceivedDuringInitialScan { - if old_entry.id != new_entry.id { - changes.push((old_entry.path.clone(), old_entry.id, Removed)); - } - // If the worktree was not fully initialized when this event was generated, - // we can't know whether this entry was added during the scan or whether - // it was merely updated. - changes.push(( - new_entry.path.clone(), - new_entry.id, - AddedOrUpdated, - )); - } else if old_entry.id != new_entry.id { - changes.push((old_entry.path.clone(), old_entry.id, Removed)); - changes.push((new_entry.path.clone(), new_entry.id, Added)); - } else if old_entry != new_entry { - if old_entry.kind.is_unloaded() { - last_newly_loaded_dir_path = Some(&new_entry.path); - changes.push((new_entry.path.clone(), new_entry.id, Loaded)); - } else { - changes.push((new_entry.path.clone(), new_entry.id, Updated)); - } - } - old_paths.next(&()); - new_paths.next(&()); - } - Ordering::Greater => { - let is_newly_loaded = phase == InitialScan - || last_newly_loaded_dir_path - .as_ref() - .map_or(false, |dir| new_entry.path.starts_with(dir)); - changes.push(( - new_entry.path.clone(), - new_entry.id, - if is_newly_loaded { Loaded } else { Added }, - )); - new_paths.next(&()); - } - } - } - (Some(old_entry), None) => { - changes.push((old_entry.path.clone(), old_entry.id, Removed)); - old_paths.next(&()); - } - (None, Some(new_entry)) => { - let is_newly_loaded = phase == InitialScan - || last_newly_loaded_dir_path - .as_ref() - .map_or(false, |dir| new_entry.path.starts_with(dir)); - changes.push(( - new_entry.path.clone(), - new_entry.id, - if is_newly_loaded { Loaded } else { Added }, - )); - new_paths.next(&()); - } - (None, None) => break, - } - } - } - - changes.into() -} - fn swap_to_front(child_paths: &mut Vec, file: &OsStr) { let position = child_paths .iter() @@ -5563,6 +5564,10 @@ struct UpdateIgnoreStatusJob { scan_queue: Sender, } +struct UpdateGitStatusesJob { + local_repository: LocalRepositoryEntry, +} + pub trait WorktreeModelHandle { #[cfg(any(test, feature = "test-support"))] fn flush_fs_events<'a>( diff --git a/crates/worktree/src/worktree_tests.rs b/crates/worktree/src/worktree_tests.rs index 34e1f0063e102b751a85f3e86ece2eaac2bd9d3b..2cee728aec89e40500700c182ed617400085739e 100644 --- a/crates/worktree/src/worktree_tests.rs +++ b/crates/worktree/src/worktree_tests.rs @@ -24,7 +24,6 @@ use std::{ mem, path::{Path, PathBuf}, sync::Arc, - time::Duration, }; use util::{test::TempTree, ResultExt}; @@ -1505,7 +1504,6 @@ async fn test_bump_mtime_of_git_repo_workdir(cx: &mut TestAppContext) { &[(Path::new("b/c.txt"), StatusCode::Modified.index())], ); cx.executor().run_until_parked(); - cx.executor().advance_clock(Duration::from_secs(1)); let snapshot = tree.read_with(cx, |tree, _| tree.snapshot());