Ensure worktree updates are applied in order

Max Brunsfeld and Nathan Sobo created

Co-Authored-By: Nathan Sobo <nathan@zed.dev>

Change summary

crates/project/src/project.rs  |  17 +
crates/project/src/worktree.rs | 276 ++++++++++++++++++++---------------
crates/rpc/proto/zed.proto     |  12 
crates/server/src/rpc.rs       |  10 
crates/server/src/rpc/store.rs |   3 
crates/util/src/lib.rs         |   5 
6 files changed, 183 insertions(+), 140 deletions(-)

Detailed changes

crates/project/src/project.rs 🔗

@@ -2134,9 +2134,9 @@ impl Project {
         this.update(&mut cx, |this, cx| {
             let worktree_id = WorktreeId::from_proto(envelope.payload.worktree_id);
             if let Some(worktree) = this.worktree_for_id(worktree_id, cx) {
-                worktree.update(cx, |worktree, cx| {
+                worktree.update(cx, |worktree, _| {
                     let worktree = worktree.as_remote_mut().unwrap();
-                    worktree.update_from_remote(envelope, cx)
+                    worktree.update_from_remote(envelope)
                 })?;
             }
             Ok(())
@@ -3419,7 +3419,7 @@ mod tests {
             .await;
 
         // Create a remote copy of this worktree.
-        let initial_snapshot = tree.read_with(&cx, |tree, _| tree.snapshot());
+        let initial_snapshot = tree.read_with(&cx, |tree, _| tree.as_local().unwrap().snapshot());
         let (remote, load_task) = cx.update(|cx| {
             Worktree::remote(
                 1,
@@ -3495,10 +3495,13 @@ mod tests {
         // Update the remote worktree. Check that it becomes consistent with the
         // local worktree.
         remote.update(&mut cx, |remote, cx| {
-            let update_message =
-                tree.read(cx)
-                    .snapshot()
-                    .build_update(&initial_snapshot, 1, 1, true);
+            let update_message = tree.read(cx).as_local().unwrap().snapshot().build_update(
+                &initial_snapshot,
+                1,
+                1,
+                0,
+                true,
+            );
             remote
                 .as_remote_mut()
                 .unwrap()

crates/project/src/worktree.rs 🔗

@@ -7,8 +7,11 @@ use ::ignore::gitignore::{Gitignore, GitignoreBuilder};
 use anyhow::{anyhow, Result};
 use client::{proto, Client, TypedEnvelope};
 use clock::ReplicaId;
-use collections::HashMap;
-use futures::{Stream, StreamExt};
+use collections::{HashMap, VecDeque};
+use futures::{
+    channel::mpsc::{self, UnboundedSender},
+    Stream, StreamExt,
+};
 use fuzzy::CharBag;
 use gpui::{
     executor, AppContext, AsyncAppContext, Entity, ModelContext, ModelHandle, MutableAppContext,
@@ -39,7 +42,7 @@ use std::{
     time::{Duration, SystemTime},
 };
 use sum_tree::{Bias, Edit, SeekTarget, SumTree, TreeMap};
-use util::ResultExt;
+use util::{post_inc, ResultExt};
 
 lazy_static! {
     static ref GITIGNORE: &'static OsStr = OsStr::new(".gitignore");
@@ -75,11 +78,13 @@ pub struct RemoteWorktree {
     project_id: u64,
     snapshot_rx: watch::Receiver<Snapshot>,
     client: Arc<Client>,
-    updates_tx: postage::mpsc::Sender<proto::UpdateWorktree>,
+    updates_tx: UnboundedSender<proto::UpdateWorktree>,
     replica_id: ReplicaId,
     queued_operations: Vec<(u64, Operation)>,
     diagnostic_summaries: TreeMap<PathKey, DiagnosticSummary>,
     weak: bool,
+    next_update_id: u64,
+    pending_updates: VecDeque<proto::UpdateWorktree>,
 }
 
 #[derive(Clone)]
@@ -208,7 +213,7 @@ impl Worktree {
             entries_by_id: Default::default(),
         };
 
-        let (updates_tx, mut updates_rx) = postage::mpsc::channel(64);
+        let (updates_tx, mut updates_rx) = mpsc::unbounded();
         let (mut snapshot_tx, snapshot_rx) = watch::channel_with(snapshot.clone());
         let worktree_handle = cx.add_model(|_: &mut ModelContext<Worktree>| {
             Worktree::Remote(RemoteWorktree {
@@ -233,6 +238,8 @@ impl Worktree {
                     }),
                 ),
                 weak,
+                next_update_id: worktree.next_update_id,
+                pending_updates: Default::default(),
             })
         });
 
@@ -276,7 +283,7 @@ impl Worktree {
 
                 cx.background()
                     .spawn(async move {
-                        while let Some(update) = updates_rx.recv().await {
+                        while let Some(update) = updates_rx.next().await {
                             let mut snapshot = snapshot_tx.borrow().clone();
                             if let Err(error) = snapshot.apply_remote_update(update) {
                                 log::error!("error applying worktree update: {}", error);
@@ -450,7 +457,7 @@ impl LocalWorktree {
         weak: bool,
         fs: Arc<dyn Fs>,
         cx: &mut AsyncAppContext,
-    ) -> Result<(ModelHandle<Worktree>, Sender<ScanState>)> {
+    ) -> Result<(ModelHandle<Worktree>, UnboundedSender<ScanState>)> {
         let abs_path = path.into();
         let path: Arc<Path> = Arc::from(Path::new(""));
         let next_entry_id = AtomicUsize::new(0);
@@ -470,7 +477,7 @@ impl LocalWorktree {
             }
         }
 
-        let (scan_states_tx, scan_states_rx) = smol::channel::unbounded();
+        let (scan_states_tx, mut scan_states_rx) = mpsc::unbounded();
         let (mut last_scan_state_tx, last_scan_state_rx) = watch::channel_with(ScanState::Scanning);
         let tree = cx.add_model(move |cx: &mut ModelContext<Worktree>| {
             let mut snapshot = LocalSnapshot {
@@ -515,7 +522,7 @@ impl LocalWorktree {
             };
 
             cx.spawn_weak(|this, mut cx| async move {
-                while let Ok(scan_state) = scan_states_rx.recv().await {
+                while let Some(scan_state) = scan_states_rx.next().await {
                     if let Some(handle) = this.upgrade(&cx) {
                         let to_send = handle.update(&mut cx, |this, cx| {
                             last_scan_state_tx.blocking_send(scan_state).ok();
@@ -765,10 +772,16 @@ impl LocalWorktree {
             let rpc = rpc.clone();
             let snapshot = snapshot.clone();
             async move {
+                let mut update_id = 0;
                 let mut prev_snapshot = snapshot;
                 while let Ok(snapshot) = snapshots_to_send_rx.recv().await {
-                    let message =
-                        snapshot.build_update(&prev_snapshot, project_id, worktree_id, false);
+                    let message = snapshot.build_update(
+                        &prev_snapshot,
+                        project_id,
+                        worktree_id,
+                        post_inc(&mut update_id),
+                        false,
+                    );
                     match rpc.send(message) {
                         Ok(()) => prev_snapshot = snapshot,
                         Err(err) => log::error!("error sending snapshot diff {}", err),
@@ -814,15 +827,31 @@ impl RemoteWorktree {
     pub fn update_from_remote(
         &mut self,
         envelope: TypedEnvelope<proto::UpdateWorktree>,
-        cx: &mut ModelContext<Worktree>,
     ) -> Result<()> {
-        let mut tx = self.updates_tx.clone();
-        let payload = envelope.payload.clone();
-        cx.foreground()
-            .spawn(async move {
-                tx.send(payload).await.expect("receiver runs to completion");
-            })
-            .detach();
+        let update = envelope.payload;
+        if update.id > self.next_update_id {
+            let ix = match self
+                .pending_updates
+                .binary_search_by_key(&update.id, |pending| pending.id)
+            {
+                Ok(ix) | Err(ix) => ix,
+            };
+            self.pending_updates.insert(ix, update);
+        } else {
+            let tx = self.updates_tx.clone();
+            self.next_update_id += 1;
+            tx.unbounded_send(update)
+                .expect("consumer runs to completion");
+            while let Some(update) = self.pending_updates.front() {
+                if update.id == self.next_update_id {
+                    self.next_update_id += 1;
+                    tx.unbounded_send(self.pending_updates.pop_front().unwrap())
+                        .expect("consumer runs to completion");
+                } else {
+                    break;
+                }
+            }
+        }
 
         Ok(())
     }
@@ -849,94 +878,6 @@ impl Snapshot {
         self.id
     }
 
-    pub(crate) fn to_proto(
-        &self,
-        diagnostic_summaries: &TreeMap<PathKey, DiagnosticSummary>,
-        weak: bool,
-    ) -> proto::Worktree {
-        let root_name = self.root_name.clone();
-        proto::Worktree {
-            id: self.id.0 as u64,
-            root_name,
-            entries: self
-                .entries_by_path
-                .iter()
-                .filter(|e| !e.is_ignored)
-                .map(Into::into)
-                .collect(),
-            diagnostic_summaries: diagnostic_summaries
-                .iter()
-                .map(|(path, summary)| summary.to_proto(path.0.clone()))
-                .collect(),
-            weak,
-        }
-    }
-
-    pub(crate) fn build_update(
-        &self,
-        other: &Self,
-        project_id: u64,
-        worktree_id: u64,
-        include_ignored: bool,
-    ) -> proto::UpdateWorktree {
-        let mut updated_entries = Vec::new();
-        let mut removed_entries = Vec::new();
-        let mut self_entries = self
-            .entries_by_id
-            .cursor::<()>()
-            .filter(|e| include_ignored || !e.is_ignored)
-            .peekable();
-        let mut other_entries = other
-            .entries_by_id
-            .cursor::<()>()
-            .filter(|e| include_ignored || !e.is_ignored)
-            .peekable();
-        loop {
-            match (self_entries.peek(), other_entries.peek()) {
-                (Some(self_entry), Some(other_entry)) => {
-                    match Ord::cmp(&self_entry.id, &other_entry.id) {
-                        Ordering::Less => {
-                            let entry = self.entry_for_id(self_entry.id).unwrap().into();
-                            updated_entries.push(entry);
-                            self_entries.next();
-                        }
-                        Ordering::Equal => {
-                            if self_entry.scan_id != other_entry.scan_id {
-                                let entry = self.entry_for_id(self_entry.id).unwrap().into();
-                                updated_entries.push(entry);
-                            }
-
-                            self_entries.next();
-                            other_entries.next();
-                        }
-                        Ordering::Greater => {
-                            removed_entries.push(other_entry.id as u64);
-                            other_entries.next();
-                        }
-                    }
-                }
-                (Some(self_entry), None) => {
-                    let entry = self.entry_for_id(self_entry.id).unwrap().into();
-                    updated_entries.push(entry);
-                    self_entries.next();
-                }
-                (None, Some(other_entry)) => {
-                    removed_entries.push(other_entry.id as u64);
-                    other_entries.next();
-                }
-                (None, None) => break,
-            }
-        }
-
-        proto::UpdateWorktree {
-            project_id,
-            worktree_id,
-            root_name: self.root_name().to_string(),
-            updated_entries,
-            removed_entries,
-        }
-    }
-
     pub(crate) fn apply_remote_update(&mut self, update: proto::UpdateWorktree) -> Result<()> {
         let mut entries_by_path_edits = Vec::new();
         let mut entries_by_id_edits = Vec::new();
@@ -1077,6 +1018,97 @@ impl Snapshot {
 }
 
 impl LocalSnapshot {
+    pub(crate) fn to_proto(
+        &self,
+        diagnostic_summaries: &TreeMap<PathKey, DiagnosticSummary>,
+        weak: bool,
+    ) -> proto::Worktree {
+        let root_name = self.root_name.clone();
+        proto::Worktree {
+            id: self.id.0 as u64,
+            root_name,
+            entries: self
+                .entries_by_path
+                .iter()
+                .filter(|e| !e.is_ignored)
+                .map(Into::into)
+                .collect(),
+            diagnostic_summaries: diagnostic_summaries
+                .iter()
+                .map(|(path, summary)| summary.to_proto(path.0.clone()))
+                .collect(),
+            weak,
+            next_update_id: 0,
+        }
+    }
+
+    pub(crate) fn build_update(
+        &self,
+        other: &Self,
+        project_id: u64,
+        worktree_id: u64,
+        update_id: u64,
+        include_ignored: bool,
+    ) -> proto::UpdateWorktree {
+        let mut updated_entries = Vec::new();
+        let mut removed_entries = Vec::new();
+        let mut self_entries = self
+            .entries_by_id
+            .cursor::<()>()
+            .filter(|e| include_ignored || !e.is_ignored)
+            .peekable();
+        let mut other_entries = other
+            .entries_by_id
+            .cursor::<()>()
+            .filter(|e| include_ignored || !e.is_ignored)
+            .peekable();
+        loop {
+            match (self_entries.peek(), other_entries.peek()) {
+                (Some(self_entry), Some(other_entry)) => {
+                    match Ord::cmp(&self_entry.id, &other_entry.id) {
+                        Ordering::Less => {
+                            let entry = self.entry_for_id(self_entry.id).unwrap().into();
+                            updated_entries.push(entry);
+                            self_entries.next();
+                        }
+                        Ordering::Equal => {
+                            if self_entry.scan_id != other_entry.scan_id {
+                                let entry = self.entry_for_id(self_entry.id).unwrap().into();
+                                updated_entries.push(entry);
+                            }
+
+                            self_entries.next();
+                            other_entries.next();
+                        }
+                        Ordering::Greater => {
+                            removed_entries.push(other_entry.id as u64);
+                            other_entries.next();
+                        }
+                    }
+                }
+                (Some(self_entry), None) => {
+                    let entry = self.entry_for_id(self_entry.id).unwrap().into();
+                    updated_entries.push(entry);
+                    self_entries.next();
+                }
+                (None, Some(other_entry)) => {
+                    removed_entries.push(other_entry.id as u64);
+                    other_entries.next();
+                }
+                (None, None) => break,
+            }
+        }
+
+        proto::UpdateWorktree {
+            id: update_id as u64,
+            project_id,
+            worktree_id,
+            root_name: self.root_name().to_string(),
+            updated_entries,
+            removed_entries,
+        }
+    }
+
     fn insert_entry(&mut self, mut entry: Entry, fs: &dyn Fs) -> Entry {
         if !entry.is_dir() && entry.path.file_name() == Some(&GITIGNORE) {
             let abs_path = self.abs_path.join(&entry.path);
@@ -1668,14 +1700,14 @@ impl<'a> sum_tree::Dimension<'a, EntrySummary> for PathKey {
 struct BackgroundScanner {
     fs: Arc<dyn Fs>,
     snapshot: Arc<Mutex<LocalSnapshot>>,
-    notify: Sender<ScanState>,
+    notify: UnboundedSender<ScanState>,
     executor: Arc<executor::Background>,
 }
 
 impl BackgroundScanner {
     fn new(
         snapshot: Arc<Mutex<LocalSnapshot>>,
-        notify: Sender<ScanState>,
+        notify: UnboundedSender<ScanState>,
         fs: Arc<dyn Fs>,
         executor: Arc<executor::Background>,
     ) -> Self {
@@ -1696,28 +1728,27 @@ impl BackgroundScanner {
     }
 
     async fn run(mut self, events_rx: impl Stream<Item = Vec<fsevent::Event>>) {
-        if self.notify.send(ScanState::Scanning).await.is_err() {
+        if self.notify.unbounded_send(ScanState::Scanning).is_err() {
             return;
         }
 
         if let Err(err) = self.scan_dirs().await {
             if self
                 .notify
-                .send(ScanState::Err(Arc::new(err)))
-                .await
+                .unbounded_send(ScanState::Err(Arc::new(err)))
                 .is_err()
             {
                 return;
             }
         }
 
-        if self.notify.send(ScanState::Idle).await.is_err() {
+        if self.notify.unbounded_send(ScanState::Idle).is_err() {
             return;
         }
 
         futures::pin_mut!(events_rx);
         while let Some(events) = events_rx.next().await {
-            if self.notify.send(ScanState::Scanning).await.is_err() {
+            if self.notify.unbounded_send(ScanState::Scanning).is_err() {
                 break;
             }
 
@@ -1725,7 +1756,7 @@ impl BackgroundScanner {
                 break;
             }
 
-            if self.notify.send(ScanState::Idle).await.is_err() {
+            if self.notify.unbounded_send(ScanState::Idle).is_err() {
                 break;
             }
         }
@@ -2503,7 +2534,7 @@ mod tests {
         }
         log::info!("Generated initial tree");
 
-        let (notify_tx, _notify_rx) = smol::channel::unbounded();
+        let (notify_tx, _notify_rx) = mpsc::unbounded();
         let fs = Arc::new(RealFs);
         let next_entry_id = Arc::new(AtomicUsize::new(0));
         let mut initial_snapshot = LocalSnapshot {
@@ -2563,7 +2594,7 @@ mod tests {
         smol::block_on(scanner.process_events(events));
         scanner.snapshot().check_invariants();
 
-        let (notify_tx, _notify_rx) = smol::channel::unbounded();
+        let (notify_tx, _notify_rx) = mpsc::unbounded();
         let mut new_scanner = BackgroundScanner::new(
             Arc::new(Mutex::new(initial_snapshot)),
             notify_tx,
@@ -2576,6 +2607,7 @@ mod tests {
             new_scanner.snapshot().to_vec(true)
         );
 
+        let mut update_id = 0;
         for mut prev_snapshot in snapshots {
             let include_ignored = rng.gen::<bool>();
             if !include_ignored {
@@ -2596,9 +2628,13 @@ mod tests {
                 prev_snapshot.entries_by_id.edit(entries_by_id_edits, &());
             }
 
-            let update = scanner
-                .snapshot()
-                .build_update(&prev_snapshot, 0, 0, include_ignored);
+            let update = scanner.snapshot().build_update(
+                &prev_snapshot,
+                0,
+                0,
+                post_inc(&mut update_id),
+                include_ignored,
+            );
             prev_snapshot.apply_remote_update(update).unwrap();
             assert_eq!(
                 prev_snapshot.to_vec(true),

crates/rpc/proto/zed.proto 🔗

@@ -128,11 +128,12 @@ message ShareWorktree {
 }
 
 message UpdateWorktree {
-    uint64 project_id = 1;
-    uint64 worktree_id = 2;
-    string root_name = 3;
-    repeated Entry updated_entries = 4;
-    repeated uint64 removed_entries = 5;
+    uint64 id = 1;
+    uint64 project_id = 2;
+    uint64 worktree_id = 3;
+    string root_name = 4;
+    repeated Entry updated_entries = 5;
+    repeated uint64 removed_entries = 6;
 }
 
 message AddProjectCollaborator {
@@ -386,6 +387,7 @@ message Worktree {
     repeated Entry entries = 3;
     repeated DiagnosticSummary diagnostic_summaries = 4;
     bool weak = 5;
+    uint64 next_update_id = 6;
 }
 
 message File {

crates/server/src/rpc.rs 🔗

@@ -339,6 +339,7 @@ impl Server {
                                 .cloned()
                                 .collect(),
                             weak: worktree.weak,
+                            next_update_id: share.next_update_id as u64,
                         })
                     })
                     .collect();
@@ -477,6 +478,7 @@ impl Server {
             request.sender_id,
             entries,
             diagnostic_summaries,
+            worktree.next_update_id,
         )?;
 
         broadcast(
@@ -1115,7 +1117,7 @@ mod tests {
             LanguageConfig, LanguageRegistry, LanguageServerConfig, Point,
         },
         lsp,
-        project::{worktree::WorktreeHandle, DiagnosticSummary, Project, ProjectPath},
+        project::{DiagnosticSummary, Project, ProjectPath},
         workspace::{Workspace, WorkspaceParams},
     };
 
@@ -1486,11 +1488,6 @@ mod tests {
         buffer_b.read_with(&cx_b, |buf, _| assert!(!buf.is_dirty()));
         buffer_c.condition(&cx_c, |buf, _| !buf.is_dirty()).await;
 
-        // Ensure worktree observes a/file1's change event *before* the rename occurs, otherwise
-        // when interpreting the change event it will mistakenly think that the file has been
-        // deleted (because its path has changed) and will subsequently fail to detect the rename.
-        worktree_a.flush_fs_events(&cx_a).await;
-
         // Make changes on host's file system, see those changes on guest worktrees.
         fs.rename(
             "/a/file1".as_ref(),
@@ -1499,6 +1496,7 @@ mod tests {
         )
         .await
         .unwrap();
+
         fs.rename("/a/file2".as_ref(), "/a/file3".as_ref(), Default::default())
             .await
             .unwrap();

crates/server/src/rpc/store.rs 🔗

@@ -43,6 +43,7 @@ pub struct ProjectShare {
 pub struct WorktreeShare {
     pub entries: HashMap<u64, proto::Entry>,
     pub diagnostic_summaries: BTreeMap<PathBuf, proto::DiagnosticSummary>,
+    pub next_update_id: u64,
 }
 
 #[derive(Default)]
@@ -403,6 +404,7 @@ impl Store {
         connection_id: ConnectionId,
         entries: HashMap<u64, proto::Entry>,
         diagnostic_summaries: BTreeMap<PathBuf, proto::DiagnosticSummary>,
+        next_update_id: u64,
     ) -> tide::Result<SharedWorktree> {
         let project = self
             .projects
@@ -416,6 +418,7 @@ impl Store {
             worktree.share = Some(WorktreeShare {
                 entries,
                 diagnostic_summaries,
+                next_update_id,
             });
             Ok(SharedWorktree {
                 authorized_user_ids: project.authorized_user_ids(),

crates/util/src/lib.rs 🔗

@@ -4,13 +4,14 @@ pub mod test;
 use futures::Future;
 use std::{
     cmp::Ordering,
+    ops::AddAssign,
     pin::Pin,
     task::{Context, Poll},
 };
 
-pub fn post_inc(value: &mut usize) -> usize {
+pub fn post_inc<T: From<u8> + AddAssign<T> + Copy>(value: &mut T) -> T {
     let prev = *value;
-    *value += 1;
+    *value += T::from(1);
     prev
 }