Allow receiving diagnostic updates out of order

Antonio Scandurra created

Change summary

crates/language/src/buffer.rs | 37 ++++++++++++++++++++++++-------
crates/language/src/tests.rs  | 43 +++++++++++++++++++++++++++++++-----
crates/rpc/proto/zed.proto    |  3 +
crates/util/src/test.rs       |  2 -
4 files changed, 67 insertions(+), 18 deletions(-)

Detailed changes

crates/language/src/buffer.rs 🔗

@@ -68,6 +68,7 @@ pub struct Buffer {
     remote_selections: TreeMap<ReplicaId, SelectionSet>,
     selections_update_count: usize,
     diagnostics_update_count: usize,
+    diagnostics_timestamp: clock::Lamport,
     file_update_count: usize,
     language_server: Option<LanguageServerState>,
     completion_triggers: Vec<String>,
@@ -439,9 +440,14 @@ impl Buffer {
         let snapshot = this.snapshot();
         let entries = proto::deserialize_diagnostics(message.diagnostics);
         this.apply_diagnostic_update(
-            DiagnosticSet::from_sorted_entries(entries.into_iter().cloned(), &snapshot),
+            DiagnosticSet::from_sorted_entries(entries.iter().cloned(), &snapshot),
+            clock::Lamport {
+                replica_id: 0,
+                value: message.diagnostics_timestamp,
+            },
             cx,
         );
+
         this.completion_triggers = message.completion_triggers;
 
         Ok(this)
@@ -470,6 +476,7 @@ impl Buffer {
                 })
                 .collect(),
             diagnostics: proto::serialize_diagnostics(self.diagnostics.iter()),
+            diagnostics_timestamp: self.diagnostics_timestamp.value,
             completion_triggers: self.completion_triggers.clone(),
         }
     }
