Only mutate background snapshot in the background scanner

Max Brunsfeld created

Change summary

crates/project/src/worktree.rs | 284 ++++++++++++++++-------------------
1 file changed, 131 insertions(+), 153 deletions(-)

Detailed changes

crates/project/src/worktree.rs 🔗

@@ -9,7 +9,7 @@ use fs::LineEnding;
 use fs::{repository::GitRepository, Fs};
 use futures::{
     channel::{
-        mpsc::{self, UnboundedSender},
+        mpsc::{self, UnboundedReceiver, UnboundedSender},
         oneshot,
     },
     Stream, StreamExt,
@@ -29,6 +29,7 @@ use language::{
     Buffer, DiagnosticEntry, PointUtf16, Rope, RopeFingerprint, Unclipped,
 };
 use parking_lot::Mutex;
+use postage::barrier;
 use postage::{
     prelude::{Sink as _, Stream as _},
     watch,
@@ -55,7 +56,6 @@ use util::{ResultExt, TryFutureExt};
 #[derive(Copy, Clone, PartialEq, Eq, Debug, Hash, PartialOrd, Ord)]
 pub struct WorktreeId(usize);
 
-#[allow(clippy::large_enum_variant)]
 pub enum Worktree {
     Local(LocalWorktree),
     Remote(RemoteWorktree),
@@ -63,7 +63,7 @@ pub enum Worktree {
 
 pub struct LocalWorktree {
     snapshot: LocalSnapshot,
-    background_snapshot: Arc<Mutex<LocalSnapshot>>,
+    path_changes_tx: mpsc::UnboundedSender<(Vec<PathBuf>, barrier::Sender)>,
     is_scanning: (watch::Sender<bool>, watch::Receiver<bool>),
     _background_scanner_task: Task<()>,
     share: Option<ShareState>,
@@ -156,14 +156,17 @@ impl DerefMut for LocalSnapshot {
     }
 }
 
-#[derive(Clone, Debug)]
 enum ScanState {
     /// The worktree is performing its initial scan of the filesystem.
     Initializing(LocalSnapshot),
     Initialized(LocalSnapshot),
     /// The worktree is updating in response to filesystem events.
     Updating,
-    Updated(LocalSnapshot, HashMap<Arc<Path>, PathChange>),
+    Updated {
+        snapshot: LocalSnapshot,
+        changes: HashMap<Arc<Path>, PathChange>,
+        barrier: Option<barrier::Sender>,
+    },
     Err(Arc<anyhow::Error>),
 }
 
@@ -234,8 +237,8 @@ impl Worktree {
                 );
             }
 
+            let (path_changes_tx, path_changes_rx) = mpsc::unbounded();
             let (scan_states_tx, mut scan_states_rx) = mpsc::unbounded();
-            let background_snapshot = Arc::new(Mutex::new(snapshot.clone()));
 
             cx.spawn_weak(|this, mut cx| async move {
                 while let Some((state, this)) = scan_states_rx.next().await.zip(this.upgrade(&cx)) {
@@ -250,21 +253,21 @@ impl Worktree {
 
             let background_scanner_task = cx.background().spawn({
                 let fs = fs.clone();
-                let background_snapshot = background_snapshot.clone();
+                let snapshot = snapshot.clone();
                 let background = cx.background().clone();
                 async move {
                     let events = fs.watch(&abs_path, Duration::from_millis(100)).await;
-                    BackgroundScanner::new(background_snapshot, scan_states_tx, fs, background)
-                        .run(events)
+                    BackgroundScanner::new(snapshot, scan_states_tx, fs, background)
+                        .run(events, path_changes_rx)
                         .await;
                 }
             });
 
             Worktree::Local(LocalWorktree {
                 snapshot,
-                background_snapshot,
                 is_scanning: watch::channel_with(true),
                 share: None,
+                path_changes_tx,
                 _background_scanner_task: background_scanner_task,
                 diagnostics: Default::default(),
                 diagnostic_summaries: Default::default(),
@@ -546,10 +549,15 @@ impl LocalWorktree {
             ScanState::Updating => {
                 *self.is_scanning.0.borrow_mut() = true;
             }
-            ScanState::Updated(new_snapshot, changes) => {
+            ScanState::Updated {
+                snapshot: new_snapshot,
+                changes,
+                barrier,
+            } => {
                 *self.is_scanning.0.borrow_mut() = false;
                 cx.emit(Event::UpdatedEntries(changes));
                 self.set_snapshot(new_snapshot, cx);
+                drop(barrier);
             }
             ScanState::Err(error) => {
                 *self.is_scanning.0.borrow_mut() = false;
@@ -660,9 +668,7 @@ impl LocalWorktree {
             // Eagerly populate the snapshot with an updated entry for the loaded file
             let entry = this
                 .update(&mut cx, |this, cx| {
-                    this.as_local()
-                        .unwrap()
-                        .refresh_entry(path, abs_path, None, cx)
+                    this.as_local().unwrap().refresh_entry(path, None, cx)
                 })
                 .await?;
 
@@ -780,10 +786,13 @@ impl LocalWorktree {
         cx: &mut ModelContext<Worktree>,
     ) -> Option<Task<Result<()>>> {
         let entry = self.entry_for_id(entry_id)?.clone();
-        let abs_path = self.absolutize(&entry.path);
+        let path = entry.path.clone();
+        let abs_path = self.absolutize(&path);
+        let (tx, mut rx) = barrier::channel();
+
         let delete = cx.background().spawn({
+            let abs_path = abs_path.clone();
             let fs = self.fs.clone();
-            let abs_path = abs_path;
             async move {
                 if entry.is_file() {
                     fs.remove_file(&abs_path, Default::default()).await
@@ -802,17 +811,14 @@ impl LocalWorktree {
 
         Some(cx.spawn(|this, mut cx| async move {
             delete.await?;
-            this.update(&mut cx, |this, cx| {
-                let this = this.as_local_mut().unwrap();
-
-                this.background_snapshot.lock().delete_entry(entry_id);
-
-                if let Some(path) = this.snapshot.delete_entry(entry_id) {
-                    cx.emit(Event::UpdatedEntries(
-                        [(path, PathChange::Removed)].into_iter().collect(),
-                    ));
-                }
+            this.update(&mut cx, |this, _| {
+                this.as_local_mut()
+                    .unwrap()
+                    .path_changes_tx
+                    .unbounded_send((vec![abs_path], tx))
+                    .unwrap();
             });
+            rx.recv().await;
             Ok(())
         }))
     }
@@ -826,29 +832,21 @@ impl LocalWorktree {
         let old_path = self.entry_for_id(entry_id)?.path.clone();
         let new_path = new_path.into();
         let abs_old_path = self.absolutize(&old_path);
-        let abs_new_path = self.absolutize(new_path.as_ref());
-        let rename = cx.background().spawn({
-            let fs = self.fs.clone();
-            let abs_new_path = abs_new_path.clone();
-            async move {
-                fs.rename(&abs_old_path, &abs_new_path, Default::default())
-                    .await
-            }
+        let abs_new_path = self.absolutize(&new_path);
+        let fs = self.fs.clone();
+        let rename = cx.background().spawn(async move {
+            fs.rename(&abs_old_path, &abs_new_path, Default::default())
+                .await
         });
 
         Some(cx.spawn(|this, mut cx| async move {
             rename.await?;
-            let entry = this
-                .update(&mut cx, |this, cx| {
-                    this.as_local_mut().unwrap().refresh_entry(
-                        new_path.clone(),
-                        abs_new_path,
-                        Some(old_path),
-                        cx,
-                    )
-                })
-                .await?;
-            Ok(entry)
+            this.update(&mut cx, |this, cx| {
+                this.as_local_mut()
+                    .unwrap()
+                    .refresh_entry(new_path.clone(), Some(old_path), cx)
+            })
+            .await
         }))
     }
 
@@ -862,33 +860,25 @@ impl LocalWorktree {
         let new_path = new_path.into();
         let abs_old_path = self.absolutize(&old_path);
         let abs_new_path = self.absolutize(&new_path);
-        let copy = cx.background().spawn({
-            let fs = self.fs.clone();
-            let abs_new_path = abs_new_path.clone();
-            async move {
-                copy_recursive(
-                    fs.as_ref(),
-                    &abs_old_path,
-                    &abs_new_path,
-                    Default::default(),
-                )
-                .await
-            }
+        let fs = self.fs.clone();
+        let copy = cx.background().spawn(async move {
+            copy_recursive(
+                fs.as_ref(),
+                &abs_old_path,
+                &abs_new_path,
+                Default::default(),
+            )
+            .await
         });
 
         Some(cx.spawn(|this, mut cx| async move {
             copy.await?;
-            let entry = this
-                .update(&mut cx, |this, cx| {
-                    this.as_local_mut().unwrap().refresh_entry(
-                        new_path.clone(),
-                        abs_new_path,
-                        None,
-                        cx,
-                    )
-                })
-                .await?;
-            Ok(entry)
+            this.update(&mut cx, |this, cx| {
+                this.as_local_mut()
+                    .unwrap()
+                    .refresh_entry(new_path.clone(), None, cx)
+            })
+            .await
         }))
     }
 
@@ -900,90 +890,51 @@ impl LocalWorktree {
     ) -> Task<Result<Entry>> {
         let path = path.into();
         let abs_path = self.absolutize(&path);
-        let write = cx.background().spawn({
-            let fs = self.fs.clone();
-            let abs_path = abs_path.clone();
-            async move {
-                if let Some((text, line_ending)) = text_if_file {
-                    fs.save(&abs_path, &text, line_ending).await
-                } else {
-                    fs.create_dir(&abs_path).await
-                }
+        let fs = self.fs.clone();
+        let write = cx.background().spawn(async move {
+            if let Some((text, line_ending)) = text_if_file {
+                fs.save(&abs_path, &text, line_ending).await
+            } else {
+                fs.create_dir(&abs_path).await
             }
         });
 
         cx.spawn(|this, mut cx| async move {
             write.await?;
-            let entry = this
-                .update(&mut cx, |this, cx| {
-                    this.as_local_mut()
-                        .unwrap()
-                        .refresh_entry(path, abs_path, None, cx)
-                })
-                .await?;
-            Ok(entry)
+            this.update(&mut cx, |this, cx| {
+                this.as_local_mut().unwrap().refresh_entry(path, None, cx)
+            })
+            .await
         })
     }
 
     fn refresh_entry(
         &self,
         path: Arc<Path>,
-        abs_path: PathBuf,
         old_path: Option<Arc<Path>>,
         cx: &mut ModelContext<Worktree>,
     ) -> Task<Result<Entry>> {
         let fs = self.fs.clone();
-        let root_char_bag = self.snapshot.root_char_bag;
-        let next_entry_id = self.snapshot.next_entry_id.clone();
-        cx.spawn_weak(|this, mut cx| async move {
-            let metadata = fs
-                .metadata(&abs_path)
-                .await?
-                .ok_or_else(|| anyhow!("could not read saved file metadata"))?;
-            let this = this
-                .upgrade(&cx)
-                .ok_or_else(|| anyhow!("worktree was dropped"))?;
-            this.update(&mut cx, |this, cx| {
-                let this = this.as_local_mut().unwrap();
-                let mut entry = Entry::new(path, &metadata, &next_entry_id, root_char_bag);
-                entry.is_ignored = this
-                    .snapshot
-                    .ignore_stack_for_abs_path(&abs_path, entry.is_dir())
-                    .is_abs_path_ignored(&abs_path, entry.is_dir());
-
-                {
-                    let mut snapshot = this.background_snapshot.lock();
-                    snapshot.scan_started();
-                    if let Some(old_path) = &old_path {
-                        snapshot.remove_path(old_path);
-                    }
-                    snapshot.insert_entry(entry.clone(), fs.as_ref());
-                    snapshot.scan_completed();
-                }
-
-                let mut changes = HashMap::default();
-
-                this.snapshot.scan_started();
-                if let Some(old_path) = &old_path {
-                    this.snapshot.remove_path(old_path);
-                    changes.insert(old_path.clone(), PathChange::Removed);
-                }
-                let exists = this.snapshot.entry_for_path(&entry.path).is_some();
-                let inserted_entry = this.snapshot.insert_entry(entry, fs.as_ref());
-                changes.insert(
-                    inserted_entry.path.clone(),
-                    if exists {
-                        PathChange::Updated
-                    } else {
-                        PathChange::Added
-                    },
-                );
-                this.snapshot.scan_completed();
+        let abs_path = self.abs_path.clone();
+        let path_changes_tx = self.path_changes_tx.clone();
+        cx.spawn_weak(move |this, mut cx| async move {
+            let abs_path = fs.canonicalize(&abs_path).await?;
+            let paths = if let Some(old_path) = old_path {
+                vec![abs_path.join(&path), abs_path.join(&old_path)]
+            } else {
+                vec![abs_path.join(&path)]
+            };
 
-                eprintln!("refreshed {:?}", changes);
-                cx.emit(Event::UpdatedEntries(changes));
-                Ok(inserted_entry)
-            })
+            let (tx, mut rx) = barrier::channel();
+            path_changes_tx.unbounded_send((paths, tx)).unwrap();
+            rx.recv().await;
+            this.upgrade(&cx)
+                .ok_or_else(|| anyhow!("worktree was dropped"))?
+                .update(&mut cx, |this, _| {
+                    this.entry_for_path(path)
+                        .cloned()
+                        .ok_or_else(|| anyhow!("failed to read path after update"))
+                })
         })
     }
 
@@ -2188,14 +2139,14 @@ struct BackgroundScanner {
 
 impl BackgroundScanner {
     fn new(
-        snapshot: Arc<Mutex<LocalSnapshot>>,
+        snapshot: LocalSnapshot,
         notify: UnboundedSender<ScanState>,
         fs: Arc<dyn Fs>,
         executor: Arc<executor::Background>,
     ) -> Self {
         Self {
             fs,
-            snapshot,
+            snapshot: Arc::new(Mutex::new(snapshot)),
             notify,
             executor,
             changes: Default::default(),
@@ -2206,7 +2157,13 @@ impl BackgroundScanner {
         self.snapshot.lock().abs_path.clone()
     }
 
-    async fn run(mut self, events_rx: impl Stream<Item = Vec<fsevent::Event>>) {
+    async fn run(
+        mut self,
+        events_rx: impl Stream<Item = Vec<fsevent::Event>>,
+        mut changed_paths: UnboundedReceiver<(Vec<PathBuf>, barrier::Sender)>,
+    ) {
+        use futures::{select_biased, FutureExt as _};
+
         // While performing the initial scan, send a new snapshot to the main
         // thread on a recurring interval.
         let initializing_task = self.executor.spawn({
@@ -2260,7 +2217,7 @@ impl BackgroundScanner {
         // Process any events that occurred while performing the initial scan. These
         // events can't be reported as precisely, because there is no snapshot of the
         // worktree before they occurred.
-        if let Some(mut events) = events_rx.next().await {
+        if let Poll::Ready(Some(mut events)) = futures::poll!(events_rx.next()) {
             while let Poll::Ready(Some(additional_events)) = futures::poll!(events_rx.next()) {
                 events.extend(additional_events);
             }
@@ -2272,10 +2229,11 @@ impl BackgroundScanner {
             }
             if self
                 .notify
-                .unbounded_send(ScanState::Updated(
-                    self.snapshot.lock().clone(),
-                    mem::take(&mut self.changes),
-                ))
+                .unbounded_send(ScanState::Updated {
+                    snapshot: self.snapshot.lock().clone(),
+                    changes: mem::take(&mut self.changes),
+                    barrier: None,
+                })
                 .is_err()
             {
                 return;
@@ -2283,10 +2241,29 @@ impl BackgroundScanner {
         }
 
         // Continue processing events until the worktree is dropped.
-        while let Some(mut events) = events_rx.next().await {
-            while let Poll::Ready(Some(additional_events)) = futures::poll!(events_rx.next()) {
-                events.extend(additional_events);
+        loop {
+            let events;
+            let barrier;
+            select_biased! {
+                request = changed_paths.next().fuse() => {
+                    let Some((paths, b)) = request else { break; };
+                    events = paths
+                        .into_iter()
+                        .map(|path| fsevent::Event {
+                            path,
+                            event_id: 0,
+                            flags: fsevent::StreamFlags::NONE
+                        })
+                        .collect::<Vec<_>>();
+                    barrier = Some(b);
+                }
+                e = events_rx.next().fuse() => {
+                    let Some(e) = e else { break; };
+                    events = e;
+                    barrier = None;
+                }
             }
+
             if self.notify.unbounded_send(ScanState::Updating).is_err() {
                 return;
             }
@@ -2295,10 +2272,11 @@ impl BackgroundScanner {
             }
             if self
                 .notify
-                .unbounded_send(ScanState::Updated(
-                    self.snapshot.lock().clone(),
-                    mem::take(&mut self.changes),
-                ))
+                .unbounded_send(ScanState::Updated {
+                    snapshot: self.snapshot.lock().clone(),
+                    changes: mem::take(&mut self.changes),
+                    barrier,
+                })
                 .is_err()
             {
                 return;
@@ -3132,7 +3110,7 @@ mod tests {
 
         let tree = Worktree::local(
             client,
-            Arc::from(Path::new("/root")),
+            Path::new("/root"),
             true,
             fs,
             Default::default(),
@@ -3193,7 +3171,7 @@ mod tests {
         let client = cx.read(|cx| Client::new(FakeHttpClient::with_404_response(), cx));
         let tree = Worktree::local(
             client,
-            Arc::from(Path::new("/root")),
+            Path::new("/root"),
             true,
             fs.clone(),
             Default::default(),