Ensure newer snapshots are always detected in `wait_for_snapshot`

Antonio Scandurra created

Change summary

crates/project/src/project.rs  |  8 ++--
crates/project/src/worktree.rs | 64 +++++++++++++++++++++--------------
2 files changed, 42 insertions(+), 30 deletions(-)

Detailed changes

crates/project/src/project.rs 🔗

@@ -1081,7 +1081,7 @@ impl Project {
                     .ok_or_else(|| anyhow!("missing entry in response"))?;
                 worktree
                     .update(&mut cx, |worktree, cx| {
-                        worktree.as_remote().unwrap().insert_entry(
+                        worktree.as_remote_mut().unwrap().insert_entry(
                             entry,
                             response.worktree_scan_id as usize,
                             cx,
@@ -1124,7 +1124,7 @@ impl Project {
                     .ok_or_else(|| anyhow!("missing entry in response"))?;
                 worktree
                     .update(&mut cx, |worktree, cx| {
-                        worktree.as_remote().unwrap().insert_entry(
+                        worktree.as_remote_mut().unwrap().insert_entry(
                             entry,
                             response.worktree_scan_id as usize,
                             cx,
@@ -1167,7 +1167,7 @@ impl Project {
                     .ok_or_else(|| anyhow!("missing entry in response"))?;
                 worktree
                     .update(&mut cx, |worktree, cx| {
-                        worktree.as_remote().unwrap().insert_entry(
+                        worktree.as_remote_mut().unwrap().insert_entry(
                             entry,
                             response.worktree_scan_id as usize,
                             cx,
@@ -1200,7 +1200,7 @@ impl Project {
                     .await?;
                 worktree
                     .update(&mut cx, move |worktree, cx| {
-                        worktree.as_remote().unwrap().delete_entry(
+                        worktree.as_remote_mut().unwrap().delete_entry(
                             entry_id,
                             response.worktree_scan_id as usize,
                             cx,

crates/project/src/worktree.rs 🔗

@@ -9,7 +9,7 @@ use ::ignore::gitignore::{Gitignore, GitignoreBuilder};
 use anyhow::{anyhow, Context, Result};
 use client::{proto, Client};
 use clock::ReplicaId;
-use collections::HashMap;
+use collections::{HashMap, VecDeque};
 use futures::{
     channel::{
         mpsc::{self, UnboundedSender},
@@ -82,7 +82,7 @@ pub struct RemoteWorktree {
     project_id: u64,
     client: Arc<Client>,
     updates_tx: Option<UnboundedSender<proto::UpdateWorktree>>,
-    snapshot_updated_rx: watch::Receiver<()>,
+    snapshot_subscriptions: VecDeque<(usize, oneshot::Sender<()>)>,
     replica_id: ReplicaId,
     diagnostic_summaries: TreeMap<PathKey, DiagnosticSummary>,
     visible: bool,
@@ -204,7 +204,7 @@ impl Worktree {
                 snapshot: snapshot.clone(),
                 background_snapshot: background_snapshot.clone(),
                 updates_tx: Some(updates_tx),
-                snapshot_updated_rx: snapshot_updated_rx.clone(),
+                snapshot_subscriptions: Default::default(),
                 client: client.clone(),
                 diagnostic_summaries: Default::default(),
                 visible,
@@ -227,7 +227,18 @@ impl Worktree {
             async move {
                 while let Some(_) = snapshot_updated_rx.recv().await {
                     if let Some(this) = this.upgrade(&cx) {
-                        this.update(&mut cx, |this, cx| this.poll_snapshot(cx));
+                        this.update(&mut cx, |this, cx| {
+                            this.poll_snapshot(cx);
+                            let this = this.as_remote_mut().unwrap();
+                            while let Some((scan_id, _)) = this.snapshot_subscriptions.front() {
+                                if this.observed_snapshot(*scan_id) {
+                                    let (_, tx) = this.snapshot_subscriptions.pop_front().unwrap();
+                                    let _ = tx.send(());
+                                } else {
+                                    break;
+                                }
+                            }
+                        });
                     } else {
                         break;
                     }
@@ -969,25 +980,26 @@ impl RemoteWorktree {
         }
     }
 
-    fn wait_for_snapshot(
-        &self,
-        scan_id: usize,
-        cx: &mut ModelContext<Worktree>,
-    ) -> Task<Option<()>> {
-        let mut rx = self.snapshot_updated_rx.clone();
-        cx.spawn_weak(|worktree, cx| async move {
-            while rx.recv().await.is_some() {
-                let snapshot = worktree
-                    .upgrade(&cx)?
-                    .read_with(&cx, |worktree, _| worktree.snapshot());
-                if snapshot.scan_id > scan_id
-                    || (snapshot.scan_id == scan_id && snapshot.is_complete)
-                {
-                    break;
-                }
+    fn observed_snapshot(&self, scan_id: usize) -> bool {
+        self.scan_id > scan_id || (self.scan_id == scan_id && self.is_complete)
+    }
+
+    fn wait_for_snapshot(&mut self, scan_id: usize) -> impl Future<Output = ()> {
+        let (tx, rx) = oneshot::channel();
+        if self.observed_snapshot(scan_id) {
+            let _ = tx.send(());
+        } else {
+            match self
+                .snapshot_subscriptions
+                .binary_search_by_key(&scan_id, |probe| probe.0)
+            {
+                Ok(ix) | Err(ix) => self.snapshot_subscriptions.insert(ix, (scan_id, tx)),
             }
-            None
-        })
+        }
+
+        async move {
+            let _ = rx.await;
+        }
     }
 
     pub fn update_diagnostic_summary(
@@ -1009,12 +1021,12 @@ impl RemoteWorktree {
     }
 
     pub fn insert_entry(
-        &self,
+        &mut self,
         entry: proto::Entry,
         scan_id: usize,
         cx: &mut ModelContext<Worktree>,
     ) -> Task<Result<Entry>> {
-        let wait_for_snapshot = self.wait_for_snapshot(scan_id, cx);
+        let wait_for_snapshot = self.wait_for_snapshot(scan_id);
         cx.spawn(|this, mut cx| async move {
             wait_for_snapshot.await;
             this.update(&mut cx, |worktree, _| {
@@ -1028,12 +1040,12 @@ impl RemoteWorktree {
     }
 
     pub(crate) fn delete_entry(
-        &self,
+        &mut self,
         id: ProjectEntryId,
         scan_id: usize,
         cx: &mut ModelContext<Worktree>,
     ) -> Task<Result<()>> {
-        let wait_for_snapshot = self.wait_for_snapshot(scan_id, cx);
+        let wait_for_snapshot = self.wait_for_snapshot(scan_id);
         cx.spawn(|this, mut cx| async move {
             wait_for_snapshot.await;
             this.update(&mut cx, |worktree, _| {