Merge pull request #787 from zed-industries/buffer-divergence

Antonio Scandurra created

Fix divergence bug when peer reconnects reusing a prior replica id

Change summary

crates/language/src/tests.rs | 56 ++++++++++++++++++++++++++++++++++---
crates/text/src/text.rs      |  5 ++-
2 files changed, 54 insertions(+), 7 deletions(-)

Detailed changes

crates/language/src/tests.rs 🔗

@@ -821,7 +821,10 @@ fn test_random_collaboration(cx: &mut MutableAppContext, mut rng: StdRng) {
             }
             50..=59 if replica_ids.len() < max_peers => {
                 let old_buffer = buffer.read(cx).to_proto();
-                let new_replica_id = replica_ids.len() as ReplicaId;
+                let new_replica_id = (0..=replica_ids.len() as ReplicaId)
+                    .filter(|replica_id| *replica_id != buffer.read(cx).replica_id())
+                    .choose(&mut rng)
+                    .unwrap();
                 log::info!(
                     "Adding new replica {} (replicating from {})",
                     new_replica_id,
@@ -830,6 +833,11 @@ fn test_random_collaboration(cx: &mut MutableAppContext, mut rng: StdRng) {
                 new_buffer = Some(cx.add_model(|cx| {
                     let mut new_buffer =
                         Buffer::from_proto(new_replica_id, old_buffer, None, cx).unwrap();
+                    log::info!(
+                        "New replica {} text: {:?}",
+                        new_buffer.replica_id(),
+                        new_buffer.text()
+                    );
                     new_buffer.set_group_interval(Duration::from_millis(rng.gen_range(0..=200)));
                     let network = network.clone();
                     cx.subscribe(&cx.handle(), move |buffer, _, event, _| {
@@ -843,8 +851,33 @@ fn test_random_collaboration(cx: &mut MutableAppContext, mut rng: StdRng) {
                     .detach();
                     new_buffer
                 }));
-                replica_ids.push(new_replica_id);
                 network.borrow_mut().replicate(replica_id, new_replica_id);
+
+                if new_replica_id as usize == replica_ids.len() {
+                    replica_ids.push(new_replica_id);
+                } else {
+                    let new_buffer = new_buffer.take().unwrap();
+                    while network.borrow().has_unreceived(new_replica_id) {
+                        let ops = network
+                            .borrow_mut()
+                            .receive(new_replica_id)
+                            .into_iter()
+                            .map(|op| proto::deserialize_operation(op).unwrap());
+                        if ops.len() > 0 {
+                            log::info!(
+                                "peer {} (version: {:?}) applying {} ops from the network. {:?}",
+                                new_replica_id,
+                                buffer.read(cx).version(),
+                                ops.len(),
+                                ops
+                            );
+                            new_buffer.update(cx, |new_buffer, cx| {
+                                new_buffer.apply_ops(ops, cx).unwrap();
+                            });
+                        }
+                    }
+                    buffers[new_replica_id as usize] = new_buffer;
+                }
             }
             60..=69 if mutation_count != 0 => {
                 buffer.update(cx, |buffer, cx| {
@@ -861,9 +894,11 @@ fn test_random_collaboration(cx: &mut MutableAppContext, mut rng: StdRng) {
                     .map(|op| proto::deserialize_operation(op).unwrap());
                 if ops.len() > 0 {
                     log::info!(
-                        "peer {} applying {} ops from the network.",
+                        "peer {} (version: {:?}) applying {} ops from the network. {:?}",
                         replica_id,
-                        ops.len()
+                        buffer.read(cx).version(),
+                        ops.len(),
+                        ops
                     );
                     buffer.update(cx, |buffer, cx| buffer.apply_ops(ops, cx).unwrap());
                 }
@@ -886,6 +921,12 @@ fn test_random_collaboration(cx: &mut MutableAppContext, mut rng: StdRng) {
     let first_buffer = buffers[0].read(cx).snapshot();
     for buffer in &buffers[1..] {
         let buffer = buffer.read(cx).snapshot();
+        assert_eq!(
+            buffer.version(),
+            first_buffer.version(),
+            "Replica {} version != Replica 0 version",
+            buffer.replica_id()
+        );
         assert_eq!(
             buffer.text(),
             first_buffer.text(),
@@ -915,7 +956,12 @@ fn test_random_collaboration(cx: &mut MutableAppContext, mut rng: StdRng) {
             .filter(|(replica_id, _)| **replica_id != buffer.replica_id())
             .map(|(replica_id, selections)| (*replica_id, selections.iter().collect::<Vec<_>>()))
             .collect::<Vec<_>>();
-        assert_eq!(actual_remote_selections, expected_remote_selections);
+        assert_eq!(
+            actual_remote_selections,
+            expected_remote_selections,
+            "Replica {} remote selections != expected selections",
+            buffer.replica_id()
+        );
     }
 }
 

crates/text/src/text.rs 🔗

@@ -826,6 +826,8 @@ impl Buffer {
                         edit.timestamp,
                     );
                     self.snapshot.version.observe(edit.timestamp.local());
+                    self.local_clock.observe(edit.timestamp.local());
+                    self.lamport_clock.observe(edit.timestamp.lamport());
                     self.resolve_edit(edit.timestamp.local());
                 }
             }
@@ -836,6 +838,7 @@ impl Buffer {
                 if !self.version.observed(undo.id) {
                     self.apply_undo(&undo)?;
                     self.snapshot.version.observe(undo.id);
+                    self.local_clock.observe(undo.id);
                     self.lamport_clock.observe(lamport_timestamp);
                 }
             }
@@ -1033,8 +1036,6 @@ impl Buffer {
         self.snapshot.visible_text = visible_text;
         self.snapshot.deleted_text = deleted_text;
         self.snapshot.insertions.edit(new_insertions, &());
-        self.local_clock.observe(timestamp.local());
-        self.lamport_clock.observe(timestamp.lamport());
         self.subscriptions.publish_mut(&edits);
     }