@@ -61,7 +61,7 @@ use std::{
path::{Component, Path, PathBuf},
pin::Pin,
sync::{
- atomic::{self, AtomicU32, AtomicUsize, Ordering::SeqCst},
+ atomic::{self, AtomicI32, AtomicUsize, Ordering::SeqCst},
Arc,
},
time::{Duration, Instant},
@@ -1525,6 +1525,7 @@ impl LocalWorktree {
fs,
fs_case_sensitive,
status_updates_tx: scan_states_tx,
+ scans_running: Arc::new(AtomicI32::new(0)),
executor: background,
scan_requests_rx,
path_prefixes_to_scan_rx,
@@ -4249,11 +4250,6 @@ struct PathEntry {
scan_id: usize,
}
-#[derive(Debug, Default)]
-struct FsScanned {
- status_scans: Arc<AtomicU32>,
-}
-
impl sum_tree::Item for PathEntry {
type Summary = PathEntrySummary;
@@ -4321,6 +4317,7 @@ struct BackgroundScanner {
fs: Arc<dyn Fs>,
fs_case_sensitive: bool,
status_updates_tx: UnboundedSender<ScanState>,
+ scans_running: Arc<AtomicI32>,
executor: BackgroundExecutor,
scan_requests_rx: channel::Receiver<ScanRequest>,
path_prefixes_to_scan_rx: channel::Receiver<PathPrefixScanRequest>,
@@ -4428,13 +4425,13 @@ impl BackgroundScanner {
// Perform an initial scan of the directory.
drop(scan_job_tx);
- let scans_running = self.scan_dirs(true, scan_job_rx).await;
+ self.scan_dirs(true, scan_job_rx).await;
{
let mut state = self.state.lock();
state.snapshot.completed_scan_id = state.snapshot.scan_id;
}
- let scanning = scans_running.status_scans.load(atomic::Ordering::Acquire) > 0;
+ let scanning = self.scans_running.load(atomic::Ordering::Acquire) > 0;
self.send_status_update(scanning, SmallVec::new());
// Process any any FS events that occurred while performing the initial scan.
@@ -4461,7 +4458,7 @@ impl BackgroundScanner {
// these before handling changes reported by the filesystem.
request = self.next_scan_request().fuse() => {
let Ok(request) = request else { break };
- let scanning = scans_running.status_scans.load(atomic::Ordering::Acquire) > 0;
+ let scanning = self.scans_running.load(atomic::Ordering::Acquire) > 0;
if !self.process_scan_request(request, scanning).await {
return;
}
@@ -4484,7 +4481,7 @@ impl BackgroundScanner {
self.process_events(vec![abs_path]).await;
}
}
- let scanning = scans_running.status_scans.load(atomic::Ordering::Acquire) > 0;
+ let scanning = self.scans_running.load(atomic::Ordering::Acquire) > 0;
self.send_status_update(scanning, request.done);
}
@@ -4678,7 +4675,7 @@ impl BackgroundScanner {
.await;
self.update_ignore_statuses(scan_job_tx).await;
- let scans_running = self.scan_dirs(false, scan_job_rx).await;
+ self.scan_dirs(false, scan_job_rx).await;
let status_update = if !dot_git_abs_paths.is_empty() {
Some(self.update_git_repositories(dot_git_abs_paths))
@@ -4689,6 +4686,7 @@ impl BackgroundScanner {
let phase = self.phase;
let status_update_tx = self.status_updates_tx.clone();
let state = self.state.clone();
+ let scans_running = self.scans_running.clone();
self.executor
.spawn(async move {
if let Some(status_update) = status_update {
@@ -4704,7 +4702,7 @@ impl BackgroundScanner {
#[cfg(test)]
state.snapshot.check_git_invariants();
}
- let scanning = scans_running.status_scans.load(atomic::Ordering::Acquire) > 0;
+ let scanning = scans_running.load(atomic::Ordering::Acquire) > 0;
send_status_update_inner(phase, state, status_update_tx, scanning, SmallVec::new());
})
.detach();
@@ -4729,9 +4727,8 @@ impl BackgroundScanner {
}
drop(scan_job_tx);
}
- let scans_running = Arc::new(AtomicU32::new(0));
while let Ok(job) = scan_job_rx.recv().await {
- self.scan_dir(&scans_running, &job).await.log_err();
+ self.scan_dir(&job).await.log_err();
}
!mem::take(&mut self.state.lock().paths_to_scan).is_empty()
@@ -4741,16 +4738,16 @@ impl BackgroundScanner {
&self,
enable_progress_updates: bool,
scan_jobs_rx: channel::Receiver<ScanJob>,
- ) -> FsScanned {
+ ) {
if self
.status_updates_tx
.unbounded_send(ScanState::Started)
.is_err()
{
- return FsScanned::default();
+ return;
}
- let scans_running = Arc::new(AtomicU32::new(1));
+ inc_scans_running(&self.scans_running);
let progress_update_count = AtomicUsize::new(0);
self.executor
.scoped(|scope| {
@@ -4795,7 +4792,7 @@ impl BackgroundScanner {
// Recursively load directories from the file system.
job = scan_jobs_rx.recv().fuse() => {
let Ok(job) = job else { break };
- if let Err(err) = self.scan_dir(&scans_running, &job).await {
+ if let Err(err) = self.scan_dir(&job).await {
if job.path.as_ref() != Path::new("") {
log::error!("error scanning directory {:?}: {}", job.abs_path, err);
}
@@ -4808,10 +4805,7 @@ impl BackgroundScanner {
})
.await;
- scans_running.fetch_sub(1, atomic::Ordering::Release);
- FsScanned {
- status_scans: scans_running,
- }
+ dec_scans_running(&self.scans_running, 1);
}
fn send_status_update(&self, scanning: bool, barrier: SmallVec<[barrier::Sender; 1]>) -> bool {
@@ -4824,7 +4818,7 @@ impl BackgroundScanner {
)
}
- async fn scan_dir(&self, scans_running: &Arc<AtomicU32>, job: &ScanJob) -> Result<()> {
+ async fn scan_dir(&self, job: &ScanJob) -> Result<()> {
let root_abs_path;
let root_char_bag;
{
@@ -4879,7 +4873,7 @@ impl BackgroundScanner {
self.watcher.as_ref(),
);
if let Some(local_repo) = repo {
- scans_running.fetch_add(1, atomic::Ordering::Release);
+ inc_scans_running(&self.scans_running);
git_status_update_jobs
.push(self.schedule_git_statuses_update(&mut state, local_repo));
}
@@ -5002,7 +4996,7 @@ impl BackgroundScanner {
let task_state = self.state.clone();
let phase = self.phase;
let status_updates_tx = self.status_updates_tx.clone();
- let scans_running = scans_running.clone();
+ let scans_running = self.scans_running.clone();
self.executor
.spawn(async move {
if !git_status_update_jobs.is_empty() {
@@ -5010,7 +5004,7 @@ impl BackgroundScanner {
let status_updated = status_updates
.iter()
.any(|update_result| update_result.is_ok());
- scans_running.fetch_sub(status_updates.len() as u32, atomic::Ordering::Release);
+ dec_scans_running(&scans_running, status_updates.len() as i32);
if status_updated {
let scanning = scans_running.load(atomic::Ordering::Acquire) > 0;
send_status_update_inner(
@@ -5512,106 +5506,15 @@ impl BackgroundScanner {
fn schedule_git_statuses_update(
&self,
state: &mut BackgroundScannerState,
- mut local_repository: LocalRepositoryEntry,
+ local_repository: LocalRepositoryEntry,
) -> oneshot::Receiver<()> {
- let repository_name = local_repository.work_directory.display_name();
- let path_key = local_repository.work_directory.path_key();
-
let job_state = self.state.clone();
let (tx, rx) = oneshot::channel();
state.repository_scans.insert(
- path_key.clone(),
- self.executor.spawn(async move {
- update_branches(&job_state, &mut local_repository)
- .await
- .log_err();
- log::trace!("updating git statuses for repo {repository_name}",);
- let t0 = Instant::now();
-
- let Some(statuses) = local_repository
- .repo()
- .status(&[git::WORK_DIRECTORY_REPO_PATH.clone()])
- .log_err()
- else {
- return;
- };
- log::trace!(
- "computed git statuses for repo {repository_name} in {:?}",
- t0.elapsed()
- );
-
- let t0 = Instant::now();
- let mut changed_paths = Vec::new();
- let snapshot = job_state.lock().snapshot.snapshot.clone();
-
- let Some(mut repository) = snapshot
- .repository(path_key)
- .context(
- "Tried to update git statuses for a repository that isn't in the snapshot",
- )
- .log_err()
- else {
- return;
- };
-
- let merge_head_shas = local_repository.repo().merge_head_shas();
- if merge_head_shas != local_repository.current_merge_head_shas {
- mem::take(&mut repository.current_merge_conflicts);
- }
-
- let mut new_entries_by_path = SumTree::new(&());
- for (repo_path, status) in statuses.entries.iter() {
- let project_path = repository.work_directory.try_unrelativize(repo_path);
-
- new_entries_by_path.insert_or_replace(
- StatusEntry {
- repo_path: repo_path.clone(),
- status: *status,
- },
- &(),
- );
- if status.is_conflicted() {
- repository.current_merge_conflicts.insert(repo_path.clone());
- }
-
- if let Some(path) = project_path {
- changed_paths.push(path);
- }
- }
-
- repository.statuses_by_path = new_entries_by_path;
- let mut state = job_state.lock();
- state
- .snapshot
- .repositories
- .insert_or_replace(repository, &());
- state.snapshot.git_repositories.update(
- &local_repository.work_directory_id,
- |entry| {
- entry.current_merge_head_shas = merge_head_shas;
- entry.merge_message = std::fs::read_to_string(
- local_repository.dot_git_dir_abs_path.join("MERGE_MSG"),
- )
- .ok()
- .and_then(|merge_msg| Some(merge_msg.lines().next()?.to_owned()));
- entry.status_scan_id += 1;
- },
- );
-
- util::extend_sorted(
- &mut state.changed_paths,
- changed_paths,
- usize::MAX,
- Ord::cmp,
- );
-
- log::trace!(
- "applied git status updates for repo {repository_name} in {:?}",
- t0.elapsed(),
- );
- tx.send(()).ok();
- }),
+ local_repository.work_directory.path_key(),
+ self.executor
+ .spawn(do_git_status_update(job_state, local_repository, tx)),
);
rx
}
@@ -5643,6 +5546,15 @@ impl BackgroundScanner {
}
}
+fn inc_scans_running(scans_running: &AtomicI32) {
+ scans_running.fetch_add(1, atomic::Ordering::Release);
+}
+
+fn dec_scans_running(scans_running: &AtomicI32, by: i32) {
+ let old = scans_running.fetch_sub(by, atomic::Ordering::Release);
+ debug_assert!(old >= by);
+}
+
fn send_status_update_inner(
phase: BackgroundScannerPhase,
state: Arc<Mutex<BackgroundScannerState>>,
@@ -5690,6 +5602,100 @@ async fn update_branches(
Ok(())
}
+async fn do_git_status_update(
+ job_state: Arc<Mutex<BackgroundScannerState>>,
+ mut local_repository: LocalRepositoryEntry,
+ tx: oneshot::Sender<()>,
+) {
+ let repository_name = local_repository.work_directory.display_name();
+ log::trace!("updating git branches for repo {repository_name}");
+ update_branches(&job_state, &mut local_repository)
+ .await
+ .log_err();
+ let t0 = Instant::now();
+
+ log::trace!("updating git statuses for repo {repository_name}");
+ let Some(statuses) = local_repository
+ .repo()
+ .status(&[git::WORK_DIRECTORY_REPO_PATH.clone()])
+ .log_err()
+ else {
+ return;
+ };
+ log::trace!(
+ "computed git statuses for repo {repository_name} in {:?}",
+ t0.elapsed()
+ );
+
+ let t0 = Instant::now();
+ let mut changed_paths = Vec::new();
+ let snapshot = job_state.lock().snapshot.snapshot.clone();
+
+ let Some(mut repository) = snapshot
+ .repository(local_repository.work_directory.path_key())
+ .context("Tried to update git statuses for a repository that isn't in the snapshot")
+ .log_err()
+ else {
+ return;
+ };
+
+ let merge_head_shas = local_repository.repo().merge_head_shas();
+ if merge_head_shas != local_repository.current_merge_head_shas {
+ mem::take(&mut repository.current_merge_conflicts);
+ }
+
+ let mut new_entries_by_path = SumTree::new(&());
+ for (repo_path, status) in statuses.entries.iter() {
+ let project_path = repository.work_directory.try_unrelativize(repo_path);
+
+ new_entries_by_path.insert_or_replace(
+ StatusEntry {
+ repo_path: repo_path.clone(),
+ status: *status,
+ },
+ &(),
+ );
+ if status.is_conflicted() {
+ repository.current_merge_conflicts.insert(repo_path.clone());
+ }
+
+ if let Some(path) = project_path {
+ changed_paths.push(path);
+ }
+ }
+
+ repository.statuses_by_path = new_entries_by_path;
+ let mut state = job_state.lock();
+ state
+ .snapshot
+ .repositories
+ .insert_or_replace(repository, &());
+ state
+ .snapshot
+ .git_repositories
+ .update(&local_repository.work_directory_id, |entry| {
+ entry.current_merge_head_shas = merge_head_shas;
+ entry.merge_message =
+ std::fs::read_to_string(local_repository.dot_git_dir_abs_path.join("MERGE_MSG"))
+ .ok()
+ .and_then(|merge_msg| Some(merge_msg.lines().next()?.to_owned()));
+ entry.status_scan_id += 1;
+ });
+
+ util::extend_sorted(
+ &mut state.changed_paths,
+ changed_paths,
+ usize::MAX,
+ Ord::cmp,
+ );
+
+ log::trace!(
+ "applied git status updates for repo {repository_name} in {:?}",
+ t0.elapsed(),
+ );
+ tx.send(()).ok();
+}
+
fn build_diff(
phase: BackgroundScannerPhase,
old_snapshot: &Snapshot,