Prevent eager snapshot mutations from being clobbered by background updates

Max Brunsfeld and Nathan Sobo created

Co-authored-by: Nathan Sobo <nathan@zed.dev>

Change summary

crates/collab/src/rpc.rs        |  4 -
crates/project/src/project.rs   | 33 +++++------
crates/project/src/worktree.rs  | 95 ++++++++++++++++++++++++----------
crates/rpc/src/proto.rs         |  3 
crates/sum_tree/src/sum_tree.rs | 19 ++++--
5 files changed, 96 insertions(+), 58 deletions(-)

Detailed changes

crates/collab/src/rpc.rs 🔗

@@ -159,9 +159,7 @@ impl Server {
                 let span = info_span!(
                     "handle message",
                     payload_type = envelope.payload_type_name(),
-                    payload = serde_json::to_string_pretty(&envelope.payload)
-                        .unwrap()
-                        .as_str(),
+                    payload = format!("{:?}", envelope.payload).as_str(),
                 );
                 let future = (handler)(server, *envelope);
                 async move {

crates/project/src/project.rs 🔗

@@ -719,14 +719,14 @@ impl Project {
                         is_directory: false,
                     })
                     .await?;
-                worktree.update(&mut cx, |worktree, _| {
-                    let worktree = worktree.as_remote_mut().unwrap();
-                    worktree.snapshot.insert_entry(
-                        response
-                            .entry
-                            .ok_or_else(|| anyhow!("missing entry in response"))?,
-                    )
-                })
+                let entry = response
+                    .entry
+                    .ok_or_else(|| anyhow!("missing entry in response"))?;
+                worktree
+                    .update(&mut cx, |worktree, cx| {
+                        worktree.as_remote().unwrap().insert_entry(entry, cx)
+                    })
+                    .await
             }))
         }
     }
@@ -758,15 +758,14 @@ impl Project {
                         new_path: new_path.as_os_str().as_bytes().to_vec(),
                     })
                     .await?;
-                worktree.update(&mut cx, |worktree, _| {
-                    let worktree = worktree.as_remote_mut().unwrap();
-                    worktree.snapshot.remove_entry(entry_id);
-                    worktree.snapshot.insert_entry(
-                        response
-                            .entry
-                            .ok_or_else(|| anyhow!("missing entry in response"))?,
-                    )
-                })
+                let entry = response
+                    .entry
+                    .ok_or_else(|| anyhow!("missing entry in response"))?;
+                worktree
+                    .update(&mut cx, |worktree, cx| {
+                        worktree.as_remote().unwrap().insert_entry(entry, cx)
+                    })
+                    .await
             }))
         }
     }

crates/project/src/worktree.rs 🔗

