Start getting diagnostics sets to work with multiple servers

Julia and Nathan Sobo created

Co-Authored-By: Nathan Sobo <nathan@zed.dev>

Change summary

crates/language/src/buffer.rs       | 74 +++++++++++++++++++++++++-----
crates/language/src/buffer_tests.rs |  2 
crates/language/src/proto.rs        | 11 +++
crates/project/src/project.rs       | 10 ++-
crates/rpc/proto/zed.proto          |  3 
5 files changed, 79 insertions(+), 21 deletions(-)

Detailed changes

crates/language/src/buffer.rs 🔗

@@ -13,6 +13,7 @@ use crate::{
 };
 use anyhow::{anyhow, Result};
 use clock::ReplicaId;
+use collections::HashMap;
 use fs::LineEnding;
 use futures::FutureExt as _;
 use gpui::{fonts::HighlightStyle, AppContext, Entity, ModelContext, Task};
@@ -71,7 +72,7 @@ pub struct Buffer {
     syntax_map: Mutex<SyntaxMap>,
     parsing_in_background: bool,
     parse_count: usize,
-    diagnostics: DiagnosticSet,
+    diagnostics: HashMap<usize, DiagnosticSet>, // server_id -> diagnostic set
     remote_selections: TreeMap<ReplicaId, SelectionSet>,
     selections_update_count: usize,
     diagnostics_update_count: usize,
@@ -88,7 +89,7 @@ pub struct BufferSnapshot {
     pub git_diff: git::diff::BufferDiff,
     pub(crate) syntax: SyntaxSnapshot,
     file: Option<Arc<dyn File>>,
-    diagnostics: DiagnosticSet,
+    diagnostics: HashMap<usize, DiagnosticSet>, // server_id -> diagnostic set
     diagnostics_update_count: usize,
     file_update_count: usize,
     git_diff_update_count: usize,
@@ -164,16 +165,20 @@ pub struct CodeAction {
 #[derive(Clone, Debug, PartialEq, Eq)]
 pub enum Operation {
     Buffer(text::Operation),
+
     UpdateDiagnostics {
+        server_id: usize,
         diagnostics: Arc<[DiagnosticEntry<Anchor>]>,
         lamport_timestamp: clock::Lamport,
     },
+
     UpdateSelections {
         selections: Arc<[Selection<Anchor>]>,
         lamport_timestamp: clock::Lamport,
         line_mode: bool,
         cursor_shape: CursorShape,
     },
+
     UpdateCompletionTriggers {
         triggers: Vec<String>,
         lamport_timestamp: clock::Lamport,
@@ -409,6 +414,7 @@ impl Buffer {
     ) -> Task<Vec<proto::Operation>> {
         let mut operations = Vec::new();
         operations.extend(self.deferred_ops.iter().map(proto::serialize_operation));
+
         operations.extend(self.remote_selections.iter().map(|(_, set)| {
             proto::serialize_operation(&Operation::UpdateSelections {
                 selections: set.selections.clone(),
@@ -417,10 +423,15 @@ impl Buffer {
                 cursor_shape: set.cursor_shape,
             })
         }));
-        operations.push(proto::serialize_operation(&Operation::UpdateDiagnostics {
-            diagnostics: self.diagnostics.iter().cloned().collect(),
-            lamport_timestamp: self.diagnostics_timestamp,
-        }));
+
+        for (server_id, diagnostics) in &self.diagnostics {
+            operations.push(proto::serialize_operation(&Operation::UpdateDiagnostics {
+                lamport_timestamp: self.diagnostics_timestamp,
+                server_id: *server_id,
+                diagnostics: diagnostics.iter().cloned().collect(),
+            }));
+        }
+
         operations.push(proto::serialize_operation(
             &Operation::UpdateCompletionTriggers {
                 triggers: self.completion_triggers.clone(),
@@ -866,13 +877,19 @@ impl Buffer {
         cx.notify();
     }
 
-    pub fn update_diagnostics(&mut self, diagnostics: DiagnosticSet, cx: &mut ModelContext<Self>) {
+    pub fn update_diagnostics(
+        &mut self,
+        server_id: usize,
+        diagnostics: DiagnosticSet,
+        cx: &mut ModelContext<Self>,
+    ) {
         let lamport_timestamp = self.text.lamport_clock.tick();
         let op = Operation::UpdateDiagnostics {
+            server_id,
             diagnostics: diagnostics.iter().cloned().collect(),
             lamport_timestamp,
         };
-        self.apply_diagnostic_update(diagnostics, lamport_timestamp, cx);
+        self.apply_diagnostic_update(server_id, diagnostics, lamport_timestamp, cx);
         self.send_operation(op, cx);
     }
 
@@ -1580,11 +1597,13 @@ impl Buffer {
                 unreachable!("buffer operations should never be applied at this layer")
             }
             Operation::UpdateDiagnostics {
+                server_id,
                 diagnostics: diagnostic_set,
                 lamport_timestamp,
             } => {
                 let snapshot = self.snapshot();
                 self.apply_diagnostic_update(
+                    server_id,
                     DiagnosticSet::from_sorted_entries(diagnostic_set.iter().cloned(), &snapshot),
                     lamport_timestamp,
                     cx,
@@ -1626,12 +1645,13 @@ impl Buffer {
 
     fn apply_diagnostic_update(
         &mut self,
+        server_id: usize,
         diagnostics: DiagnosticSet,
         lamport_timestamp: clock::Lamport,
         cx: &mut ModelContext<Self>,
     ) {
         if lamport_timestamp > self.diagnostics_timestamp {
-            self.diagnostics = diagnostics;
+            self.diagnostics.insert(server_id, diagnostics);
             self.diagnostics_timestamp = lamport_timestamp;
             self.diagnostics_update_count += 1;
             self.text.lamport_clock.observe(lamport_timestamp);
@@ -2505,14 +2525,40 @@ impl BufferSnapshot {
     ) -> impl 'a + Iterator<Item = DiagnosticEntry<O>>
     where
         T: 'a + Clone + ToOffset,
-        O: 'a + FromAnchor,
+        O: 'a + FromAnchor + Ord,
     {
-        self.diagnostics.range(search_range, self, true, reversed)
+        let mut iterators: Vec<_> = self
+            .diagnostics
+            .values()
+            .map(|collection| {
+                collection
+                    .range::<T, O>(search_range.clone(), self, true, reversed)
+                    .peekable()
+            })
+            .collect();
+
+        std::iter::from_fn(move || {
+            let (next_ix, _) = iterators
+                .iter_mut()
+                .enumerate()
+                .flat_map(|(ix, iter)| Some((ix, iter.peek()?)))
+                .min_by(|(_, a), (_, b)| a.range.start.cmp(&b.range.start))?;
+            iterators[next_ix].next()
+        })
     }
 
     pub fn diagnostic_groups(&self) -> Vec<DiagnosticGroup<Anchor>> {
         let mut groups = Vec::new();
-        self.diagnostics.groups(&mut groups, self);
+        for diagnostics in self.diagnostics.values() {
+            diagnostics.groups(&mut groups, self);
+        }
+
+        groups.sort_by(|a, b| {
+            let a_start = &a.entries[a.primary_ix].range.start;
+            let b_start = &b.entries[b.primary_ix].range.start;
+            a_start.cmp(b_start, self)
+        });
+
         groups
     }
 
@@ -2523,7 +2569,9 @@ impl BufferSnapshot {
     where
         O: 'a + FromAnchor,
     {
-        self.diagnostics.group(group_id, self)
+        self.diagnostics
+            .values()
+            .flat_map(move |set| set.group(group_id, self))
     }
 
     pub fn diagnostics_update_count(&self) -> usize {

crates/language/src/buffer_tests.rs 🔗

@@ -1866,7 +1866,7 @@ fn test_random_collaboration(cx: &mut AppContext, mut rng: StdRng) {
                         buffer,
                     );
                     log::info!("peer {} setting diagnostics: {:?}", replica_id, diagnostics);
-                    buffer.update_diagnostics(diagnostics, cx);
+                    buffer.update_diagnostics(0, diagnostics, cx);
                 });
                 mutation_count -= 1;
             }

crates/language/src/proto.rs 🔗

@@ -40,6 +40,7 @@ pub fn serialize_operation(operation: &crate::Operation) -> proto::Operation {
             crate::Operation::Buffer(text::Operation::Edit(edit)) => {
                 proto::operation::Variant::Edit(serialize_edit_operation(edit))
             }
+
             crate::Operation::Buffer(text::Operation::Undo {
                 undo,
                 lamport_timestamp,
@@ -58,6 +59,7 @@ pub fn serialize_operation(operation: &crate::Operation) -> proto::Operation {
                     })
                     .collect(),
             }),
+
             crate::Operation::UpdateSelections {
                 selections,
                 line_mode,
@@ -70,14 +72,18 @@ pub fn serialize_operation(operation: &crate::Operation) -> proto::Operation {
                 line_mode: *line_mode,
                 cursor_shape: serialize_cursor_shape(cursor_shape) as i32,
             }),
+
             crate::Operation::UpdateDiagnostics {
-                diagnostics,
                 lamport_timestamp,
+                server_id,
+                diagnostics,
             } => proto::operation::Variant::UpdateDiagnostics(proto::UpdateDiagnostics {
                 replica_id: lamport_timestamp.replica_id as u32,
                 lamport_timestamp: lamport_timestamp.value,
+                server_id: *server_id as u64,
                 diagnostics: serialize_diagnostics(diagnostics.iter()),
             }),
+
             crate::Operation::UpdateCompletionTriggers {
                 triggers,
                 lamport_timestamp,
@@ -267,11 +273,12 @@ pub fn deserialize_operation(message: proto::Operation) -> Result<crate::Operati
             }
             proto::operation::Variant::UpdateDiagnostics(message) => {
                 crate::Operation::UpdateDiagnostics {
-                    diagnostics: deserialize_diagnostics(message.diagnostics),
                     lamport_timestamp: clock::Lamport {
                         replica_id: message.replica_id as ReplicaId,
                         value: message.lamport_timestamp,
                     },
+                    server_id: message.server_id as usize,
+                    diagnostics: deserialize_diagnostics(message.diagnostics),
                 }
             }
             proto::operation::Variant::UpdateCompletionTriggers(message) => {

crates/project/src/project.rs 🔗

@@ -1675,7 +1675,7 @@ impl Project {
 
             if let Some(local_worktree) = file.worktree.read(cx).as_local() {
                 for (server_id, diagnostics) in local_worktree.diagnostics_for_path(file.path()) {
-                    self.update_buffer_diagnostics(buffer_handle, diagnostics, server_id, None, cx)
+                    self.update_buffer_diagnostics(buffer_handle, server_id, None, diagnostics, cx)
                         .log_err();
                 }
             }
@@ -2968,7 +2968,7 @@ impl Project {
         };
 
         if let Some(buffer) = self.get_open_buffer(&project_path, cx) {
-            self.update_buffer_diagnostics(&buffer, diagnostics.clone(), server_id, version, cx)?;
+            self.update_buffer_diagnostics(&buffer, server_id, version, diagnostics.clone(), cx)?;
         }
 
         let updated = worktree.update(cx, |worktree, cx| {
@@ -2989,9 +2989,9 @@ impl Project {
     fn update_buffer_diagnostics(
         &mut self,
         buffer: &ModelHandle<Buffer>,
-        mut diagnostics: Vec<DiagnosticEntry<Unclipped<PointUtf16>>>,
         server_id: usize,
         version: Option<i32>,
+        mut diagnostics: Vec<DiagnosticEntry<Unclipped<PointUtf16>>>,
         cx: &mut ModelContext<Self>,
     ) -> Result<()> {
         fn compare_diagnostics(a: &Diagnostic, b: &Diagnostic) -> Ordering {
@@ -3053,7 +3053,9 @@ impl Project {
         drop(edits_since_save);
 
         let set = DiagnosticSet::new(sanitized_diagnostics, &snapshot);
-        buffer.update(cx, |buffer, cx| buffer.update_diagnostics(set, cx));
+        buffer.update(cx, |buffer, cx| {
+            buffer.update_diagnostics(server_id, set, cx)
+        });
         Ok(())
     }
 

crates/rpc/proto/zed.proto 🔗

@@ -861,7 +861,8 @@ message IncomingContactRequest {
 message UpdateDiagnostics {
     uint32 replica_id = 1;
     uint32 lamport_timestamp = 2;
-    repeated Diagnostic diagnostics = 3;
+    uint64 server_id = 3;
+    repeated Diagnostic diagnostics = 4;
 }
 
 message Follow {