Fix divergence bug when peer reconnects reusing a prior replica id

Antonio Scandurra created

We forgot to observe the footprint of the reconnecting replica's prior
undos into the local clock. This could cause the replica to generate
edits with a version strictly smaller than what other peers may have
observed. As such, those peers would think they had already seen those
edits and skip them.

Change summary

crates/language/src/tests.rs | 56 ++++++++++++++++++++++++++++++++++---
crates/text/src/text.rs      |  5 ++-
2 files changed, 54 insertions(+), 7 deletions(-)

Detailed changes

crates/language/src/tests.rs 🔗

@@ -821,7 +821,10 @@ fn test_random_collaboration(cx: &mut MutableAppContext, mut rng: StdRng) {
             }
             50..=59 if replica_ids.len() < max_peers => {
                 let old_buffer = buffer.read(cx).to_proto();
-                let new_replica_id = replica_ids.len() as ReplicaId;
+                let new_replica_id = (0..=replica_ids.len() as ReplicaId)
+                    .filter(|replica_id| *replica_id != buffer.read(cx).replica_id())
+                    .choose(&mut rng)
+                    .unwrap();
                 log::info!(
                     "Adding new replica {} (replicating from {})",
                     new_replica_id,
@@ -830,6 +833,11 @@ fn test_random_collaboration(cx: &mut MutableAppContext, mut rng: StdRng) {
                 new_buffer = Some(cx.add_model(|cx| {
                     let mut new_buffer =
                         Buffer::from_proto(new_replica_id, old_buffer, None, cx).unwrap();
+                    log::info!(
+                        "New replica {} text: {:?}",
+                        new_buffer.replica_id(),
+                        new_buffer.text()
+                    );
                     new_buffer.set_group_interval(Duration::from_millis(rng.gen_range(0..=200)));
                     let network = network.clone();
                     cx.subscribe(&cx.handle(), move |buffer, _, event, _| {
@@ -843,8 +851,33 @@ fn test_random_collaboration(cx: &mut MutableAppContext, mut rng: StdRng) {
                     .detach();
                     new_buffer
                 }));
-                replica_ids.push(new_replica_id);
                 network.borrow_mut().replicate(replica_id, new_replica_id);
+
+                if new_replica_id as usize == replica_ids.len() {
+                    replica_ids.push(new_replica_id);
+                } else {
+                    let new_buffer = new_buffer.take().unwrap();
+                    while network.borrow().has_unreceived(new_replica_id) {
+                        let ops = network
+                            .borrow_mut()
+                            .receive(new_replica_id)
+                            .into_iter()
+                            .map(|op| proto::deserialize_operation(op).unwrap());
+                        if ops.len() > 0 {
+                            log::info!(
+                                "peer {} (version: {:?}) applying {} ops from the network. {:?}",
+                                new_replica_id,
+                                buffer.read(cx).version(),
+                                ops.len(),
+                                ops
+                            );
+                            new_buffer.update(cx, |new_buffer, cx| {
+                                new_buffer.apply_ops(ops, cx).unwrap();
+                            });
+                        }
+                    }
+                    buffers[new_replica_id as usize] = new_buffer;
+                }
             }
             60..=69 if mutation_count != 0 => {
                 buffer.update(cx, |buffer, cx| {
@@ -861,9 +894,11 @@ fn test_random_collaboration(cx: &mut MutableAppContext, mut rng: StdRng) {
                     .map(|op| proto::deserialize_operation(op).unwrap());
                 if ops.len() > 0 {
                     log::info!(
-                        "peer {} applying {} ops from the network.",
+                        "peer {} (version: {:?}) applying {} ops from the network. {:?}",
                         replica_id,
-                        ops.len()
+                        buffer.read(cx).version(),
+                        ops.len(),
+                        ops
                     );
                     buffer.update(cx, |buffer, cx| buffer.apply_ops(ops, cx).unwrap());
                 }
@@ -886,6 +921,12 @@ fn test_random_collaboration(cx: &mut MutableAppContext, mut rng: StdRng) {
     let first_buffer = buffers[0].read(cx).snapshot();
     for buffer in &buffers[1..] {
         let buffer = buffer.read(cx).snapshot();
+        assert_eq!(
+            buffer.version(),
+            first_buffer.version(),
+            "Replica {} version != Replica 0 version",
+            buffer.replica_id()
+        );
         assert_eq!(
             buffer.text(),
             first_buffer.text(),
@@ -915,7 +956,12 @@ fn test_random_collaboration(cx: &mut MutableAppContext, mut rng: StdRng) {
             .filter(|(replica_id, _)| **replica_id != buffer.replica_id())
             .map(|(replica_id, selections)| (*replica_id, selections.iter().collect::<Vec<_>>()))
             .collect::<Vec<_>>();
-        assert_eq!(actual_remote_selections, expected_remote_selections);
+        assert_eq!(
+            actual_remote_selections,
+            expected_remote_selections,
+            "Replica {} remote selections != expected selections",
+            buffer.replica_id()
+        );
     }
 }
 

crates/text/src/text.rs 🔗

@@ -826,6 +826,8 @@ impl Buffer {
                         edit.timestamp,
                     );
                     self.snapshot.version.observe(edit.timestamp.local());
+                    self.local_clock.observe(edit.timestamp.local());
+                    self.lamport_clock.observe(edit.timestamp.lamport());
                     self.resolve_edit(edit.timestamp.local());
                 }
             }
@@ -836,6 +838,7 @@ impl Buffer {
                 if !self.version.observed(undo.id) {
                     self.apply_undo(&undo)?;
                     self.snapshot.version.observe(undo.id);
+                    self.local_clock.observe(undo.id);
                     self.lamport_clock.observe(lamport_timestamp);
                 }
             }
@@ -1033,8 +1036,6 @@ impl Buffer {
         self.snapshot.visible_text = visible_text;
         self.snapshot.deleted_text = deleted_text;
         self.snapshot.insertions.edit(new_insertions, &());
-        self.local_clock.observe(timestamp.local());
-        self.lamport_clock.observe(timestamp.lamport());
         self.subscriptions.publish_mut(&edits);
     }