Introduce randomized test for collaboration on buffers

Antonio Scandurra created

This test will exercise serialization of operations as well as peers
replicating from an existing buffer.

Change summary

Cargo.lock                    |   3 
crates/language/src/buffer.rs |  39 ++++++++++
crates/language/src/tests.rs  | 124 +++++++++++++++++++++++++++++++++++++
crates/text/Cargo.toml        |   1 
crates/text/src/tests.rs      |  79 -----------------------
crates/text/src/text.rs       |  11 ++
crates/util/Cargo.toml        |   8 +
crates/util/src/test.rs       |  84 +++++++++++++++++++++++++
8 files changed, 264 insertions(+), 85 deletions(-)

Detailed changes

Cargo.lock 🔗

@@ -4858,6 +4858,7 @@ dependencies = [
  "rand 0.8.3",
  "smallvec",
  "sum_tree",
+ "util",
 ]
 
 [[package]]
@@ -5382,8 +5383,10 @@ name = "util"
 version = "0.1.0"
 dependencies = [
  "anyhow",
+ "clock",
  "futures",
  "log",
+ "rand 0.8.3",
  "serde_json",
  "surf",
  "tempdir",

crates/language/src/buffer.rs 🔗

@@ -1498,6 +1498,10 @@ impl Buffer {
 
 #[cfg(any(test, feature = "test-support"))]
 impl Buffer {
+    pub fn set_group_interval(&mut self, group_interval: Duration) {
+        self.text.set_group_interval(group_interval);
+    }
+
     pub fn randomly_edit<T>(
         &mut self,
         rng: &mut T,
@@ -1506,9 +1510,38 @@ impl Buffer {
     ) where
         T: rand::Rng,
     {
-        self.start_transaction();
-        self.text.randomly_edit(rng, old_range_count);
-        self.end_transaction(cx);
+        let mut old_ranges: Vec<Range<usize>> = Vec::new();
+        for _ in 0..old_range_count {
+            let last_end = old_ranges.last().map_or(0, |last_range| last_range.end + 1);
+            if last_end > self.len() {
+                break;
+            }
+            old_ranges.push(self.text.random_byte_range(last_end, rng));
+        }
+        let new_text_len = rng.gen_range(0..10);
+        let new_text: String = crate::random_char_iter::RandomCharIter::new(&mut *rng)
+            .take(new_text_len)
+            .collect();
+        log::info!(
+            "mutating buffer {} at {:?}: {:?}",
+            self.replica_id(),
+            old_ranges,
+            new_text
+        );
+        self.edit(old_ranges.iter().cloned(), new_text.as_str(), cx);
+    }
+
+    pub fn randomly_undo_redo(&mut self, rng: &mut impl rand::Rng, cx: &mut ModelContext<Self>) {
+        let was_dirty = self.is_dirty();
+        let old_version = self.version.clone();
+
+        let ops = self.text.randomly_undo_redo(rng);
+        if !ops.is_empty() {
+            for op in ops {
+                self.send_operation(Operation::Buffer(op), cx);
+                self.did_edit(&old_version, was_dirty, cx);
+            }
+        }
     }
 }
 

crates/language/src/tests.rs 🔗

@@ -1,13 +1,17 @@
 use super::*;
+use clock::ReplicaId;
 use gpui::{ModelHandle, MutableAppContext};
+use rand::prelude::*;
 use std::{
     cell::RefCell,
+    env,
     iter::FromIterator,
     ops::Range,
     rc::Rc,
     time::{Duration, Instant},
 };
 use unindent::Unindent as _;
+use util::test::Network;
 
 #[cfg(test)]
 #[ctor::ctor]
@@ -808,6 +812,126 @@ fn test_serialization(cx: &mut gpui::MutableAppContext) {
     assert_eq!(buffer2.read(cx).text(), "abcDF");
 }
 
+#[gpui::test(iterations = 100)]
+fn test_random_collaboration(cx: &mut MutableAppContext, mut rng: StdRng) {
+    let min_peers = env::var("MIN_PEERS")
+        .map(|i| i.parse().expect("invalid `MIN_PEERS` variable"))
+        .unwrap_or(1);
+    let max_peers = env::var("MAX_PEERS")
+        .map(|i| i.parse().expect("invalid `MAX_PEERS` variable"))
+        .unwrap_or(5);
+    let operations = env::var("OPERATIONS")
+        .map(|i| i.parse().expect("invalid `OPERATIONS` variable"))
+        .unwrap_or(10);
+
+    let base_text_len = rng.gen_range(0..10);
+    let base_text = RandomCharIter::new(&mut rng)
+        .take(base_text_len)
+        .collect::<String>();
+    let mut replica_ids = Vec::new();
+    let mut buffers = Vec::new();
+    let mut network = Network::new(rng.clone());
+
+    for i in 0..rng.gen_range(min_peers..=max_peers) {
+        let buffer = cx.add_model(|cx| {
+            let mut buffer = Buffer::new(i as ReplicaId, base_text.as_str(), cx);
+            buffer.set_group_interval(Duration::from_millis(rng.gen_range(0..=200)));
+            buffer
+        });
+        buffers.push(buffer);
+        replica_ids.push(i as ReplicaId);
+        network.add_peer(i as ReplicaId);
+        log::info!("Adding initial peer with replica id {}", i);
+    }
+
+    log::info!("initial text: {:?}", base_text);
+
+    let mut now = Instant::now();
+    let mut mutation_count = operations;
+    loop {
+        let replica_index = rng.gen_range(0..replica_ids.len());
+        let replica_id = replica_ids[replica_index];
+        let buffer = &mut buffers[replica_index];
+        let mut new_buffer = None;
+        match rng.gen_range(0..100) {
+            0..=34 if mutation_count != 0 => {
+                buffer.update(cx, |buffer, cx| {
+                    buffer.start_transaction_at(now);
+                    buffer.randomly_edit(&mut rng, 5, cx);
+                    buffer.end_transaction_at(now, cx);
+                    log::info!("buffer {} text: {:?}", buffer.replica_id(), buffer.text());
+                });
+                mutation_count -= 1;
+            }
+            35..=49 if replica_ids.len() < max_peers => {
+                let old_buffer = buffer.read(cx).to_proto();
+                let new_replica_id = replica_ids.len() as ReplicaId;
+                log::info!(
+                    "Adding new replica {} (replicating from {})",
+                    new_replica_id,
+                    replica_id
+                );
+                new_buffer = Some(cx.add_model(|cx| {
+                    let mut new_buffer =
+                        Buffer::from_proto(new_replica_id, old_buffer, None, cx).unwrap();
+                    new_buffer.set_group_interval(Duration::from_millis(rng.gen_range(0..=200)));
+                    new_buffer
+                }));
+                replica_ids.push(new_replica_id);
+                network.replicate(replica_id, new_replica_id);
+            }
+            50..=69 if mutation_count != 0 => {
+                buffer.update(cx, |buffer, cx| {
+                    buffer.randomly_undo_redo(&mut rng, cx);
+                    log::info!("buffer {} text: {:?}", buffer.replica_id(), buffer.text());
+                });
+                mutation_count -= 1;
+            }
+            70..=99 if network.has_unreceived(replica_id) => {
+                let ops = network
+                    .receive(replica_id)
+                    .into_iter()
+                    .map(|op| proto::deserialize_operation(op).unwrap());
+                if ops.len() > 0 {
+                    log::info!(
+                        "peer {} applying {} ops from the network.",
+                        replica_id,
+                        ops.len()
+                    );
+                    buffer.update(cx, |buffer, cx| buffer.apply_ops(ops, cx).unwrap());
+                }
+            }
+            _ => {}
+        }
+
+        buffer.update(cx, |buffer, _| {
+            let ops = buffer
+                .operations
+                .drain(..)
+                .map(|op| proto::serialize_operation(&op))
+                .collect();
+            network.broadcast(buffer.replica_id(), ops);
+        });
+        now += Duration::from_millis(rng.gen_range(0..=200));
+        buffers.extend(new_buffer);
+
+        if mutation_count == 0 && network.is_idle() {
+            break;
+        }
+    }
+
+    let first_buffer = buffers[0].read(cx);
+    for buffer in &buffers[1..] {
+        let buffer = buffer.read(cx);
+        assert_eq!(
+            buffer.text(),
+            first_buffer.text(),
+            "Replica {} text != Replica 0 text",
+            buffer.replica_id()
+        );
+    }
+}
+
 fn chunks_with_diagnostics<T: ToOffset + ToPoint>(
     buffer: &Buffer,
     range: Range<T>,

crates/text/Cargo.toml 🔗

@@ -24,6 +24,7 @@ smallvec = { version = "1.6", features = ["union"] }
 [dev-dependencies]
 collections = { path = "../collections", features = ["test-support"] }
 gpui = { path = "../gpui", features = ["test-support"] }
+util = { path = "../util", features = ["test-support"] }
 ctor = "0.1"
 env_logger = "0.8"
 rand = "0.8.3"

crates/text/src/tests.rs 🔗

@@ -7,6 +7,7 @@ use std::{
     iter::Iterator,
     time::{Duration, Instant},
 };
+use util::test::Network;
 
 #[cfg(test)]
 #[ctor::ctor]
@@ -602,18 +603,6 @@ fn test_random_concurrent_edits(mut rng: StdRng) {
     }
 }
 
-#[derive(Clone)]
-struct Envelope<T: Clone> {
-    message: T,
-    sender: ReplicaId,
-}
-
-struct Network<T: Clone, R: rand::Rng> {
-    inboxes: std::collections::BTreeMap<ReplicaId, Vec<Envelope<T>>>,
-    all_messages: Vec<T>,
-    rng: R,
-}
-
 impl Buffer {
     fn check_invariants(&self) {
         // Ensure every fragment is ordered by locator in the fragment tree and corresponds
@@ -646,69 +635,3 @@ impl Buffer {
         }
     }
 }
-
-impl<T: Clone, R: rand::Rng> Network<T, R> {
-    fn new(rng: R) -> Self {
-        Network {
-            inboxes: Default::default(),
-            all_messages: Vec::new(),
-            rng,
-        }
-    }
-
-    fn add_peer(&mut self, id: ReplicaId) {
-        self.inboxes.insert(id, Vec::new());
-    }
-
-    fn is_idle(&self) -> bool {
-        self.inboxes.values().all(|i| i.is_empty())
-    }
-
-    fn broadcast(&mut self, sender: ReplicaId, messages: Vec<T>) {
-        for (replica, inbox) in self.inboxes.iter_mut() {
-            if *replica != sender {
-                for message in &messages {
-                    let min_index = inbox
-                        .iter()
-                        .enumerate()
-                        .rev()
-                        .find_map(|(index, envelope)| {
-                            if sender == envelope.sender {
-                                Some(index + 1)
-                            } else {
-                                None
-                            }
-                        })
-                        .unwrap_or(0);
-
-                    // Insert one or more duplicates of this message *after* the previous
-                    // message delivered by this replica.
-                    for _ in 0..self.rng.gen_range(1..4) {
-                        let insertion_index = self.rng.gen_range(min_index..inbox.len() + 1);
-                        inbox.insert(
-                            insertion_index,
-                            Envelope {
-                                message: message.clone(),
-                                sender,
-                            },
-                        );
-                    }
-                }
-            }
-        }
-        self.all_messages.extend(messages);
-    }
-
-    fn has_unreceived(&self, receiver: ReplicaId) -> bool {
-        !self.inboxes[&receiver].is_empty()
-    }
-
-    fn receive(&mut self, receiver: ReplicaId) -> Vec<T> {
-        let inbox = self.inboxes.get_mut(&receiver).unwrap();
-        let count = self.rng.gen_range(0..inbox.len() + 1);
-        inbox
-            .drain(0..count)
-            .map(|envelope| envelope.message)
-            .collect()
-    }
-}

crates/text/src/text.rs 🔗

@@ -1105,7 +1105,6 @@ impl Buffer {
         self.deferred_replicas.clear();
         let mut deferred_ops = Vec::new();
         for op in self.deferred_ops.drain().iter().cloned() {
-            dbg!(&self.version, &op, self.can_apply_op(&op));
             if self.can_apply_op(&op) {
                 self.apply_op(op)?;
             } else {
@@ -1241,7 +1240,15 @@ impl Buffer {
 
 #[cfg(any(test, feature = "test-support"))]
 impl Buffer {
-    fn random_byte_range(&mut self, start_offset: usize, rng: &mut impl rand::Rng) -> Range<usize> {
+    pub fn set_group_interval(&mut self, group_interval: Duration) {
+        self.history.group_interval = group_interval;
+    }
+
+    pub fn random_byte_range(
+        &mut self,
+        start_offset: usize,
+        rng: &mut impl rand::Rng,
+    ) -> Range<usize> {
         let end = self.clip_offset(rng.gen_range(start_offset..=self.len()), Bias::Right);
         let start = self.clip_offset(rng.gen_range(start_offset..=end), Bias::Right);
         start..end

crates/util/Cargo.toml 🔗

@@ -4,12 +4,16 @@ version = "0.1.0"
 edition = "2018"
 
 [features]
-test-support = ["serde_json", "tempdir"]
+test-support = ["clock", "rand", "serde_json", "tempdir"]
 
 [dependencies]
+clock = { path = "../clock", optional = true }
 anyhow = "1.0.38"
 futures = "0.3"
 log = "0.4"
+rand = { version = "0.8", optional = true }
 surf = "2.2"
 tempdir = { version = "0.3.7", optional = true }
-serde_json = { version = "1.0.64", features = ["preserve_order"], optional = true }
+serde_json = { version = "1.0.64", features = [
+    "preserve_order"
+], optional = true }

crates/util/src/test.rs 🔗

@@ -1,6 +1,90 @@
+use clock::ReplicaId;
 use std::path::{Path, PathBuf};
 use tempdir::TempDir;
 
+#[derive(Clone)]
+struct Envelope<T: Clone> {
+    message: T,
+    sender: ReplicaId,
+}
+
+pub struct Network<T: Clone, R: rand::Rng> {
+    inboxes: std::collections::BTreeMap<ReplicaId, Vec<Envelope<T>>>,
+    all_messages: Vec<T>,
+    rng: R,
+}
+
+impl<T: Clone, R: rand::Rng> Network<T, R> {
+    pub fn new(rng: R) -> Self {
+        Network {
+            inboxes: Default::default(),
+            all_messages: Vec::new(),
+            rng,
+        }
+    }
+
+    pub fn add_peer(&mut self, id: ReplicaId) {
+        self.inboxes.insert(id, Vec::new());
+    }
+
+    pub fn replicate(&mut self, old_replica_id: ReplicaId, new_replica_id: ReplicaId) {
+        self.inboxes
+            .insert(new_replica_id, self.inboxes[&old_replica_id].clone());
+    }
+
+    pub fn is_idle(&self) -> bool {
+        self.inboxes.values().all(|i| i.is_empty())
+    }
+
+    pub fn broadcast(&mut self, sender: ReplicaId, messages: Vec<T>) {
+        for (replica, inbox) in self.inboxes.iter_mut() {
+            if *replica != sender {
+                for message in &messages {
+                    let min_index = inbox
+                        .iter()
+                        .enumerate()
+                        .rev()
+                        .find_map(|(index, envelope)| {
+                            if sender == envelope.sender {
+                                Some(index + 1)
+                            } else {
+                                None
+                            }
+                        })
+                        .unwrap_or(0);
+
+                    // Insert one or more duplicates of this message *after* the previous
+                    // message delivered by this replica.
+                    for _ in 0..self.rng.gen_range(1..4) {
+                        let insertion_index = self.rng.gen_range(min_index..inbox.len() + 1);
+                        inbox.insert(
+                            insertion_index,
+                            Envelope {
+                                message: message.clone(),
+                                sender,
+                            },
+                        );
+                    }
+                }
+            }
+        }
+        self.all_messages.extend(messages);
+    }
+
+    pub fn has_unreceived(&self, receiver: ReplicaId) -> bool {
+        !self.inboxes[&receiver].is_empty()
+    }
+
+    pub fn receive(&mut self, receiver: ReplicaId) -> Vec<T> {
+        let inbox = self.inboxes.get_mut(&receiver).unwrap();
+        let count = self.rng.gen_range(0..inbox.len() + 1);
+        inbox
+            .drain(0..count)
+            .map(|envelope| envelope.message)
+            .collect()
+    }
+}
+
 pub fn temp_tree(tree: serde_json::Value) -> TempDir {
     let dir = TempDir::new("").unwrap();
     write_tree(dir.path(), tree);