Handle path changes and progress updates from all worker threads during initial scan

Max Brunsfeld created

Change summary

crates/project/src/worktree.rs | 131 +++++++++++++++++++----------------
1 file changed, 70 insertions(+), 61 deletions(-)

Detailed changes

crates/project/src/worktree.rs 🔗

@@ -9,7 +9,7 @@ use collections::{HashMap, VecDeque};
 use fs::{repository::GitRepository, Fs, LineEnding};
 use futures::{
     channel::{
-        mpsc::{self, UnboundedReceiver, UnboundedSender},
+        mpsc::{self, UnboundedSender},
         oneshot,
     },
     select_biased, Stream, StreamExt,
@@ -44,7 +44,10 @@ use std::{
     mem,
     ops::{Deref, DerefMut},
     path::{Path, PathBuf},
-    sync::{atomic::AtomicUsize, Arc},
+    sync::{
+        atomic::{AtomicUsize, Ordering::SeqCst},
+        Arc,
+    },
     task::Poll,
     time::{Duration, SystemTime},
 };
@@ -61,7 +64,7 @@ pub enum Worktree {
 
 pub struct LocalWorktree {
     snapshot: LocalSnapshot,
-    path_changes_tx: mpsc::UnboundedSender<(Vec<PathBuf>, barrier::Sender)>,
+    path_changes_tx: channel::Sender<(Vec<PathBuf>, barrier::Sender)>,
     is_scanning: (watch::Sender<bool>, watch::Receiver<bool>),
     _background_scanner_task: Task<()>,
     share: Option<ShareState>,
@@ -238,7 +241,7 @@ impl Worktree {
                 );
             }
 
-            let (path_changes_tx, path_changes_rx) = mpsc::unbounded();
+            let (path_changes_tx, path_changes_rx) = channel::unbounded();
             let (scan_states_tx, mut scan_states_rx) = mpsc::unbounded();
 
             cx.spawn_weak(|this, mut cx| async move {
@@ -837,7 +840,7 @@ impl LocalWorktree {
                 this.as_local_mut()
                     .unwrap()
                     .path_changes_tx
-                    .unbounded_send((vec![abs_path], tx))
+                    .try_send((vec![abs_path], tx))
                     .unwrap();
             });
             rx.recv().await;
@@ -930,7 +933,7 @@ impl LocalWorktree {
             }
 
             let (tx, mut rx) = barrier::channel();
-            path_changes_tx.unbounded_send((paths, tx)).unwrap();
+            path_changes_tx.try_send((paths, tx)).unwrap();
             rx.recv().await;
             this.upgrade(&cx)
                 .ok_or_else(|| anyhow!("worktree was dropped"))?
@@ -2165,7 +2168,7 @@ impl BackgroundScanner {
     async fn run(
         self,
         events_rx: impl Stream<Item = Vec<fsevent::Event>>,
-        mut changed_paths: UnboundedReceiver<(Vec<PathBuf>, barrier::Sender)>,
+        mut changed_paths: channel::Receiver<(Vec<PathBuf>, barrier::Sender)>,
     ) {
         use futures::FutureExt as _;
 
@@ -2225,64 +2228,70 @@ impl BackgroundScanner {
                 .unwrap();
             drop(tx);
 
+            let progress_update_count = AtomicUsize::new(0);
             self.executor
                 .scoped(|scope| {
-                    // While the scan is running, listen for path update requests from the worktree,
-                    // and report updates to the worktree based on a timer.
-                    scope.spawn(async {
-                        let reporting_timer = self.pause_between_initializing_updates().fuse();
-                        futures::pin_mut!(reporting_timer);
-                        loop {
-                            select_biased! {
-                                job = changed_paths.next().fuse() => {
-                                    let Some((abs_paths, barrier)) = job else { break };
-                                    self.update_entries_for_paths(abs_paths, None).await;
-                                    if self
-                                        .notify
-                                        .unbounded_send(ScanState::Initializing {
-                                            snapshot: self.snapshot.lock().clone(),
-                                            barrier: Some(barrier),
-                                        })
-                                        .is_err()
-                                    {
-                                        break;
-                                    }
-                                }
-                                _ = reporting_timer => {
-                                    if self
-                                        .notify
-                                        .unbounded_send(ScanState::Initializing {
-                                            snapshot: self.snapshot.lock().clone(),
-                                            barrier: None,
-                                        })
-                                        .is_err()
-                                    {
-                                        break;
+                    for _ in 0..self.executor.num_cpus() {
+                        scope.spawn(async {
+                            let mut last_progress_update_count = 0;
+                            let progress_update_timer = self.pause_between_progress_updates().fuse();
+                            futures::pin_mut!(progress_update_timer);
+                            loop {
+                                select_biased! {
+                                    // Send periodic progress updates to the worktree. Use an atomic counter
+                                    // to ensure that only one of the workers sends a progress update after
+                                    // the update interval elapses.
+                                    _ = progress_update_timer => {
+                                        match progress_update_count.compare_exchange(
+                                            last_progress_update_count,
+                                            last_progress_update_count + 1,
+                                            SeqCst,
+                                            SeqCst
+                                        ) {
+                                            Ok(_) => {
+                                                last_progress_update_count += 1;
+                                                if self
+                                                    .notify
+                                                    .unbounded_send(ScanState::Initializing {
+                                                        snapshot: self.snapshot.lock().clone(),
+                                                        barrier: None,
+                                                    })
+                                                    .is_err()
+                                                {
+                                                    break;
+                                                }
+                                            }
+                                            Err(current_count) => last_progress_update_count = current_count,
+                                        }
+                                        progress_update_timer.set(self.pause_between_progress_updates().fuse());
                                     }
-                                    reporting_timer.set(self.pause_between_initializing_updates().fuse());
-                                }
-                                job = rx.recv().fuse() => {
-                                    let Ok(job) = job else { break };
-                                    if let Err(err) = self
-                                        .scan_dir(root_char_bag, next_entry_id.clone(), &job)
-                                        .await
-                                    {
-                                        log::error!("error scanning {:?}: {}", job.abs_path, err);
+
+                                    // Refresh any paths requested by the main thread.
+                                    job = changed_paths.recv().fuse() => {
+                                        let Ok((abs_paths, barrier)) = job else { break };
+                                        self.update_entries_for_paths(abs_paths, None).await;
+                                        if self
+                                            .notify
+                                            .unbounded_send(ScanState::Initializing {
+                                                snapshot: self.snapshot.lock().clone(),
+                                                barrier: Some(barrier),
+                                            })
+                                            .is_err()
+                                        {
+                                            break;
+                                        }
                                     }
-                                }
-                            }
-                        }
-                    });
 
-                    // Spawn worker threads to scan the directory recursively.
-                    for _ in 1..self.executor.num_cpus() {
-                        scope.spawn(async {
-                            while let Ok(job) = rx.recv().await {
-                                if let Err(err) = self
-                                    .scan_dir(root_char_bag, next_entry_id.clone(), &job)
-                                    .await
-                                {
-                                    log::error!("error scanning {:?}: {}", job.abs_path, err);
+                                    // Recursively load directories from the file system.
+                                    job = rx.recv().fuse() => {
+                                        let Ok(job) = job else { break };
+                                        if let Err(err) = self
+                                            .scan_dir(root_char_bag, next_entry_id.clone(), &job)
+                                            .await
+                                        {
+                                            log::error!("error scanning {:?}: {}", job.abs_path, err);
+                                        }
+                                    }
                                 }
                             }
                         });
@@ -2370,7 +2379,7 @@ impl BackgroundScanner {
         }
     }
 
-    async fn pause_between_initializing_updates(&self) {
+    async fn pause_between_progress_updates(&self) {
         #[cfg(any(test, feature = "test-support"))]
         if self.fs.is_fake() {
             return self.executor.simulate_random_delay().await;