Merge pull request #2371 from zed-industries/refresh-entry-delay

Max Brunsfeld created

Restructure background scanner to handle refresh requests even while scanning directories

Change summary

crates/project/src/project_tests.rs |   2 
crates/project/src/worktree.rs      | 693 ++++++++++++++----------------
crates/sum_tree/src/tree_map.rs     |   6 
3 files changed, 325 insertions(+), 376 deletions(-)

Detailed changes

crates/project/src/project_tests.rs 🔗

@@ -2183,7 +2183,7 @@ async fn test_apply_code_actions_with_commands(cx: &mut gpui::TestAppContext) {
     });
 }
 
-#[gpui::test]
+#[gpui::test(iterations = 10)]
 async fn test_save_file(cx: &mut gpui::TestAppContext) {
     let fs = FakeFs::new(cx.background());
     fs.insert_tree(

crates/project/src/worktree.rs 🔗

@@ -12,7 +12,9 @@ use futures::{
         mpsc::{self, UnboundedSender},
         oneshot,
     },
-    select_biased, Stream, StreamExt,
+    select_biased,
+    task::Poll,
+    Stream, StreamExt,
 };
 use fuzzy::CharBag;
 use git::{DOT_GIT, GITIGNORE};
@@ -41,11 +43,11 @@ use std::{
     mem,
     ops::{Deref, DerefMut},
     path::{Path, PathBuf},
+    pin::Pin,
     sync::{
         atomic::{AtomicUsize, Ordering::SeqCst},
         Arc,
     },
-    task::Poll,
     time::{Duration, SystemTime},
 };
 use sum_tree::{Bias, Edit, SeekTarget, SumTree, TreeMap, TreeSet};
@@ -154,20 +156,12 @@ impl DerefMut for LocalSnapshot {
 }
 
 enum ScanState {
-    /// The worktree is performing its initial scan of the filesystem.
-    Initializing {
-        snapshot: LocalSnapshot,
-        barrier: Option<barrier::Sender>,
-    },
-    Initialized {
-        snapshot: LocalSnapshot,
-    },
-    /// The worktree is updating in response to filesystem events.
-    Updating,
+    Started,
     Updated {
         snapshot: LocalSnapshot,
         changes: HashMap<Arc<Path>, PathChange>,
         barrier: Option<barrier::Sender>,
+        scanning: bool,
     },
 }
 
@@ -244,9 +238,24 @@ impl Worktree {
             cx.spawn_weak(|this, mut cx| async move {
                 while let Some((state, this)) = scan_states_rx.next().await.zip(this.upgrade(&cx)) {
                     this.update(&mut cx, |this, cx| {
-                        this.as_local_mut()
-                            .unwrap()
-                            .background_scanner_updated(state, cx);
+                        let this = this.as_local_mut().unwrap();
+                        match state {
+                            ScanState::Started => {
+                                *this.is_scanning.0.borrow_mut() = true;
+                            }
+                            ScanState::Updated {
+                                snapshot,
+                                changes,
+                                barrier,
+                                scanning,
+                            } => {
+                                *this.is_scanning.0.borrow_mut() = scanning;
+                                this.set_snapshot(snapshot, cx);
+                                cx.emit(Event::UpdatedEntries(changes));
+                                drop(barrier);
+                            }
+                        }
+                        cx.notify();
                     });
                 }
             })
@@ -258,9 +267,15 @@ impl Worktree {
                 let background = cx.background().clone();
                 async move {
                     let events = fs.watch(&abs_path, Duration::from_millis(100)).await;
-                    BackgroundScanner::new(snapshot, scan_states_tx, fs, background)
-                        .run(events, path_changes_rx)
-                        .await;
+                    BackgroundScanner::new(
+                        snapshot,
+                        fs,
+                        scan_states_tx,
+                        background,
+                        path_changes_rx,
+                    )
+                    .run(events)
+                    .await;
                 }
             });
 
@@ -533,38 +548,6 @@ impl LocalWorktree {
         Ok(updated)
     }
 
-    fn background_scanner_updated(
-        &mut self,
-        scan_state: ScanState,
-        cx: &mut ModelContext<Worktree>,
-    ) {
-        match scan_state {
-            ScanState::Initializing { snapshot, barrier } => {
-                *self.is_scanning.0.borrow_mut() = true;
-                self.set_snapshot(snapshot, cx);
-                drop(barrier);
-            }
-            ScanState::Initialized { snapshot } => {
-                *self.is_scanning.0.borrow_mut() = false;
-                self.set_snapshot(snapshot, cx);
-            }
-            ScanState::Updating => {
-                *self.is_scanning.0.borrow_mut() = true;
-            }
-            ScanState::Updated {
-                snapshot,
-                changes,
-                barrier,
-            } => {
-                *self.is_scanning.0.borrow_mut() = false;
-                cx.emit(Event::UpdatedEntries(changes));
-                self.set_snapshot(snapshot, cx);
-                drop(barrier);
-            }
-        }
-        cx.notify();
-    }
-
     fn set_snapshot(&mut self, new_snapshot: LocalSnapshot, cx: &mut ModelContext<Worktree>) {
         let updated_repos = Self::changed_repos(
             &self.snapshot.git_repositories,
@@ -1337,14 +1320,6 @@ impl Snapshot {
         &self.root_name
     }
 
-    pub fn scan_started(&mut self) {
-        self.scan_id += 1;
-    }
-
-    pub fn scan_completed(&mut self) {
-        self.completed_scan_id = self.scan_id;
-    }
-
     pub fn scan_id(&self) -> usize {
         self.scan_id
     }
@@ -1539,17 +1514,20 @@ impl LocalSnapshot {
             return;
         };
 
+        match parent_entry.kind {
+            EntryKind::PendingDir => {
+                parent_entry.kind = EntryKind::Dir;
+            }
+            EntryKind::Dir => {}
+            _ => return,
+        }
+
         if let Some(ignore) = ignore {
             self.ignores_by_parent_abs_path.insert(
                 self.abs_path.join(&parent_path).into(),
                 (ignore, self.scan_id),
             );
         }
-        if matches!(parent_entry.kind, EntryKind::PendingDir) {
-            parent_entry.kind = EntryKind::Dir;
-        } else {
-            unreachable!();
-        }
 
         if parent_path.file_name() == Some(&DOT_GIT) {
             let abs_path = self.abs_path.join(&parent_path);
@@ -2135,53 +2113,47 @@ impl<'a> sum_tree::Dimension<'a, EntrySummary> for PathKey {
 }
 
 struct BackgroundScanner {
-    fs: Arc<dyn Fs>,
     snapshot: Mutex<LocalSnapshot>,
-    notify: UnboundedSender<ScanState>,
+    fs: Arc<dyn Fs>,
+    status_updates_tx: UnboundedSender<ScanState>,
     executor: Arc<executor::Background>,
+    refresh_requests_rx: channel::Receiver<(Vec<PathBuf>, barrier::Sender)>,
+    prev_state: Mutex<(Snapshot, Vec<Arc<Path>>)>,
+    finished_initial_scan: bool,
 }
 
 impl BackgroundScanner {
     fn new(
         snapshot: LocalSnapshot,
-        notify: UnboundedSender<ScanState>,
         fs: Arc<dyn Fs>,
+        status_updates_tx: UnboundedSender<ScanState>,
         executor: Arc<executor::Background>,
+        refresh_requests_rx: channel::Receiver<(Vec<PathBuf>, barrier::Sender)>,
     ) -> Self {
         Self {
             fs,
-            snapshot: Mutex::new(snapshot),
-            notify,
+            status_updates_tx,
             executor,
+            refresh_requests_rx,
+            prev_state: Mutex::new((snapshot.snapshot.clone(), Vec::new())),
+            snapshot: Mutex::new(snapshot),
+            finished_initial_scan: false,
         }
     }
 
-    fn abs_path(&self) -> Arc<Path> {
-        self.snapshot.lock().abs_path.clone()
-    }
-
     async fn run(
-        self,
-        events_rx: impl Stream<Item = Vec<fsevent::Event>>,
-        mut changed_paths: channel::Receiver<(Vec<PathBuf>, barrier::Sender)>,
+        &mut self,
+        mut events_rx: Pin<Box<dyn Send + Stream<Item = Vec<fsevent::Event>>>>,
     ) {
         use futures::FutureExt as _;
 
-        // Retrieve the basic properties of the root node.
-        let root_char_bag;
-        let root_abs_path;
-        let root_inode;
-        let root_is_dir;
-        let next_entry_id;
-        {
-            let mut snapshot = self.snapshot.lock();
-            snapshot.scan_started();
-            root_char_bag = snapshot.root_char_bag;
-            root_abs_path = snapshot.abs_path.clone();
-            root_inode = snapshot.root_entry().map(|e| e.inode);
-            root_is_dir = snapshot.root_entry().map_or(false, |e| e.is_dir());
-            next_entry_id = snapshot.next_entry_id.clone();
-        }
+        let (root_abs_path, root_inode) = {
+            let snapshot = self.snapshot.lock();
+            (
+                snapshot.abs_path.clone(),
+                snapshot.root_entry().map(|e| e.inode),
+            )
+        };
 
         // Populate ignores above the root.
         let ignore_stack;
@@ -2205,198 +2177,220 @@ impl BackgroundScanner {
             }
         };
 
-        if root_is_dir {
-            let mut ancestor_inodes = TreeSet::default();
-            if let Some(root_inode) = root_inode {
-                ancestor_inodes.insert(root_inode);
+        // Perform an initial scan of the directory.
+        let (scan_job_tx, scan_job_rx) = channel::unbounded();
+        smol::block_on(scan_job_tx.send(ScanJob {
+            abs_path: root_abs_path,
+            path: Arc::from(Path::new("")),
+            ignore_stack,
+            ancestor_inodes: TreeSet::from_ordered_entries(root_inode),
+            scan_queue: scan_job_tx.clone(),
+        }))
+        .unwrap();
+        drop(scan_job_tx);
+        self.scan_dirs(true, scan_job_rx).await;
+        self.send_status_update(false, None);
+
+        // Process any any FS events that occurred while performing the initial scan.
+        // For these events, update events cannot be as precise, because we didn't
+        // have the previous state loaded yet.
+        if let Poll::Ready(Some(events)) = futures::poll!(events_rx.next()) {
+            let mut paths = events.into_iter().map(|e| e.path).collect::<Vec<_>>();
+            while let Poll::Ready(Some(more_events)) = futures::poll!(events_rx.next()) {
+                paths.extend(more_events.into_iter().map(|e| e.path));
             }
+            self.process_events(paths).await;
+            self.send_status_update(false, None);
+        }
 
-            let (tx, rx) = channel::unbounded();
-            self.executor
-                .block(tx.send(ScanJob {
-                    abs_path: root_abs_path.to_path_buf(),
-                    path: Arc::from(Path::new("")),
-                    ignore_stack,
-                    ancestor_inodes,
-                    scan_queue: tx.clone(),
-                }))
-                .unwrap();
-            drop(tx);
+        self.finished_initial_scan = true;
 
-            let progress_update_count = AtomicUsize::new(0);
-            self.executor
-                .scoped(|scope| {
-                    for _ in 0..self.executor.num_cpus() {
-                        scope.spawn(async {
-                            let mut last_progress_update_count = 0;
-                            let progress_update_timer = self.pause_between_progress_updates().fuse();
-                            futures::pin_mut!(progress_update_timer);
-                            loop {
-                                select_biased! {
-                                    // Send periodic progress updates to the worktree. Use an atomic counter
-                                    // to ensure that only one of the workers sends a progress update after
-                                    // the update interval elapses.
-                                    _ = progress_update_timer => {
-                                        match progress_update_count.compare_exchange(
-                                            last_progress_update_count,
-                                            last_progress_update_count + 1,
-                                            SeqCst,
-                                            SeqCst
-                                        ) {
-                                            Ok(_) => {
-                                                last_progress_update_count += 1;
-                                                if self
-                                                    .notify
-                                                    .unbounded_send(ScanState::Initializing {
-                                                        snapshot: self.snapshot.lock().clone(),
-                                                        barrier: None,
-                                                    })
-                                                    .is_err()
-                                                {
-                                                    break;
-                                                }
-                                            }
-                                            Err(current_count) => last_progress_update_count = current_count,
-                                        }
-                                        progress_update_timer.set(self.pause_between_progress_updates().fuse());
-                                    }
+        // Continue processing events until the worktree is dropped.
+        loop {
+            select_biased! {
+                // Process any path refresh requests from the worktree. Prioritize
+                // these before handling changes reported by the filesystem.
+                request = self.refresh_requests_rx.recv().fuse() => {
+                    let Ok((paths, barrier)) = request else { break };
+                    self.reload_entries_for_paths(paths, None).await;
+                    if !self.send_status_update(false, Some(barrier)) {
+                        break;
+                    }
+                }
 
-                                    // Refresh any paths requested by the main thread.
-                                    job = changed_paths.recv().fuse() => {
-                                        let Ok((abs_paths, barrier)) = job else { break };
-                                        self.update_entries_for_paths(abs_paths, None).await;
-                                        if self
-                                            .notify
-                                            .unbounded_send(ScanState::Initializing {
-                                                snapshot: self.snapshot.lock().clone(),
-                                                barrier: Some(barrier),
-                                            })
-                                            .is_err()
-                                        {
-                                            break;
-                                        }
-                                    }
+                events = events_rx.next().fuse() => {
+                    let Some(events) = events else { break };
+                    let mut paths = events.into_iter().map(|e| e.path).collect::<Vec<_>>();
+                    while let Poll::Ready(Some(more_events)) = futures::poll!(events_rx.next()) {
+                        paths.extend(more_events.into_iter().map(|e| e.path));
+                    }
+                    self.process_events(paths).await;
+                    self.send_status_update(false, None);
+                }
+            }
+        }
+    }
 
-                                    // Recursively load directories from the file system.
-                                    job = rx.recv().fuse() => {
-                                        let Ok(job) = job else { break };
-                                        if let Err(err) = self
-                                            .scan_dir(root_char_bag, next_entry_id.clone(), &job)
-                                            .await
-                                        {
-                                            log::error!("error scanning {:?}: {}", job.abs_path, err);
-                                        }
+    async fn process_events(&mut self, paths: Vec<PathBuf>) {
+        use futures::FutureExt as _;
+
+        let (scan_job_tx, scan_job_rx) = channel::unbounded();
+        if let Some(mut paths) = self
+            .reload_entries_for_paths(paths, Some(scan_job_tx.clone()))
+            .await
+        {
+            paths.sort_unstable();
+            util::extend_sorted(&mut self.prev_state.lock().1, paths, usize::MAX, Ord::cmp);
+        }
+        drop(scan_job_tx);
+        self.scan_dirs(false, scan_job_rx).await;
+
+        let (ignore_queue_tx, ignore_queue_rx) = channel::unbounded();
+        let snapshot = self.update_ignore_statuses(ignore_queue_tx);
+        self.executor
+            .scoped(|scope| {
+                for _ in 0..self.executor.num_cpus() {
+                    scope.spawn(async {
+                        loop {
+                            select_biased! {
+                                // Process any path refresh requests before moving on to process
+                                // the queue of ignore statuses.
+                                request = self.refresh_requests_rx.recv().fuse() => {
+                                    let Ok((paths, barrier)) = request else { break };
+                                    self.reload_entries_for_paths(paths, None).await;
+                                    if !self.send_status_update(false, Some(barrier)) {
+                                        return;
                                     }
                                 }
+
+                                // Recursively process directories whose ignores have changed.
+                                job = ignore_queue_rx.recv().fuse() => {
+                                    let Ok(job) = job else { break };
+                                    self.update_ignore_status(job, &snapshot).await;
+                                }
                             }
-                        });
-                    }
-                })
-                .await;
-        }
+                        }
+                    });
+                }
+            })
+            .await;
+
+        let mut snapshot = self.snapshot.lock();
+        let mut git_repositories = mem::take(&mut snapshot.git_repositories);
+        git_repositories.retain(|repo| snapshot.entry_for_path(&repo.git_dir_path).is_some());
+        snapshot.git_repositories = git_repositories;
+        snapshot.removed_entry_ids.clear();
+        snapshot.completed_scan_id = snapshot.scan_id;
+    }
 
-        self.snapshot.lock().scan_completed();
+    async fn scan_dirs(
+        &self,
+        enable_progress_updates: bool,
+        scan_jobs_rx: channel::Receiver<ScanJob>,
+    ) {
+        use futures::FutureExt as _;
 
         if self
-            .notify
-            .unbounded_send(ScanState::Initialized {
-                snapshot: self.snapshot.lock().clone(),
-            })
+            .status_updates_tx
+            .unbounded_send(ScanState::Started)
             .is_err()
         {
             return;
         }
 
-        // Process any events that occurred while performing the initial scan. These
-        // events can't be reported as precisely, because there is no snapshot of the
-        // worktree before they occurred.
-        futures::pin_mut!(events_rx);
-        if let Poll::Ready(Some(mut events)) = futures::poll!(events_rx.next()) {
-            while let Poll::Ready(Some(additional_events)) = futures::poll!(events_rx.next()) {
-                events.extend(additional_events);
-            }
-            let abs_paths = events.into_iter().map(|e| e.path).collect();
-            if self.notify.unbounded_send(ScanState::Updating).is_err() {
-                return;
-            }
-            if let Some(changes) = self.process_events(abs_paths, true).await {
-                if self
-                    .notify
-                    .unbounded_send(ScanState::Updated {
-                        snapshot: self.snapshot.lock().clone(),
-                        changes,
-                        barrier: None,
-                    })
-                    .is_err()
-                {
-                    return;
-                }
-            } else {
-                return;
-            }
-        }
+        let progress_update_count = AtomicUsize::new(0);
+        self.executor
+            .scoped(|scope| {
+                for _ in 0..self.executor.num_cpus() {
+                    scope.spawn(async {
+                        let mut last_progress_update_count = 0;
+                        let progress_update_timer = self.progress_timer(enable_progress_updates).fuse();
+                        futures::pin_mut!(progress_update_timer);
+
+                        loop {
+                            select_biased! {
+                                // Process any path refresh requests before moving on to process
+                                // the scan queue, so that user operations are prioritized.
+                                request = self.refresh_requests_rx.recv().fuse() => {
+                                    let Ok((paths, barrier)) = request else { break };
+                                    self.reload_entries_for_paths(paths, None).await;
+                                    if !self.send_status_update(false, Some(barrier)) {
+                                        return;
+                                    }
+                                }
 
-        // Continue processing events until the worktree is dropped.
-        loop {
-            let barrier;
-            let abs_paths;
-            select_biased! {
-                request = changed_paths.next().fuse() => {
-                    let Some((paths, b)) = request else { break };
-                    abs_paths = paths;
-                    barrier = Some(b);
-                }
-                events = events_rx.next().fuse() => {
-                    let Some(events) = events else { break };
-                    abs_paths = events.into_iter().map(|e| e.path).collect();
-                    barrier = None;
-                }
-            }
+                                // Send periodic progress updates to the worktree. Use an atomic counter
+                                // to ensure that only one of the workers sends a progress update after
+                                // the update interval elapses.
+                                _ = progress_update_timer => {
+                                    match progress_update_count.compare_exchange(
+                                        last_progress_update_count,
+                                        last_progress_update_count + 1,
+                                        SeqCst,
+                                        SeqCst
+                                    ) {
+                                        Ok(_) => {
+                                            last_progress_update_count += 1;
+                                            self.send_status_update(true, None);
+                                        }
+                                        Err(count) => {
+                                            last_progress_update_count = count;
+                                        }
+                                    }
+                                    progress_update_timer.set(self.progress_timer(enable_progress_updates).fuse());
+                                }
 
-            if self.notify.unbounded_send(ScanState::Updating).is_err() {
-                return;
-            }
-            if let Some(changes) = self.process_events(abs_paths, false).await {
-                if self
-                    .notify
-                    .unbounded_send(ScanState::Updated {
-                        snapshot: self.snapshot.lock().clone(),
-                        changes,
-                        barrier,
+                                // Recursively load directories from the file system.
+                                job = scan_jobs_rx.recv().fuse() => {
+                                    let Ok(job) = job else { break };
+                                    if let Err(err) = self.scan_dir(&job).await {
+                                        if job.path.as_ref() != Path::new("") {
+                                            log::error!("error scanning directory {:?}: {}", job.abs_path, err);
+                                        }
+                                    }
+                                }
+                            }
+                        }
                     })
-                    .is_err()
-                {
-                    return;
                 }
-            } else {
-                return;
-            }
-        }
+            })
+            .await;
     }
 
-    async fn pause_between_progress_updates(&self) {
-        #[cfg(any(test, feature = "test-support"))]
-        if self.fs.is_fake() {
-            return self.executor.simulate_random_delay().await;
-        }
-        smol::Timer::after(Duration::from_millis(100)).await;
+    fn send_status_update(&self, scanning: bool, barrier: Option<barrier::Sender>) -> bool {
+        let mut prev_state = self.prev_state.lock();
+        let snapshot = self.snapshot.lock().clone();
+        let mut old_snapshot = snapshot.snapshot.clone();
+        mem::swap(&mut old_snapshot, &mut prev_state.0);
+        let changed_paths = mem::take(&mut prev_state.1);
+        let changes = self.build_change_set(&old_snapshot, &snapshot.snapshot, changed_paths);
+        self.status_updates_tx
+            .unbounded_send(ScanState::Updated {
+                snapshot,
+                changes,
+                scanning,
+                barrier,
+            })
+            .is_ok()
     }
 
-    async fn scan_dir(
-        &self,
-        root_char_bag: CharBag,
-        next_entry_id: Arc<AtomicUsize>,
-        job: &ScanJob,
-    ) -> Result<()> {
+    async fn scan_dir(&self, job: &ScanJob) -> Result<()> {
         let mut new_entries: Vec<Entry> = Vec::new();
         let mut new_jobs: Vec<Option<ScanJob>> = Vec::new();
         let mut ignore_stack = job.ignore_stack.clone();
         let mut new_ignore = None;
-
+        let (root_abs_path, root_char_bag, next_entry_id) = {
+            let snapshot = self.snapshot.lock();
+            (
+                snapshot.abs_path().clone(),
+                snapshot.root_char_bag,
+                snapshot.next_entry_id.clone(),
+            )
+        };
         let mut child_paths = self.fs.read_dir(&job.abs_path).await?;
         while let Some(child_abs_path) = child_paths.next().await {
-            let child_abs_path = match child_abs_path {
-                Ok(child_abs_path) => child_abs_path,
+            let child_abs_path: Arc<Path> = match child_abs_path {
+                Ok(child_abs_path) => child_abs_path.into(),
                 Err(error) => {
                     log::error!("error processing entry {:?}", error);
                     continue;
@@ -2419,8 +2413,7 @@ impl BackgroundScanner {
                 match build_gitignore(&child_abs_path, self.fs.as_ref()).await {
                     Ok(ignore) => {
                         let ignore = Arc::new(ignore);
-                        ignore_stack =
-                            ignore_stack.append(job.abs_path.as_path().into(), ignore.clone());
+                        ignore_stack = ignore_stack.append(job.abs_path.clone(), ignore.clone());
                         new_ignore = Some(ignore);
                     }
                     Err(error) => {
@@ -2438,7 +2431,7 @@ impl BackgroundScanner {
                 // new jobs as well.
                 let mut new_jobs = new_jobs.iter_mut();
                 for entry in &mut new_entries {
-                    let entry_abs_path = self.abs_path().join(&entry.path);
+                    let entry_abs_path = root_abs_path.join(&entry.path);
                     entry.is_ignored =
                         ignore_stack.is_abs_path_ignored(&entry_abs_path, entry.is_dir());
 
@@ -2507,69 +2500,18 @@ impl BackgroundScanner {
         Ok(())
     }
 
-    async fn process_events(
-        &self,
-        abs_paths: Vec<PathBuf>,
-        received_before_initialized: bool,
-    ) -> Option<HashMap<Arc<Path>, PathChange>> {
-        let (scan_queue_tx, scan_queue_rx) = channel::unbounded();
-
-        let prev_snapshot = {
-            let mut snapshot = self.snapshot.lock();
-            snapshot.scan_started();
-            snapshot.clone()
-        };
-
-        let event_paths = self
-            .update_entries_for_paths(abs_paths, Some(scan_queue_tx))
-            .await?;
-
-        // Scan any directories that were created as part of this event batch.
-        self.executor
-            .scoped(|scope| {
-                for _ in 0..self.executor.num_cpus() {
-                    scope.spawn(async {
-                        while let Ok(job) = scan_queue_rx.recv().await {
-                            if let Err(err) = self
-                                .scan_dir(
-                                    prev_snapshot.root_char_bag,
-                                    prev_snapshot.next_entry_id.clone(),
-                                    &job,
-                                )
-                                .await
-                            {
-                                log::error!("error scanning {:?}: {}", job.abs_path, err);
-                            }
-                        }
-                    });
-                }
-            })
-            .await;
-
-        // Attempt to detect renames only over a single batch of file-system events.
-        self.snapshot.lock().removed_entry_ids.clear();
-
-        self.update_ignore_statuses().await;
-        self.update_git_repositories();
-        let changes = self.build_change_set(
-            prev_snapshot.snapshot,
-            event_paths,
-            received_before_initialized,
-        );
-        self.snapshot.lock().scan_completed();
-        Some(changes)
-    }
-
-    async fn update_entries_for_paths(
+    async fn reload_entries_for_paths(
         &self,
         mut abs_paths: Vec<PathBuf>,
         scan_queue_tx: Option<Sender<ScanJob>>,
     ) -> Option<Vec<Arc<Path>>> {
+        let doing_recursive_update = scan_queue_tx.is_some();
+
         abs_paths.sort_unstable();
         abs_paths.dedup_by(|a, b| a.starts_with(&b));
 
         let root_abs_path = self.snapshot.lock().abs_path.clone();
-        let root_canonical_path = self.fs.canonicalize(&root_abs_path).await.ok()?;
+        let root_canonical_path = self.fs.canonicalize(&root_abs_path).await.log_err()?;
         let metadata = futures::future::join_all(
             abs_paths
                 .iter()
@@ -2579,29 +2521,35 @@ impl BackgroundScanner {
         .await;
 
         let mut snapshot = self.snapshot.lock();
-        if scan_queue_tx.is_some() {
-            for abs_path in &abs_paths {
-                if let Ok(path) = abs_path.strip_prefix(&root_canonical_path) {
+
+        if snapshot.completed_scan_id == snapshot.scan_id {
+            snapshot.scan_id += 1;
+            if !doing_recursive_update {
+                snapshot.completed_scan_id = snapshot.scan_id;
+            }
+        }
+
+        // Remove any entries for paths that no longer exist or are being recursively
+        // refreshed. Do this before adding any new entries, so that renames can be
+        // detected regardless of the order of the paths.
+        let mut event_paths = Vec::<Arc<Path>>::with_capacity(abs_paths.len());
+        for (abs_path, metadata) in abs_paths.iter().zip(metadata.iter()) {
+            if let Ok(path) = abs_path.strip_prefix(&root_canonical_path) {
+                if matches!(metadata, Ok(None)) || doing_recursive_update {
                     snapshot.remove_path(path);
                 }
+                event_paths.push(path.into());
+            } else {
+                log::error!(
+                    "unexpected event {:?} for root path {:?}",
+                    abs_path,
+                    root_canonical_path
+                );
             }
         }
 
-        let mut event_paths = Vec::with_capacity(abs_paths.len());
-        for (abs_path, metadata) in abs_paths.into_iter().zip(metadata.into_iter()) {
-            let path: Arc<Path> = match abs_path.strip_prefix(&root_canonical_path) {
-                Ok(path) => Arc::from(path.to_path_buf()),
-                Err(_) => {
-                    log::error!(
-                        "unexpected event {:?} for root path {:?}",
-                        abs_path,
-                        root_canonical_path
-                    );
-                    continue;
-                }
-            };
-            event_paths.push(path.clone());
-            let abs_path = root_abs_path.join(&path);
+        for (path, metadata) in event_paths.iter().cloned().zip(metadata.into_iter()) {
+            let abs_path: Arc<Path> = root_abs_path.join(&path).into();
 
             match metadata {
                 Ok(Some(metadata)) => {
@@ -2626,15 +2574,14 @@ impl BackgroundScanner {
                         let mut ancestor_inodes = snapshot.ancestor_inodes_for_path(&path);
                         if metadata.is_dir && !ancestor_inodes.contains(&metadata.inode) {
                             ancestor_inodes.insert(metadata.inode);
-                            self.executor
-                                .block(scan_queue_tx.send(ScanJob {
-                                    abs_path,
-                                    path,
-                                    ignore_stack,
-                                    ancestor_inodes,
-                                    scan_queue: scan_queue_tx.clone(),
-                                }))
-                                .unwrap();
+                            smol::block_on(scan_queue_tx.send(ScanJob {
+                                abs_path,
+                                path,
+                                ignore_stack,
+                                ancestor_inodes,
+                                scan_queue: scan_queue_tx.clone(),
+                            }))
+                            .unwrap();
                         }
                     }
                 }
@@ -2649,7 +2596,10 @@ impl BackgroundScanner {
         Some(event_paths)
     }
 
-    async fn update_ignore_statuses(&self) {
+    fn update_ignore_statuses(
+        &self,
+        ignore_queue_tx: Sender<UpdateIgnoreStatusJob>,
+    ) -> LocalSnapshot {
         let mut snapshot = self.snapshot.lock().clone();
         let mut ignores_to_update = Vec::new();
         let mut ignores_to_delete = Vec::new();
@@ -2674,7 +2624,6 @@ impl BackgroundScanner {
                 .remove(&parent_abs_path);
         }
 
-        let (ignore_queue_tx, ignore_queue_rx) = channel::unbounded();
         ignores_to_update.sort_unstable();
         let mut ignores_to_update = ignores_to_update.into_iter().peekable();
         while let Some(parent_abs_path) = ignores_to_update.next() {
@@ -2686,35 +2635,15 @@ impl BackgroundScanner {
             }
 
             let ignore_stack = snapshot.ignore_stack_for_abs_path(&parent_abs_path, true);
-            ignore_queue_tx
-                .send(UpdateIgnoreStatusJob {
-                    abs_path: parent_abs_path,
-                    ignore_stack,
-                    ignore_queue: ignore_queue_tx.clone(),
-                })
-                .await
-                .unwrap();
+            smol::block_on(ignore_queue_tx.send(UpdateIgnoreStatusJob {
+                abs_path: parent_abs_path,
+                ignore_stack,
+                ignore_queue: ignore_queue_tx.clone(),
+            }))
+            .unwrap();
         }
-        drop(ignore_queue_tx);
 
-        self.executor
-            .scoped(|scope| {
-                for _ in 0..self.executor.num_cpus() {
-                    scope.spawn(async {
-                        while let Ok(job) = ignore_queue_rx.recv().await {
-                            self.update_ignore_status(job, &snapshot).await;
-                        }
-                    });
-                }
-            })
-            .await;
-    }
-
-    fn update_git_repositories(&self) {
-        let mut snapshot = self.snapshot.lock();
-        let mut git_repositories = mem::take(&mut snapshot.git_repositories);
-        git_repositories.retain(|repo| snapshot.entry_for_path(&repo.git_dir_path).is_some());
-        snapshot.git_repositories = git_repositories;
+        snapshot
     }
 
     async fn update_ignore_status(&self, job: UpdateIgnoreStatusJob, snapshot: &LocalSnapshot) {
@@ -2728,7 +2657,7 @@ impl BackgroundScanner {
         let path = job.abs_path.strip_prefix(&snapshot.abs_path).unwrap();
         for mut entry in snapshot.child_entries(path).cloned() {
             let was_ignored = entry.is_ignored;
-            let abs_path = self.abs_path().join(&entry.path);
+            let abs_path = snapshot.abs_path().join(&entry.path);
             entry.is_ignored = ignore_stack.is_abs_path_ignored(&abs_path, entry.is_dir());
             if entry.is_dir() {
                 let child_ignore_stack = if entry.is_ignored {
@@ -2762,16 +2691,16 @@ impl BackgroundScanner {
 
     fn build_change_set(
         &self,
-        old_snapshot: Snapshot,
+        old_snapshot: &Snapshot,
+        new_snapshot: &Snapshot,
         event_paths: Vec<Arc<Path>>,
-        received_before_initialized: bool,
     ) -> HashMap<Arc<Path>, PathChange> {
         use PathChange::{Added, AddedOrUpdated, Removed, Updated};
 
-        let new_snapshot = self.snapshot.lock();
         let mut changes = HashMap::default();
         let mut old_paths = old_snapshot.entries_by_path.cursor::<PathKey>();
         let mut new_paths = new_snapshot.entries_by_path.cursor::<PathKey>();
+        let received_before_initialized = !self.finished_initial_scan;
 
         for path in event_paths {
             let path = PathKey(path);
@@ -2799,9 +2728,9 @@ impl BackgroundScanner {
                                     // If the worktree was not fully initialized when this event was generated,
                                     // we can't know whether this entry was added during the scan or whether
                                     // it was merely updated.
-                                    changes.insert(old_entry.path.clone(), AddedOrUpdated);
+                                    changes.insert(new_entry.path.clone(), AddedOrUpdated);
                                 } else if old_entry.mtime != new_entry.mtime {
-                                    changes.insert(old_entry.path.clone(), Updated);
+                                    changes.insert(new_entry.path.clone(), Updated);
                                 }
                                 old_paths.next(&());
                                 new_paths.next(&());
@@ -2826,6 +2755,19 @@ impl BackgroundScanner {
         }
         changes
     }
+
+    async fn progress_timer(&self, running: bool) {
+        if !running {
+            return futures::future::pending().await;
+        }
+
+        #[cfg(any(test, feature = "test-support"))]
+        if self.fs.is_fake() {
+            return self.executor.simulate_random_delay().await;
+        }
+
+        smol::Timer::after(Duration::from_millis(100)).await;
+    }
 }
 
 fn char_bag_for_path(root_char_bag: CharBag, path: &Path) -> CharBag {
@@ -2839,7 +2781,7 @@ fn char_bag_for_path(root_char_bag: CharBag, path: &Path) -> CharBag {
 }
 
 struct ScanJob {
-    abs_path: PathBuf,
+    abs_path: Arc<Path>,
     path: Arc<Path>,
     ignore_stack: Arc<IgnoreStack>,
     scan_queue: Sender<ScanJob>,
@@ -3524,7 +3466,7 @@ mod tests {
 
         let fs = FakeFs::new(cx.background());
         fs.insert_tree(
-            "/a",
+            "/root",
             json!({
                 "b": {},
                 "c": {},
@@ -3535,7 +3477,7 @@ mod tests {
 
         let tree = Worktree::local(
             client,
-            "/a".as_ref(),
+            "/root".as_ref(),
             true,
             fs,
             Default::default(),
@@ -3555,6 +3497,7 @@ mod tests {
         assert!(entry.is_dir());
 
         cx.foreground().run_until_parked();
+
         tree.read_with(cx, |tree, _| {
             assert_eq!(tree.entry_for_path("a/e").unwrap().kind, EntryKind::Dir);
         });

crates/sum_tree/src/tree_map.rs 🔗

@@ -154,6 +154,12 @@ impl<K> TreeSet<K>
 where
     K: Clone + Debug + Default + Ord,
 {
+    pub fn from_ordered_entries(entries: impl IntoIterator<Item = K>) -> Self {
+        Self(TreeMap::from_ordered_entries(
+            entries.into_iter().map(|key| (key, ())),
+        ))
+    }
+
     pub fn insert(&mut self, key: K) {
         self.0.insert(key, ());
     }