@@ -512,6 +519,7 @@ impl Buffer {
             selections_update_count: 0,
             diagnostics: Default::default(),
             diagnostics_update_count: 0,
+            diagnostics_timestamp: Default::default(),
             file_update_count: 0,
             language_server: None,
             completion_triggers: Default::default(),
@@ -1008,11 +1016,12 @@ impl Buffer {
         drop(edits_since_save);
 
         let set = DiagnosticSet::new(sanitized_diagnostics, content);
-        self.apply_diagnostic_update(set.clone(), cx);
+        let lamport_timestamp = self.text.lamport_clock.tick();
+        self.apply_diagnostic_update(set.clone(), lamport_timestamp, cx);
 
         let op = Operation::UpdateDiagnostics {
             diagnostics: set.iter().cloned().collect(),
-            lamport_timestamp: self.text.lamport_clock.tick(),
+            lamport_timestamp,
         };
         self.send_operation(op, cx);
         Ok(())
@@ -1682,11 +1691,12 @@ impl Buffer {
             }
             Operation::UpdateDiagnostics {
                 diagnostics: diagnostic_set,
-                ..
+                lamport_timestamp,
             } => {
                 let snapshot = self.snapshot();
                 self.apply_diagnostic_update(
                     DiagnosticSet::from_sorted_entries(diagnostic_set.iter().cloned(), &snapshot),
+                    lamport_timestamp,
                     cx,
                 );
             }
@@ -1720,11 +1730,20 @@ impl Buffer {
         }
     }
 
-    fn apply_diagnostic_update(&mut self, diagnostics: DiagnosticSet, cx: &mut ModelContext<Self>) {
-        self.diagnostics = diagnostics;
-        self.diagnostics_update_count += 1;
-        cx.notify();
-        cx.emit(Event::DiagnosticsUpdated);
+    fn apply_diagnostic_update(
+        &mut self,
+        diagnostics: DiagnosticSet,
+        lamport_timestamp: clock::Lamport,
+        cx: &mut ModelContext<Self>,
+    ) {
+        if lamport_timestamp > self.diagnostics_timestamp {
+            self.diagnostics = diagnostics;
+            self.diagnostics_timestamp = lamport_timestamp;
+            self.diagnostics_update_count += 1;
+            self.text.lamport_clock.observe(lamport_timestamp);
+            cx.notify();
+            cx.emit(Event::DiagnosticsUpdated);
+        }
     }
 
     #[cfg(not(test))]

crates/language/src/tests.rs 🔗

@@ -12,7 +12,7 @@ use std::{
     time::{Duration, Instant},
 };
 use unindent::Unindent as _;
-use util::test::Network;
+use util::{post_inc, test::Network};
 
 #[cfg(test)]
 #[ctor::ctor]
@@ -1173,6 +1173,7 @@ fn test_random_collaboration(cx: &mut MutableAppContext, mut rng: StdRng) {
 
     let mut now = Instant::now();
     let mut mutation_count = operations;
+    let mut next_diagnostic_id = 0;
     let mut active_selections = BTreeMap::default();
     loop {
         let replica_index = rng.gen_range(0..replica_ids.len());
@@ -1213,7 +1214,27 @@ fn test_random_collaboration(cx: &mut MutableAppContext, mut rng: StdRng) {
                 });
                 mutation_count -= 1;
             }
-            40..=49 if replica_ids.len() < max_peers => {
+            40..=49 if mutation_count != 0 && replica_id == 0 => {
+                let entry_count = rng.gen_range(1..=5);
+                buffer.update(cx, |buffer, cx| {
+                    let diagnostics = (0..entry_count)
+                        .map(|_| {
+                            let range = buffer.random_byte_range(0, &mut rng);
+                            DiagnosticEntry {
+                                range,
+                                diagnostic: Diagnostic {
+                                    message: post_inc(&mut next_diagnostic_id).to_string(),
+                                    ..Default::default()
+                                },
+                            }
+                        })
+                        .collect();
+                    log::info!("peer {} setting diagnostics: {:?}", replica_id, diagnostics);
+                    buffer.update_diagnostics(diagnostics, None, cx).unwrap();
+                });
+                mutation_count -= 1;
+            }
+            50..=59 if replica_ids.len() < max_peers => {
                 let old_buffer = buffer.read(cx).to_proto();
                 let new_replica_id = replica_ids.len() as ReplicaId;
                 log::info!(
@@ -1230,14 +1251,14 @@ fn test_random_collaboration(cx: &mut MutableAppContext, mut rng: StdRng) {
                 replica_ids.push(new_replica_id);
                 network.replicate(replica_id, new_replica_id);
             }
-            50..=69 if mutation_count != 0 => {
+            60..=69 if mutation_count != 0 => {
                 buffer.update(cx, |buffer, cx| {
                     buffer.randomly_undo_redo(&mut rng, cx);
                     log::info!("buffer {} text: {:?}", buffer.replica_id(), buffer.text());
                 });
                 mutation_count -= 1;
             }
-            70..=99 if network.has_unreceived(replica_id) => {
+            _ if network.has_unreceived(replica_id) => {
                 let ops = network
                     .receive(replica_id)
                     .into_iter()
@@ -1274,15 +1295,25 @@ fn test_random_collaboration(cx: &mut MutableAppContext, mut rng: StdRng) {
         }
     }
 
-    let first_buffer = buffers[0].read(cx);
+    let first_buffer = buffers[0].read(cx).snapshot();
     for buffer in &buffers[1..] {
-        let buffer = buffer.read(cx);
+        let buffer = buffer.read(cx).snapshot();
         assert_eq!(
             buffer.text(),
             first_buffer.text(),
             "Replica {} text != Replica 0 text",
             buffer.replica_id()
         );
+        assert_eq!(
+            buffer
+                .diagnostics_in_range::<_, usize>(0..buffer.len())
+                .collect::<Vec<_>>(),
+            first_buffer
+                .diagnostics_in_range::<_, usize>(0..first_buffer.len())
+                .collect::<Vec<_>>(),
+            "Replica {} diagnostics != Replica 0 diagnostics",
+            buffer.replica_id()
+        );
     }
 
     for buffer in &buffers {

crates/rpc/proto/zed.proto 🔗

@@ -542,7 +542,8 @@ message BufferState {
     repeated Operation operations = 4;
     repeated SelectionSet selections = 5;
     repeated Diagnostic diagnostics = 6;
-    repeated string completion_triggers = 7;
+    uint32 diagnostics_timestamp = 7;
+    repeated string completion_triggers = 8;
 }
 
 message BufferFragment {

crates/util/src/test.rs 🔗

@@ -5,7 +5,6 @@ use tempdir::TempDir;
 #[derive(Clone)]
 struct Envelope<T: Clone> {
     message: T,
-    sender: ReplicaId,
 }
 
 pub struct Network<T: Clone, R: rand::Rng> {
@@ -48,7 +47,6 @@ impl<T: Clone, R: rand::Rng> Network<T, R> {
                             insertion_index,
                             Envelope {
                                 message: message.clone(),
-                                sender,
                             },
                         );
                     }