@@ -29,6 +29,7 @@ use language::{
 use lazy_static::lazy_static;
 use parking_lot::Mutex;
 use postage::{
+    barrier,
     prelude::{Sink as _, Stream as _},
     watch,
 };
@@ -79,16 +80,21 @@ pub struct LocalWorktree {
 }
 
 pub struct RemoteWorktree {
-    pub(crate) snapshot: Snapshot,
+    pub snapshot: Snapshot,
+    pub(crate) background_snapshot: Arc<Mutex<Snapshot>>,
     project_id: u64,
-    snapshot_rx: watch::Receiver<Snapshot>,
     client: Arc<Client>,
-    updates_tx: UnboundedSender<proto::UpdateWorktree>,
+    updates_tx: UnboundedSender<BackgroundUpdate>,
     replica_id: ReplicaId,
     diagnostic_summaries: TreeMap<PathKey, DiagnosticSummary>,
     visible: bool,
 }
 
+enum BackgroundUpdate {
+    Update(proto::UpdateWorktree),
+    Barrier(barrier::Sender),
+}
+
 #[derive(Clone)]
 pub struct Snapshot {
     id: WorktreeId,
@@ -218,13 +224,14 @@ impl Worktree {
         };
 
         let (updates_tx, mut updates_rx) = mpsc::unbounded();
-        let (mut snapshot_tx, snapshot_rx) = watch::channel_with(snapshot.clone());
+        let background_snapshot = Arc::new(Mutex::new(snapshot.clone()));
+        let (mut snapshot_updated_tx, mut snapshot_updated_rx) = watch::channel();
         let worktree_handle = cx.add_model(|_: &mut ModelContext<Worktree>| {
             Worktree::Remote(RemoteWorktree {
                 project_id: project_remote_id,
                 replica_id,
                 snapshot: snapshot.clone(),
-                snapshot_rx: snapshot_rx.clone(),
+                background_snapshot: background_snapshot.clone(),
                 updates_tx,
                 client: client.clone(),
                 diagnostic_summaries: TreeMap::from_ordered_entries(
@@ -275,37 +282,40 @@ impl Worktree {
                     .await;
 
                 {
-                    let mut snapshot = snapshot_tx.borrow_mut();
+                    let mut snapshot = background_snapshot.lock();
                     snapshot.entries_by_path = entries_by_path;
                     snapshot.entries_by_id = entries_by_id;
+                    snapshot_updated_tx.send(()).await.ok();
                 }
 
                 cx.background()
                     .spawn(async move {
                         while let Some(update) = updates_rx.next().await {
-                            let mut snapshot = snapshot_tx.borrow().clone();
-                            if let Err(error) = snapshot.apply_remote_update(update) {
-                                log::error!("error applying worktree update: {}", error);
+                            if let BackgroundUpdate::Update(update) = update {
+                                if let Err(error) =
+                                    background_snapshot.lock().apply_remote_update(update)
+                                {
+                                    log::error!("error applying worktree update: {}", error);
+                                }
+                                snapshot_updated_tx.send(()).await.ok();
                             }
-                            *snapshot_tx.borrow_mut() = snapshot;
                         }
                     })
                     .detach();
 
-                {
-                    let mut snapshot_rx = snapshot_rx.clone();
+                cx.spawn(|mut cx| {
                     let this = worktree_handle.downgrade();
-                    cx.spawn(|mut cx| async move {
-                        while let Some(_) = snapshot_rx.recv().await {
+                    async move {
+                        while let Some(_) = snapshot_updated_rx.recv().await {
                             if let Some(this) = this.upgrade(&cx) {
                                 this.update(&mut cx, |this, cx| this.poll_snapshot(cx));
                             } else {
                                 break;
                             }
                         }
-                    })
-                    .detach();
-                }
+                    }
+                })
+                .detach();
             }
         });
         (worktree_handle, deserialize_task)
@@ -411,7 +421,7 @@ impl Worktree {
                 }
             }
             Self::Remote(worktree) => {
-                worktree.snapshot = worktree.snapshot_rx.borrow().clone();
+                worktree.snapshot = worktree.background_snapshot.lock().clone();
                 cx.emit(Event::UpdatedEntries);
             }
         };
@@ -923,12 +933,21 @@ impl RemoteWorktree {
         envelope: TypedEnvelope<proto::UpdateWorktree>,
     ) -> Result<()> {
         self.updates_tx
-            .unbounded_send(envelope.payload)
+            .unbounded_send(BackgroundUpdate::Update(envelope.payload))
             .expect("consumer runs to completion");
-
         Ok(())
     }
 
+    pub fn finish_pending_remote_updates(&self) -> impl Future<Output = ()> {
+        let (tx, mut rx) = barrier::channel();
+        self.updates_tx
+            .unbounded_send(BackgroundUpdate::Barrier(tx))
+            .expect("consumer runs to completion");
+        async move {
+            rx.recv().await;
+        }
+    }
+
     pub fn update_diagnostic_summary(
         &mut self,
         path: Arc<Path>,
@@ -945,6 +964,29 @@ impl RemoteWorktree {
                 .insert(PathKey(path.clone()), summary);
         }
     }
+
+    pub fn insert_entry(
+        &self,
+        entry: proto::Entry,
+        cx: &mut ModelContext<Worktree>,
+    ) -> Task<Result<Entry>> {
+        cx.spawn(|this, mut cx| async move {
+            this.update(&mut cx, |worktree, _| {
+                worktree
+                    .as_remote_mut()
+                    .unwrap()
+                    .finish_pending_remote_updates()
+            })
+            .await;
+            this.update(&mut cx, |worktree, _| {
+                let worktree = worktree.as_remote_mut().unwrap();
+                let mut snapshot = worktree.background_snapshot.lock();
+                let entry = snapshot.insert_entry(entry);
+                worktree.snapshot = snapshot.clone();
+                entry
+            })
+        })
+    }
 }
 
 impl Snapshot {
@@ -956,17 +998,9 @@ impl Snapshot {
         self.entries_by_id.get(&entry_id, &()).is_some()
     }
 
-    pub(crate) fn remove_entry(&mut self, entry_id: ProjectEntryId) -> Option<Entry> {
-        if let Some(entry) = self.entries_by_id.remove(&entry_id, &()) {
-            self.entries_by_path.remove(&PathKey(entry.path), &())
-        } else {
-            None
-        }
-    }
-
     pub(crate) fn insert_entry(&mut self, entry: proto::Entry) -> Result<Entry> {
         let entry = Entry::try_from((&self.root_char_bag, entry))?;
-        self.entries_by_id.insert_or_replace(
+        let old_entry = self.entries_by_id.insert_or_replace(
             PathEntry {
                 id: entry.id,
                 path: entry.path.clone(),
@@ -975,6 +1009,9 @@ impl Snapshot {
             },
             &(),
         );
+        if let Some(old_entry) = old_entry {
+            self.entries_by_path.remove(&PathKey(old_entry.path), &());
+        }
         self.entries_by_path.insert_or_replace(entry.clone(), &());
         Ok(entry)
     }

crates/rpc/src/proto.rs 🔗

@@ -6,13 +6,14 @@ use prost::Message as _;
 use serde::Serialize;
 use std::any::{Any, TypeId};
 use std::{
+    fmt::Debug,
     io,
     time::{Duration, SystemTime, UNIX_EPOCH},
 };
 
 include!(concat!(env!("OUT_DIR"), "/zed.messages.rs"));
 
-pub trait EnvelopedMessage: Clone + Serialize + Sized + Send + Sync + 'static {
+pub trait EnvelopedMessage: Clone + Debug + Serialize + Sized + Send + Sync + 'static {
     const NAME: &'static str;
     const PRIORITY: MessagePriority;
     fn into_envelope(

crates/sum_tree/src/sum_tree.rs 🔗

@@ -483,17 +483,20 @@ impl<T: Item + PartialEq> PartialEq for SumTree<T> {
 impl<T: Item + Eq> Eq for SumTree<T> {}
 
 impl<T: KeyedItem> SumTree<T> {
-    pub fn insert_or_replace(&mut self, item: T, cx: &<T::Summary as Summary>::Context) -> bool {
-        let mut replaced = false;
+    pub fn insert_or_replace(
+        &mut self,
+        item: T,
+        cx: &<T::Summary as Summary>::Context,
+    ) -> Option<T> {
+        let mut replaced = None;
         *self = {
             let mut cursor = self.cursor::<T::Key>();
             let mut new_tree = cursor.slice(&item.key(), Bias::Left, cx);
-            if cursor
-                .item()
-                .map_or(false, |cursor_item| cursor_item.key() == item.key())
-            {
-                cursor.next(cx);
-                replaced = true;
+            if let Some(cursor_item) = cursor.item() {
+                if cursor_item.key() == item.key() {
+                    replaced = Some(cursor_item.clone());
+                    cursor.next(cx);
+                }
             }
             new_tree.push(item, cx);
             new_tree.push_tree(cursor.suffix(cx), cx);