Ensure worktree updates are observed in order on the server

Antonio Scandurra created

Change summary

crates/project/src/worktree.rs | 13 ++++++++-----
crates/rpc/src/proto.rs        |  1 +
crates/server/src/rpc.rs       |  7 ++++---
crates/server/src/rpc/store.rs |  6 ++++++
4 files changed, 19 insertions(+), 8 deletions(-)

Detailed changes

crates/project/src/worktree.rs 🔗

@@ -43,7 +43,7 @@ use std::{
     time::{Duration, SystemTime},
 };
 use sum_tree::{Bias, Edit, SeekTarget, SumTree, TreeMap};
-use util::{post_inc, ResultExt};
+use util::ResultExt;
 
 lazy_static! {
     static ref GITIGNORE: &'static OsStr = OsStr::new(".gitignore");
@@ -796,11 +796,14 @@ impl LocalWorktree {
                         &prev_snapshot,
                         project_id,
                         worktree_id,
-                        post_inc(&mut update_id),
+                        update_id,
                         false,
                     );
-                    match rpc.send(message) {
-                        Ok(()) => prev_snapshot = snapshot,
+                    match rpc.request(message).await {
+                        Ok(_) => {
+                            prev_snapshot = snapshot;
+                            update_id += 1;
+                        }
                         Err(err) => log::error!("error sending snapshot diff {}", err),
                     }
                 }
@@ -2451,7 +2454,7 @@ mod tests {
         fmt::Write,
         time::{SystemTime, UNIX_EPOCH},
     };
-    use util::test::temp_tree;
+    use util::{post_inc, test::temp_tree};
 
     #[gpui::test]
     async fn test_traversal(cx: gpui::TestAppContext) {

crates/rpc/src/proto.rs 🔗

@@ -199,6 +199,7 @@ request_messages!(
     (ShareProject, Ack),
     (ShareWorktree, Ack),
     (UpdateBuffer, Ack),
+    (UpdateWorktree, Ack),
 );
 
 entity_messages!(

crates/server/src/rpc.rs 🔗

@@ -75,7 +75,7 @@ impl Server {
             .add_request_handler(Server::register_worktree)
             .add_message_handler(Server::unregister_worktree)
             .add_request_handler(Server::share_worktree)
-            .add_message_handler(Server::update_worktree)
+            .add_request_handler(Server::update_worktree)
             .add_message_handler(Server::update_diagnostic_summary)
             .add_message_handler(Server::disk_based_diagnostics_updating)
             .add_message_handler(Server::disk_based_diagnostics_updated)
@@ -497,11 +497,12 @@ impl Server {
     async fn update_worktree(
         mut self: Arc<Server>,
         request: TypedEnvelope<proto::UpdateWorktree>,
-    ) -> tide::Result<()> {
+    ) -> tide::Result<proto::Ack> {
         let connection_ids = self.state_mut().update_worktree(
             request.sender_id,
             request.payload.project_id,
             request.payload.worktree_id,
+            request.payload.id,
             &request.payload.removed_entries,
             &request.payload.updated_entries,
         )?;
@@ -511,7 +512,7 @@ impl Server {
                 .forward_send(request.sender_id, connection_id, request.payload.clone())
         })?;
 
-        Ok(())
+        Ok(proto::Ack {})
     }
 
     async fn update_diagnostic_summary(

crates/server/src/rpc/store.rs 🔗

@@ -537,6 +537,7 @@ impl Store {
         connection_id: ConnectionId,
         project_id: u64,
         worktree_id: u64,
+        update_id: u64,
         removed_entries: &[u64],
         updated_entries: &[proto::Entry],
     ) -> tide::Result<Vec<ConnectionId>> {
@@ -548,6 +549,11 @@ impl Store {
             .share
             .as_mut()
             .ok_or_else(|| anyhow!("worktree is not shared"))?;
+        if share.next_update_id != update_id {
+            return Err(anyhow!("received worktree updates out-of-order"))?;
+        }
+
+        share.next_update_id = update_id + 1;
         for entry_id in removed_entries {
             share.entries.remove(&entry_id);
         }