Update snapshot atomically when processing FS events

Max Brunsfeld created

Change summary

crates/project/src/project.rs             |   1 
crates/project/src/worktree.rs            | 146 +++++++++++++-----------
crates/project_panel/src/project_panel.rs |   2 
3 files changed, 79 insertions(+), 70 deletions(-)

Detailed changes

crates/project/src/project.rs 🔗

@@ -5282,6 +5282,7 @@ mod tests {
                 language_id: Default::default()
             },
         );
+
         // We clear the diagnostics, since the language has changed.
         rust_buffer2.read_with(cx, |buffer, _| {
             assert_eq!(

crates/project/src/worktree.rs 🔗

@@ -1817,14 +1817,14 @@ impl BackgroundScanner {
             let path: Arc<Path> = Arc::from(Path::new(""));
             let abs_path = self.abs_path();
             let (tx, rx) = channel::unbounded();
-            tx.send(ScanJob {
-                abs_path: abs_path.to_path_buf(),
-                path,
-                ignore_stack: IgnoreStack::none(),
-                scan_queue: tx.clone(),
-            })
-            .await
-            .unwrap();
+            self.executor
+                .block(tx.send(ScanJob {
+                    abs_path: abs_path.to_path_buf(),
+                    path,
+                    ignore_stack: IgnoreStack::none(),
+                    scan_queue: tx.clone(),
+                }))
+                .unwrap();
             drop(tx);
 
             self.executor
@@ -1947,83 +1947,91 @@ impl BackgroundScanner {
     }
 
     async fn process_events(&mut self, mut events: Vec<fsevent::Event>) -> bool {
-        let mut snapshot = self.snapshot();
-        snapshot.scan_id += 1;
+        events.sort_unstable_by(|a, b| a.path.cmp(&b.path));
+        events.dedup_by(|a, b| a.path.starts_with(&b.path));
+
+        let root_char_bag;
+        let root_abs_path;
+        let next_entry_id;
+        {
+            let mut snapshot = self.snapshot.lock();
+            snapshot.scan_id += 1;
+            root_char_bag = snapshot.root_char_bag;
+            root_abs_path = snapshot.abs_path.clone();
+            next_entry_id = snapshot.next_entry_id.clone();
+        }
 
-        let root_abs_path = if let Ok(abs_path) = self.fs.canonicalize(&snapshot.abs_path).await {
+        let root_abs_path = if let Ok(abs_path) = self.fs.canonicalize(&root_abs_path).await {
             abs_path
         } else {
             return false;
         };
-        let root_char_bag = snapshot.root_char_bag;
-        let next_entry_id = snapshot.next_entry_id.clone();
-
-        events.sort_unstable_by(|a, b| a.path.cmp(&b.path));
-        events.dedup_by(|a, b| a.path.starts_with(&b.path));
+        let metadata = futures::future::join_all(
+            events
+                .iter()
+                .map(|event| self.fs.metadata(&event.path))
+                .collect::<Vec<_>>(),
+        )
+        .await;
 
-        for event in &events {
-            match event.path.strip_prefix(&root_abs_path) {
-                Ok(path) => snapshot.remove_path(&path),
-                Err(_) => {
-                    log::error!(
-                        "unexpected event {:?} for root path {:?}",
-                        event.path,
-                        root_abs_path
-                    );
-                    continue;
+        // Hold the snapshot lock while clearing and re-inserting the root entries
+        // for each event. This way, the snapshot is not observable to the foreground
+        // thread while this operation is in-progress.
+        let (scan_queue_tx, scan_queue_rx) = channel::unbounded();
+        {
+            let mut snapshot = self.snapshot.lock();
+            for event in &events {
+                if let Ok(path) = event.path.strip_prefix(&root_abs_path) {
+                    snapshot.remove_path(&path);
                 }
             }
-        }
 
-        let (scan_queue_tx, scan_queue_rx) = channel::unbounded();
-        for event in events {
-            let path: Arc<Path> = match event.path.strip_prefix(&root_abs_path) {
-                Ok(path) => Arc::from(path.to_path_buf()),
-                Err(_) => {
-                    log::error!(
-                        "unexpected event {:?} for root path {:?}",
-                        event.path,
-                        root_abs_path
-                    );
-                    continue;
-                }
-            };
+            for (event, metadata) in events.into_iter().zip(metadata.into_iter()) {
+                let path: Arc<Path> = match event.path.strip_prefix(&root_abs_path) {
+                    Ok(path) => Arc::from(path.to_path_buf()),
+                    Err(_) => {
+                        log::error!(
+                            "unexpected event {:?} for root path {:?}",
+                            event.path,
+                            root_abs_path
+                        );
+                        continue;
+                    }
+                };
 
-            match self.fs.metadata(&event.path).await {
-                Ok(Some(metadata)) => {
-                    let ignore_stack = snapshot.ignore_stack_for_path(&path, metadata.is_dir);
-                    let mut fs_entry = Entry::new(
-                        path.clone(),
-                        &metadata,
-                        snapshot.next_entry_id.as_ref(),
-                        snapshot.root_char_bag,
-                    );
-                    fs_entry.is_ignored = ignore_stack.is_all();
-                    snapshot.insert_entry(fs_entry, self.fs.as_ref());
-                    if metadata.is_dir {
-                        scan_queue_tx
-                            .send(ScanJob {
-                                abs_path: event.path,
-                                path,
-                                ignore_stack,
-                                scan_queue: scan_queue_tx.clone(),
-                            })
-                            .await
-                            .unwrap();
+                match metadata {
+                    Ok(Some(metadata)) => {
+                        let ignore_stack = snapshot.ignore_stack_for_path(&path, metadata.is_dir);
+                        let mut fs_entry = Entry::new(
+                            path.clone(),
+                            &metadata,
+                            snapshot.next_entry_id.as_ref(),
+                            snapshot.root_char_bag,
+                        );
+                        fs_entry.is_ignored = ignore_stack.is_all();
+                        snapshot.insert_entry(fs_entry, self.fs.as_ref());
+                        if metadata.is_dir {
+                            self.executor
+                                .block(scan_queue_tx.send(ScanJob {
+                                    abs_path: event.path,
+                                    path,
+                                    ignore_stack,
+                                    scan_queue: scan_queue_tx.clone(),
+                                }))
+                                .unwrap();
+                        }
+                    }
+                    Ok(None) => {}
+                    Err(err) => {
+                        // TODO - create a special 'error' entry in the entries tree to mark this
+                        log::error!("error reading file on event {:?}", err);
                     }
-                }
-                Ok(None) => {}
-                Err(err) => {
-                    // TODO - create a special 'error' entry in the entries tree to mark this
-                    log::error!("error reading file on event {:?}", err);
                 }
             }
+            drop(scan_queue_tx);
         }
 
-        *self.snapshot.lock() = snapshot;
-
         // Scan any directories that were created as part of this event batch.
-        drop(scan_queue_tx);
         self.executor
             .scoped(|scope| {
                 for _ in 0..self.executor.num_cpus() {

crates/project_panel/src/project_panel.rs 🔗

@@ -913,7 +913,7 @@ mod tests {
         );
     }
 
-    #[gpui::test]
+    #[gpui::test(iterations = 30)]
     async fn test_editing_files(cx: &mut gpui::TestAppContext) {
         cx.foreground().forbid_parking();