Batch diagnostics updates (#35794)

Kirill Bulatov created

Diagnostics updates were programmed in Zed based off the r-a LSP push
diagnostics, with all related updates happening per file.

https://github.com/zed-industries/zed/pull/19230 and especially
https://github.com/zed-industries/zed/pull/32269 brought in pull
diagnostics that could produce results for thousands files
simultaneously.

It was noted and addressed on the local side in
https://github.com/zed-industries/zed/pull/34022 but the remote side was
still not adjusted properly.

This PR 

* removes redundant diagnostics pull updates on remote clients, as
buffer diagnostics are updated via buffer sync operations separately
* batches all diagnostics-related updates and proto messages, so
multiple diagnostic summaries (per file) could be sent at once,
specifically, 1 (potentially large) diagnostics summary update instead
of N*10^3 small ones.

Buffer updates are still sent per buffer and not updated, as happening
separately and not offending the collab traffic that much.

Release Notes:

- Improved diagnostics performance in the collaborative mode

Change summary

crates/collab/src/rpc.rs                   |  36 
crates/diagnostics/src/diagnostics.rs      |   8 
crates/project/src/lsp_store.rs            | 729 ++++++++++++++---------
crates/project/src/lsp_store/clangd_ext.rs |  18 
crates/project/src/project.rs              |  50 -
crates/project/src/project_tests.rs        |   8 
crates/proto/proto/lsp.proto               |   1 
7 files changed, 500 insertions(+), 350 deletions(-)

Detailed changes

crates/collab/src/rpc.rs 🔗

@@ -1630,15 +1630,15 @@ fn notify_rejoined_projects(
             }
 
             // Stream this worktree's diagnostics.
-            for summary in worktree.diagnostic_summaries {
-                session.peer.send(
-                    session.connection_id,
-                    proto::UpdateDiagnosticSummary {
-                        project_id: project.id.to_proto(),
-                        worktree_id: worktree.id,
-                        summary: Some(summary),
-                    },
-                )?;
+            let mut worktree_diagnostics = worktree.diagnostic_summaries.into_iter();
+            if let Some(summary) = worktree_diagnostics.next() {
+                let message = proto::UpdateDiagnosticSummary {
+                    project_id: project.id.to_proto(),
+                    worktree_id: worktree.id,
+                    summary: Some(summary),
+                    more_summaries: worktree_diagnostics.collect(),
+                };
+                session.peer.send(session.connection_id, message)?;
             }
 
             for settings_file in worktree.settings_files {
@@ -2060,15 +2060,15 @@ async fn join_project(
         }
 
         // Stream this worktree's diagnostics.
-        for summary in worktree.diagnostic_summaries {
-            session.peer.send(
-                session.connection_id,
-                proto::UpdateDiagnosticSummary {
-                    project_id: project_id.to_proto(),
-                    worktree_id: worktree.id,
-                    summary: Some(summary),
-                },
-            )?;
+        let mut worktree_diagnostics = worktree.diagnostic_summaries.into_iter();
+        if let Some(summary) = worktree_diagnostics.next() {
+            let message = proto::UpdateDiagnosticSummary {
+                project_id: project.id.to_proto(),
+                worktree_id: worktree.id,
+                summary: Some(summary),
+                more_summaries: worktree_diagnostics.collect(),
+            };
+            session.peer.send(session.connection_id, message)?;
         }
 
         for settings_file in worktree.settings_files {

crates/diagnostics/src/diagnostics.rs 🔗

@@ -177,9 +177,9 @@ impl ProjectDiagnosticsEditor {
                 }
                 project::Event::DiagnosticsUpdated {
                     language_server_id,
-                    path,
+                    paths,
                 } => {
-                    this.paths_to_update.insert(path.clone());
+                    this.paths_to_update.extend(paths.clone());
                     let project = project.clone();
                     this.diagnostic_summary_update = cx.spawn(async move |this, cx| {
                         cx.background_executor()
@@ -193,9 +193,9 @@ impl ProjectDiagnosticsEditor {
                     cx.emit(EditorEvent::TitleChanged);
 
                     if this.editor.focus_handle(cx).contains_focused(window, cx) || this.focus_handle.contains_focused(window, cx) {
-                        log::debug!("diagnostics updated for server {language_server_id}, path {path:?}. recording change");
+                        log::debug!("diagnostics updated for server {language_server_id}, paths {paths:?}. recording change");
                     } else {
-                        log::debug!("diagnostics updated for server {language_server_id}, path {path:?}. updating excerpts");
+                        log::debug!("diagnostics updated for server {language_server_id}, paths {paths:?}. updating excerpts");
                         this.update_stale_excerpts(window, cx);
                     }
                 }

crates/project/src/lsp_store.rs 🔗

@@ -140,6 +140,20 @@ impl FormatTrigger {
     }
 }
 
+#[derive(Debug)]
+pub struct DocumentDiagnosticsUpdate<'a, D> {
+    pub diagnostics: D,
+    pub result_id: Option<String>,
+    pub server_id: LanguageServerId,
+    pub disk_based_sources: Cow<'a, [String]>,
+}
+
+pub struct DocumentDiagnostics {
+    diagnostics: Vec<DiagnosticEntry<Unclipped<PointUtf16>>>,
+    document_abs_path: PathBuf,
+    version: Option<i32>,
+}
+
 pub struct LocalLspStore {
     weak: WeakEntity<LspStore>,
     worktree_store: Entity<WorktreeStore>,
@@ -503,12 +517,16 @@ impl LocalLspStore {
                                 adapter.process_diagnostics(&mut params, server_id, buffer);
                             }
 
-                            this.merge_diagnostics(
-                                server_id,
-                                params,
-                                None,
+                            this.merge_lsp_diagnostics(
                                 DiagnosticSourceKind::Pushed,
-                                &adapter.disk_based_diagnostic_sources,
+                                vec![DocumentDiagnosticsUpdate {
+                                    server_id,
+                                    diagnostics: params,
+                                    result_id: None,
+                                    disk_based_sources: Cow::Borrowed(
+                                        &adapter.disk_based_diagnostic_sources,
+                                    ),
+                                }],
                                 |_, diagnostic, cx| match diagnostic.source_kind {
                                     DiagnosticSourceKind::Other | DiagnosticSourceKind::Pushed => {
                                         adapter.retain_old_diagnostic(diagnostic, cx)
@@ -3610,8 +3628,8 @@ pub enum LspStoreEvent {
     RefreshInlayHints,
     RefreshCodeLens,
     DiagnosticsUpdated {
-        language_server_id: LanguageServerId,
-        path: ProjectPath,
+        server_id: LanguageServerId,
+        paths: Vec<ProjectPath>,
     },
     DiskBasedDiagnosticsStarted {
         language_server_id: LanguageServerId,
@@ -4440,17 +4458,24 @@ impl LspStore {
 
     pub(crate) fn send_diagnostic_summaries(&self, worktree: &mut Worktree) {
         if let Some((client, downstream_project_id)) = self.downstream_client.clone() {
-            if let Some(summaries) = self.diagnostic_summaries.get(&worktree.id()) {
-                for (path, summaries) in summaries {
-                    for (&server_id, summary) in summaries {
-                        client
-                            .send(proto::UpdateDiagnosticSummary {
-                                project_id: downstream_project_id,
-                                worktree_id: worktree.id().to_proto(),
-                                summary: Some(summary.to_proto(server_id, path)),
-                            })
-                            .log_err();
-                    }
+            if let Some(diangostic_summaries) = self.diagnostic_summaries.get(&worktree.id()) {
+                let mut summaries =
+                    diangostic_summaries
+                        .into_iter()
+                        .flat_map(|(path, summaries)| {
+                            summaries
+                                .into_iter()
+                                .map(|(server_id, summary)| summary.to_proto(*server_id, path))
+                        });
+                if let Some(summary) = summaries.next() {
+                    client
+                        .send(proto::UpdateDiagnosticSummary {
+                            project_id: downstream_project_id,
+                            worktree_id: worktree.id().to_proto(),
+                            summary: Some(summary),
+                            more_summaries: summaries.collect(),
+                        })
+                        .log_err();
                 }
             }
         }
@@ -6564,7 +6589,7 @@ impl LspStore {
         &mut self,
         buffer: Entity<Buffer>,
         cx: &mut Context<Self>,
-    ) -> Task<Result<Vec<LspPullDiagnostics>>> {
+    ) -> Task<Result<Option<Vec<LspPullDiagnostics>>>> {
         let buffer_id = buffer.read(cx).remote_id();
 
         if let Some((client, upstream_project_id)) = self.upstream_client() {
@@ -6575,7 +6600,7 @@ impl LspStore {
                 },
                 cx,
             ) {
-                return Task::ready(Ok(Vec::new()));
+                return Task::ready(Ok(None));
             }
             let request_task = client.request(proto::MultiLspQuery {
                 buffer_id: buffer_id.to_proto(),
@@ -6593,7 +6618,7 @@ impl LspStore {
                 )),
             });
             cx.background_spawn(async move {
-                Ok(request_task
+                let _proto_responses = request_task
                     .await?
                     .responses
                     .into_iter()
@@ -6606,8 +6631,11 @@ impl LspStore {
                             None
                         }
                     })
-                    .flat_map(GetDocumentDiagnostics::diagnostics_from_proto)
-                    .collect())
+                    .collect::<Vec<_>>();
+                // Proto requests cause the diagnostics to be pulled from language server(s) on the local side
+                // and then, buffer state updated with the diagnostics received, which will be later propagated to the client.
+                // Do not attempt to further process the dummy responses here.
+                Ok(None)
             })
         } else {
             let server_ids = buffer.update(cx, |buffer, cx| {
@@ -6635,7 +6663,7 @@ impl LspStore {
                 for diagnostics in join_all(pull_diagnostics).await {
                     responses.extend(diagnostics?);
                 }
-                Ok(responses)
+                Ok(Some(responses))
             })
         }
     }
@@ -6701,75 +6729,93 @@ impl LspStore {
         buffer: Entity<Buffer>,
         cx: &mut Context<Self>,
     ) -> Task<anyhow::Result<()>> {
-        let buffer_id = buffer.read(cx).remote_id();
         let diagnostics = self.pull_diagnostics(buffer, cx);
         cx.spawn(async move |lsp_store, cx| {
-            let diagnostics = diagnostics.await.context("pulling diagnostics")?;
+            let Some(diagnostics) = diagnostics.await.context("pulling diagnostics")? else {
+                return Ok(());
+            };
             lsp_store.update(cx, |lsp_store, cx| {
                 if lsp_store.as_local().is_none() {
                     return;
                 }
 
-                for diagnostics_set in diagnostics {
-                    let LspPullDiagnostics::Response {
-                        server_id,
-                        uri,
-                        diagnostics,
-                    } = diagnostics_set
-                    else {
-                        continue;
-                    };
-
-                    let adapter = lsp_store.language_server_adapter_for_id(server_id);
-                    let disk_based_sources = adapter
-                        .as_ref()
-                        .map(|adapter| adapter.disk_based_diagnostic_sources.as_slice())
-                        .unwrap_or(&[]);
-                    match diagnostics {
-                        PulledDiagnostics::Unchanged { result_id } => {
-                            lsp_store
-                                .merge_diagnostics(
-                                    server_id,
-                                    lsp::PublishDiagnosticsParams {
-                                        uri: uri.clone(),
-                                        diagnostics: Vec::new(),
-                                        version: None,
-                                    },
-                                    Some(result_id),
-                                    DiagnosticSourceKind::Pulled,
-                                    disk_based_sources,
-                                    |_, _, _| true,
-                                    cx,
-                                )
-                                .log_err();
-                        }
-                        PulledDiagnostics::Changed {
+                let mut unchanged_buffers = HashSet::default();
+                let mut changed_buffers = HashSet::default();
+                let server_diagnostics_updates = diagnostics
+                    .into_iter()
+                    .filter_map(|diagnostics_set| match diagnostics_set {
+                        LspPullDiagnostics::Response {
+                            server_id,
+                            uri,
                             diagnostics,
-                            result_id,
-                        } => {
-                            lsp_store
-                                .merge_diagnostics(
+                        } => Some((server_id, uri, diagnostics)),
+                        LspPullDiagnostics::Default => None,
+                    })
+                    .fold(
+                        HashMap::default(),
+                        |mut acc, (server_id, uri, diagnostics)| {
+                            let (result_id, diagnostics) = match diagnostics {
+                                PulledDiagnostics::Unchanged { result_id } => {
+                                    unchanged_buffers.insert(uri.clone());
+                                    (Some(result_id), Vec::new())
+                                }
+                                PulledDiagnostics::Changed {
+                                    result_id,
+                                    diagnostics,
+                                } => {
+                                    changed_buffers.insert(uri.clone());
+                                    (result_id, diagnostics)
+                                }
+                            };
+                            let disk_based_sources = Cow::Owned(
+                                lsp_store
+                                    .language_server_adapter_for_id(server_id)
+                                    .as_ref()
+                                    .map(|adapter| adapter.disk_based_diagnostic_sources.as_slice())
+                                    .unwrap_or(&[])
+                                    .to_vec(),
+                            );
+                            acc.entry(server_id).or_insert_with(Vec::new).push(
+                                DocumentDiagnosticsUpdate {
                                     server_id,
-                                    lsp::PublishDiagnosticsParams {
-                                        uri: uri.clone(),
+                                    diagnostics: lsp::PublishDiagnosticsParams {
+                                        uri,
                                         diagnostics,
                                         version: None,
                                     },
                                     result_id,
-                                    DiagnosticSourceKind::Pulled,
                                     disk_based_sources,
-                                    |buffer, old_diagnostic, _| match old_diagnostic.source_kind {
-                                        DiagnosticSourceKind::Pulled => {
-                                            buffer.remote_id() != buffer_id
-                                        }
-                                        DiagnosticSourceKind::Other
-                                        | DiagnosticSourceKind::Pushed => true,
-                                    },
-                                    cx,
-                                )
-                                .log_err();
-                        }
-                    }
+                                },
+                            );
+                            acc
+                        },
+                    );
+
+                for diagnostic_updates in server_diagnostics_updates.into_values() {
+                    lsp_store
+                        .merge_lsp_diagnostics(
+                            DiagnosticSourceKind::Pulled,
+                            diagnostic_updates,
+                            |buffer, old_diagnostic, cx| {
+                                File::from_dyn(buffer.file())
+                                    .and_then(|file| {
+                                        let abs_path = file.as_local()?.abs_path(cx);
+                                        lsp::Url::from_file_path(abs_path).ok()
+                                    })
+                                    .is_none_or(|buffer_uri| {
+                                        unchanged_buffers.contains(&buffer_uri)
+                                            || match old_diagnostic.source_kind {
+                                                DiagnosticSourceKind::Pulled => {
+                                                    !changed_buffers.contains(&buffer_uri)
+                                                }
+                                                DiagnosticSourceKind::Other
+                                                | DiagnosticSourceKind::Pushed => true,
+                                            }
+                                    })
+                            },
+                            cx,
+                        )
+                        .log_err();
                 }
             })
         })
@@ -7791,88 +7837,135 @@ impl LspStore {
         cx: &mut Context<Self>,
     ) -> anyhow::Result<()> {
         self.merge_diagnostic_entries(
-            server_id,
-            abs_path,
-            result_id,
-            version,
-            diagnostics,
+            vec![DocumentDiagnosticsUpdate {
+                diagnostics: DocumentDiagnostics {
+                    diagnostics,
+                    document_abs_path: abs_path,
+                    version,
+                },
+                result_id,
+                server_id,
+                disk_based_sources: Cow::Borrowed(&[]),
+            }],
             |_, _, _| false,
             cx,
         )?;
         Ok(())
     }
 
-    pub fn merge_diagnostic_entries(
+    pub fn merge_diagnostic_entries<'a>(
         &mut self,
-        server_id: LanguageServerId,
-        abs_path: PathBuf,
-        result_id: Option<String>,
-        version: Option<i32>,
-        mut diagnostics: Vec<DiagnosticEntry<Unclipped<PointUtf16>>>,
-        filter: impl Fn(&Buffer, &Diagnostic, &App) -> bool + Clone,
+        diagnostic_updates: Vec<DocumentDiagnosticsUpdate<'a, DocumentDiagnostics>>,
+        merge: impl Fn(&Buffer, &Diagnostic, &App) -> bool + Clone,
         cx: &mut Context<Self>,
     ) -> anyhow::Result<()> {
-        let Some((worktree, relative_path)) =
-            self.worktree_store.read(cx).find_worktree(&abs_path, cx)
-        else {
-            log::warn!("skipping diagnostics update, no worktree found for path {abs_path:?}");
-            return Ok(());
-        };
+        let mut diagnostics_summary = None::<proto::UpdateDiagnosticSummary>;
+        let mut updated_diagnostics_paths = HashMap::default();
+        for mut update in diagnostic_updates {
+            let abs_path = &update.diagnostics.document_abs_path;
+            let server_id = update.server_id;
+            let Some((worktree, relative_path)) =
+                self.worktree_store.read(cx).find_worktree(abs_path, cx)
+            else {
+                log::warn!("skipping diagnostics update, no worktree found for path {abs_path:?}");
+                return Ok(());
+            };
 
-        let project_path = ProjectPath {
-            worktree_id: worktree.read(cx).id(),
-            path: relative_path.into(),
-        };
+            let worktree_id = worktree.read(cx).id();
+            let project_path = ProjectPath {
+                worktree_id,
+                path: relative_path.into(),
+            };
 
-        if let Some(buffer_handle) = self.buffer_store.read(cx).get_by_path(&project_path) {
-            let snapshot = buffer_handle.read(cx).snapshot();
-            let buffer = buffer_handle.read(cx);
-            let reused_diagnostics = buffer
-                .get_diagnostics(server_id)
-                .into_iter()
-                .flat_map(|diag| {
-                    diag.iter()
-                        .filter(|v| filter(buffer, &v.diagnostic, cx))
-                        .map(|v| {
-                            let start = Unclipped(v.range.start.to_point_utf16(&snapshot));
-                            let end = Unclipped(v.range.end.to_point_utf16(&snapshot));
-                            DiagnosticEntry {
-                                range: start..end,
-                                diagnostic: v.diagnostic.clone(),
-                            }
-                        })
-                })
-                .collect::<Vec<_>>();
+            if let Some(buffer_handle) = self.buffer_store.read(cx).get_by_path(&project_path) {
+                let snapshot = buffer_handle.read(cx).snapshot();
+                let buffer = buffer_handle.read(cx);
+                let reused_diagnostics = buffer
+                    .get_diagnostics(server_id)
+                    .into_iter()
+                    .flat_map(|diag| {
+                        diag.iter()
+                            .filter(|v| merge(buffer, &v.diagnostic, cx))
+                            .map(|v| {
+                                let start = Unclipped(v.range.start.to_point_utf16(&snapshot));
+                                let end = Unclipped(v.range.end.to_point_utf16(&snapshot));
+                                DiagnosticEntry {
+                                    range: start..end,
+                                    diagnostic: v.diagnostic.clone(),
+                                }
+                            })
+                    })
+                    .collect::<Vec<_>>();
 
-            self.as_local_mut()
-                .context("cannot merge diagnostics on a remote LspStore")?
-                .update_buffer_diagnostics(
-                    &buffer_handle,
+                self.as_local_mut()
+                    .context("cannot merge diagnostics on a remote LspStore")?
+                    .update_buffer_diagnostics(
+                        &buffer_handle,
+                        server_id,
+                        update.result_id,
+                        update.diagnostics.version,
+                        update.diagnostics.diagnostics.clone(),
+                        reused_diagnostics.clone(),
+                        cx,
+                    )?;
+
+                update.diagnostics.diagnostics.extend(reused_diagnostics);
+            }
+
+            let updated = worktree.update(cx, |worktree, cx| {
+                self.update_worktree_diagnostics(
+                    worktree.id(),
                     server_id,
-                    result_id,
-                    version,
-                    diagnostics.clone(),
-                    reused_diagnostics.clone(),
+                    project_path.path.clone(),
+                    update.diagnostics.diagnostics,
                     cx,
-                )?;
-
-            diagnostics.extend(reused_diagnostics);
+                )
+            })?;
+            match updated {
+                ControlFlow::Continue(new_summary) => {
+                    if let Some((project_id, new_summary)) = new_summary {
+                        match &mut diagnostics_summary {
+                            Some(diagnostics_summary) => {
+                                diagnostics_summary
+                                    .more_summaries
+                                    .push(proto::DiagnosticSummary {
+                                        path: project_path.path.as_ref().to_proto(),
+                                        language_server_id: server_id.0 as u64,
+                                        error_count: new_summary.error_count,
+                                        warning_count: new_summary.warning_count,
+                                    })
+                            }
+                            None => {
+                                diagnostics_summary = Some(proto::UpdateDiagnosticSummary {
+                                    project_id: project_id,
+                                    worktree_id: worktree_id.to_proto(),
+                                    summary: Some(proto::DiagnosticSummary {
+                                        path: project_path.path.as_ref().to_proto(),
+                                        language_server_id: server_id.0 as u64,
+                                        error_count: new_summary.error_count,
+                                        warning_count: new_summary.warning_count,
+                                    }),
+                                    more_summaries: Vec::new(),
+                                })
+                            }
+                        }
+                    }
+                    updated_diagnostics_paths
+                        .entry(server_id)
+                        .or_insert_with(Vec::new)
+                        .push(project_path);
+                }
+                ControlFlow::Break(()) => {}
+            }
         }
 
-        let updated = worktree.update(cx, |worktree, cx| {
-            self.update_worktree_diagnostics(
-                worktree.id(),
-                server_id,
-                project_path.path.clone(),
-                diagnostics,
-                cx,
-            )
-        })?;
-        if updated {
-            cx.emit(LspStoreEvent::DiagnosticsUpdated {
-                language_server_id: server_id,
-                path: project_path,
-            })
+        if let Some((diagnostics_summary, (downstream_client, _))) =
+            diagnostics_summary.zip(self.downstream_client.as_ref())
+        {
+            downstream_client.send(diagnostics_summary).log_err();
+        }
+        for (server_id, paths) in updated_diagnostics_paths {
+            cx.emit(LspStoreEvent::DiagnosticsUpdated { server_id, paths });
         }
         Ok(())
     }
@@ -7881,10 +7974,10 @@ impl LspStore {
         &mut self,
         worktree_id: WorktreeId,
         server_id: LanguageServerId,
-        worktree_path: Arc<Path>,
+        path_in_worktree: Arc<Path>,
         diagnostics: Vec<DiagnosticEntry<Unclipped<PointUtf16>>>,
         _: &mut Context<Worktree>,
-    ) -> Result<bool> {
+    ) -> Result<ControlFlow<(), Option<(u64, proto::DiagnosticSummary)>>> {
         let local = match &mut self.mode {
             LspStoreMode::Local(local_lsp_store) => local_lsp_store,
             _ => anyhow::bail!("update_worktree_diagnostics called on remote"),
@@ -7892,7 +7985,9 @@ impl LspStore {
 
         let summaries_for_tree = self.diagnostic_summaries.entry(worktree_id).or_default();
         let diagnostics_for_tree = local.diagnostics.entry(worktree_id).or_default();
-        let summaries_by_server_id = summaries_for_tree.entry(worktree_path.clone()).or_default();
+        let summaries_by_server_id = summaries_for_tree
+            .entry(path_in_worktree.clone())
+            .or_default();
 
         let old_summary = summaries_by_server_id
             .remove(&server_id)
@@ -7900,18 +7995,19 @@ impl LspStore {
 
         let new_summary = DiagnosticSummary::new(&diagnostics);
         if new_summary.is_empty() {
-            if let Some(diagnostics_by_server_id) = diagnostics_for_tree.get_mut(&worktree_path) {
+            if let Some(diagnostics_by_server_id) = diagnostics_for_tree.get_mut(&path_in_worktree)
+            {
                 if let Ok(ix) = diagnostics_by_server_id.binary_search_by_key(&server_id, |e| e.0) {
                     diagnostics_by_server_id.remove(ix);
                 }
                 if diagnostics_by_server_id.is_empty() {
-                    diagnostics_for_tree.remove(&worktree_path);
+                    diagnostics_for_tree.remove(&path_in_worktree);
                 }
             }
         } else {
             summaries_by_server_id.insert(server_id, new_summary);
             let diagnostics_by_server_id = diagnostics_for_tree
-                .entry(worktree_path.clone())
+                .entry(path_in_worktree.clone())
                 .or_default();
             match diagnostics_by_server_id.binary_search_by_key(&server_id, |e| e.0) {
                 Ok(ix) => {
@@ -7924,23 +8020,22 @@ impl LspStore {
         }
 
         if !old_summary.is_empty() || !new_summary.is_empty() {
-            if let Some((downstream_client, project_id)) = &self.downstream_client {
-                downstream_client
-                    .send(proto::UpdateDiagnosticSummary {
-                        project_id: *project_id,
-                        worktree_id: worktree_id.to_proto(),
-                        summary: Some(proto::DiagnosticSummary {
-                            path: worktree_path.to_proto(),
-                            language_server_id: server_id.0 as u64,
-                            error_count: new_summary.error_count as u32,
-                            warning_count: new_summary.warning_count as u32,
-                        }),
-                    })
-                    .log_err();
+            if let Some((_, project_id)) = &self.downstream_client {
+                Ok(ControlFlow::Continue(Some((
+                    *project_id,
+                    proto::DiagnosticSummary {
+                        path: path_in_worktree.to_proto(),
+                        language_server_id: server_id.0 as u64,
+                        error_count: new_summary.error_count as u32,
+                        warning_count: new_summary.warning_count as u32,
+                    },
+                ))))
+            } else {
+                Ok(ControlFlow::Continue(None))
             }
+        } else {
+            Ok(ControlFlow::Break(()))
         }
-
-        Ok(!old_summary.is_empty() || !new_summary.is_empty())
     }
 
     pub fn open_buffer_for_symbol(
@@ -8793,23 +8888,30 @@ impl LspStore {
         envelope: TypedEnvelope<proto::UpdateDiagnosticSummary>,
         mut cx: AsyncApp,
     ) -> Result<()> {
-        this.update(&mut cx, |this, cx| {
+        this.update(&mut cx, |lsp_store, cx| {
             let worktree_id = WorktreeId::from_proto(envelope.payload.worktree_id);
-            if let Some(message) = envelope.payload.summary {
+            let mut updated_diagnostics_paths = HashMap::default();
+            let mut diagnostics_summary = None::<proto::UpdateDiagnosticSummary>;
+            for message_summary in envelope
+                .payload
+                .summary
+                .into_iter()
+                .chain(envelope.payload.more_summaries)
+            {
                 let project_path = ProjectPath {
                     worktree_id,
-                    path: Arc::<Path>::from_proto(message.path),
+                    path: Arc::<Path>::from_proto(message_summary.path),
                 };
                 let path = project_path.path.clone();
-                let server_id = LanguageServerId(message.language_server_id as usize);
+                let server_id = LanguageServerId(message_summary.language_server_id as usize);
                 let summary = DiagnosticSummary {
-                    error_count: message.error_count as usize,
-                    warning_count: message.warning_count as usize,
+                    error_count: message_summary.error_count as usize,
+                    warning_count: message_summary.warning_count as usize,
                 };
 
                 if summary.is_empty() {
                     if let Some(worktree_summaries) =
-                        this.diagnostic_summaries.get_mut(&worktree_id)
+                        lsp_store.diagnostic_summaries.get_mut(&worktree_id)
                     {
                         if let Some(summaries) = worktree_summaries.get_mut(&path) {
                             summaries.remove(&server_id);
@@ -8819,31 +8921,55 @@ impl LspStore {
                         }
                     }
                 } else {
-                    this.diagnostic_summaries
+                    lsp_store
+                        .diagnostic_summaries
                         .entry(worktree_id)
                         .or_default()
                         .entry(path)
                         .or_default()
                         .insert(server_id, summary);
                 }
-                if let Some((downstream_client, project_id)) = &this.downstream_client {
-                    downstream_client
-                        .send(proto::UpdateDiagnosticSummary {
-                            project_id: *project_id,
-                            worktree_id: worktree_id.to_proto(),
-                            summary: Some(proto::DiagnosticSummary {
-                                path: project_path.path.as_ref().to_proto(),
-                                language_server_id: server_id.0 as u64,
-                                error_count: summary.error_count as u32,
-                                warning_count: summary.warning_count as u32,
-                            }),
-                        })
-                        .log_err();
+
+                if let Some((_, project_id)) = &lsp_store.downstream_client {
+                    match &mut diagnostics_summary {
+                        Some(diagnostics_summary) => {
+                            diagnostics_summary
+                                .more_summaries
+                                .push(proto::DiagnosticSummary {
+                                    path: project_path.path.as_ref().to_proto(),
+                                    language_server_id: server_id.0 as u64,
+                                    error_count: summary.error_count as u32,
+                                    warning_count: summary.warning_count as u32,
+                                })
+                        }
+                        None => {
+                            diagnostics_summary = Some(proto::UpdateDiagnosticSummary {
+                                project_id: *project_id,
+                                worktree_id: worktree_id.to_proto(),
+                                summary: Some(proto::DiagnosticSummary {
+                                    path: project_path.path.as_ref().to_proto(),
+                                    language_server_id: server_id.0 as u64,
+                                    error_count: summary.error_count as u32,
+                                    warning_count: summary.warning_count as u32,
+                                }),
+                                more_summaries: Vec::new(),
+                            })
+                        }
+                    }
                 }
-                cx.emit(LspStoreEvent::DiagnosticsUpdated {
-                    language_server_id: LanguageServerId(message.language_server_id as usize),
-                    path: project_path,
-                });
+                updated_diagnostics_paths
+                    .entry(server_id)
+                    .or_insert_with(Vec::new)
+                    .push(project_path);
+            }
+
+            if let Some((diagnostics_summary, (downstream_client, _))) =
+                diagnostics_summary.zip(lsp_store.downstream_client.as_ref())
+            {
+                downstream_client.send(diagnostics_summary).log_err();
+            }
+            for (server_id, paths) in updated_diagnostics_paths {
+                cx.emit(LspStoreEvent::DiagnosticsUpdated { server_id, paths });
             }
             Ok(())
         })?
@@ -10361,6 +10487,7 @@ impl LspStore {
                                     error_count: 0,
                                     warning_count: 0,
                                 }),
+                                more_summaries: Vec::new(),
                             })
                             .log_err();
                     }
@@ -10649,52 +10776,80 @@ impl LspStore {
         )
     }
 
+    #[cfg(any(test, feature = "test-support"))]
     pub fn update_diagnostics(
         &mut self,
-        language_server_id: LanguageServerId,
-        params: lsp::PublishDiagnosticsParams,
+        server_id: LanguageServerId,
+        diagnostics: lsp::PublishDiagnosticsParams,
         result_id: Option<String>,
         source_kind: DiagnosticSourceKind,
         disk_based_sources: &[String],
         cx: &mut Context<Self>,
     ) -> Result<()> {
-        self.merge_diagnostics(
-            language_server_id,
-            params,
-            result_id,
+        self.merge_lsp_diagnostics(
             source_kind,
-            disk_based_sources,
+            vec![DocumentDiagnosticsUpdate {
+                diagnostics,
+                result_id,
+                server_id,
+                disk_based_sources: Cow::Borrowed(disk_based_sources),
+            }],
             |_, _, _| false,
             cx,
         )
     }
 
-    pub fn merge_diagnostics(
+    pub fn merge_lsp_diagnostics(
         &mut self,
-        language_server_id: LanguageServerId,
-        mut params: lsp::PublishDiagnosticsParams,
-        result_id: Option<String>,
         source_kind: DiagnosticSourceKind,
-        disk_based_sources: &[String],
-        filter: impl Fn(&Buffer, &Diagnostic, &App) -> bool + Clone,
+        lsp_diagnostics: Vec<DocumentDiagnosticsUpdate<lsp::PublishDiagnosticsParams>>,
+        merge: impl Fn(&Buffer, &Diagnostic, &App) -> bool + Clone,
         cx: &mut Context<Self>,
     ) -> Result<()> {
         anyhow::ensure!(self.mode.is_local(), "called update_diagnostics on remote");
-        let abs_path = params
-            .uri
-            .to_file_path()
-            .map_err(|()| anyhow!("URI is not a file"))?;
+        let updates = lsp_diagnostics
+            .into_iter()
+            .filter_map(|update| {
+                let abs_path = update.diagnostics.uri.to_file_path().ok()?;
+                Some(DocumentDiagnosticsUpdate {
+                    diagnostics: self.lsp_to_document_diagnostics(
+                        abs_path,
+                        source_kind,
+                        update.server_id,
+                        update.diagnostics,
+                        &update.disk_based_sources,
+                    ),
+                    result_id: update.result_id,
+                    server_id: update.server_id,
+                    disk_based_sources: update.disk_based_sources,
+                })
+            })
+            .collect();
+        self.merge_diagnostic_entries(updates, merge, cx)?;
+        Ok(())
+    }
+
+    fn lsp_to_document_diagnostics(
+        &mut self,
+        document_abs_path: PathBuf,
+        source_kind: DiagnosticSourceKind,
+        server_id: LanguageServerId,
+        mut lsp_diagnostics: lsp::PublishDiagnosticsParams,
+        disk_based_sources: &[String],
+    ) -> DocumentDiagnostics {
         let mut diagnostics = Vec::default();
         let mut primary_diagnostic_group_ids = HashMap::default();
         let mut sources_by_group_id = HashMap::default();
         let mut supporting_diagnostics = HashMap::default();
 
-        let adapter = self.language_server_adapter_for_id(language_server_id);
+        let adapter = self.language_server_adapter_for_id(server_id);
 
         // Ensure that primary diagnostics are always the most severe
-        params.diagnostics.sort_by_key(|item| item.severity);
+        lsp_diagnostics
+            .diagnostics
+            .sort_by_key(|item| item.severity);
 
-        for diagnostic in &params.diagnostics {
+        for diagnostic in &lsp_diagnostics.diagnostics {
             let source = diagnostic.source.as_ref();
             let range = range_from_lsp(diagnostic.range);
             let is_supporting = diagnostic
@@ -10716,7 +10871,7 @@ impl LspStore {
                 .map_or(false, |tags| tags.contains(&DiagnosticTag::UNNECESSARY));
 
             let underline = self
-                .language_server_adapter_for_id(language_server_id)
+                .language_server_adapter_for_id(server_id)
                 .map_or(true, |adapter| adapter.underline_diagnostic(diagnostic));
 
             if is_supporting {
@@ -10758,7 +10913,7 @@ impl LspStore {
                 });
                 if let Some(infos) = &diagnostic.related_information {
                     for info in infos {
-                        if info.location.uri == params.uri && !info.message.is_empty() {
+                        if info.location.uri == lsp_diagnostics.uri && !info.message.is_empty() {
                             let range = range_from_lsp(info.location.range);
                             diagnostics.push(DiagnosticEntry {
                                 range,
@@ -10806,16 +10961,11 @@ impl LspStore {
             }
         }
 
-        self.merge_diagnostic_entries(
-            language_server_id,
-            abs_path,
-            result_id,
-            params.version,
+        DocumentDiagnostics {
             diagnostics,
-            filter,
-            cx,
-        )?;
-        Ok(())
+            document_abs_path,
+            version: lsp_diagnostics.version,
+        }
     }
 
     fn insert_newly_running_language_server(
@@ -11571,67 +11721,84 @@ impl LspStore {
     ) {
         let workspace_diagnostics =
             GetDocumentDiagnostics::deserialize_workspace_diagnostics_report(report, server_id);
-        for workspace_diagnostics in workspace_diagnostics {
-            let LspPullDiagnostics::Response {
-                server_id,
-                uri,
-                diagnostics,
-            } = workspace_diagnostics.diagnostics
-            else {
-                continue;
-            };
-
-            let adapter = self.language_server_adapter_for_id(server_id);
-            let disk_based_sources = adapter
-                .as_ref()
-                .map(|adapter| adapter.disk_based_diagnostic_sources.as_slice())
-                .unwrap_or(&[]);
-
-            match diagnostics {
-                PulledDiagnostics::Unchanged { result_id } => {
-                    self.merge_diagnostics(
-                        server_id,
-                        lsp::PublishDiagnosticsParams {
-                            uri: uri.clone(),
-                            diagnostics: Vec::new(),
-                            version: None,
-                        },
-                        Some(result_id),
-                        DiagnosticSourceKind::Pulled,
-                        disk_based_sources,
-                        |_, _, _| true,
-                        cx,
-                    )
-                    .log_err();
-                }
-                PulledDiagnostics::Changed {
-                    diagnostics,
-                    result_id,
-                } => {
-                    self.merge_diagnostics(
+        let mut unchanged_buffers = HashSet::default();
+        let mut changed_buffers = HashSet::default();
+        let workspace_diagnostics_updates = workspace_diagnostics
+            .into_iter()
+            .filter_map(
+                |workspace_diagnostics| match workspace_diagnostics.diagnostics {
+                    LspPullDiagnostics::Response {
                         server_id,
-                        lsp::PublishDiagnosticsParams {
-                            uri: uri.clone(),
+                        uri,
+                        diagnostics,
+                    } => Some((server_id, uri, diagnostics, workspace_diagnostics.version)),
+                    LspPullDiagnostics::Default => None,
+                },
+            )
+            .fold(
+                HashMap::default(),
+                |mut acc, (server_id, uri, diagnostics, version)| {
+                    let (result_id, diagnostics) = match diagnostics {
+                        PulledDiagnostics::Unchanged { result_id } => {
+                            unchanged_buffers.insert(uri.clone());
+                            (Some(result_id), Vec::new())
+                        }
+                        PulledDiagnostics::Changed {
+                            result_id,
                             diagnostics,
-                            version: workspace_diagnostics.version,
-                        },
-                        result_id,
-                        DiagnosticSourceKind::Pulled,
-                        disk_based_sources,
-                        |buffer, old_diagnostic, cx| match old_diagnostic.source_kind {
-                            DiagnosticSourceKind::Pulled => {
-                                let buffer_url = File::from_dyn(buffer.file())
-                                    .map(|f| f.abs_path(cx))
-                                    .and_then(|abs_path| file_path_to_lsp_url(&abs_path).ok());
-                                buffer_url.is_none_or(|buffer_url| buffer_url != uri)
-                            }
-                            DiagnosticSourceKind::Other | DiagnosticSourceKind::Pushed => true,
-                        },
-                        cx,
-                    )
-                    .log_err();
-                }
-            }
+                        } => {
+                            changed_buffers.insert(uri.clone());
+                            (result_id, diagnostics)
+                        }
+                    };
+                    let disk_based_sources = Cow::Owned(
+                        self.language_server_adapter_for_id(server_id)
+                            .as_ref()
+                            .map(|adapter| adapter.disk_based_diagnostic_sources.as_slice())
+                            .unwrap_or(&[])
+                            .to_vec(),
+                    );
+                    acc.entry(server_id)
+                        .or_insert_with(Vec::new)
+                        .push(DocumentDiagnosticsUpdate {
+                            server_id,
+                            diagnostics: lsp::PublishDiagnosticsParams {
+                                uri,
+                                diagnostics,
+                                version,
+                            },
+                            result_id,
+                            disk_based_sources,
+                        });
+                    acc
+                },
+            );
+
+        for diagnostic_updates in workspace_diagnostics_updates.into_values() {
+            self.merge_lsp_diagnostics(
+                DiagnosticSourceKind::Pulled,
+                diagnostic_updates,
+                |buffer, old_diagnostic, cx| {
+                    File::from_dyn(buffer.file())
+                        .and_then(|file| {
+                            let abs_path = file.as_local()?.abs_path(cx);
+                            lsp::Url::from_file_path(abs_path).ok()
+                        })
+                        .is_none_or(|buffer_uri| {
+                            unchanged_buffers.contains(&buffer_uri)
+                                || match old_diagnostic.source_kind {
+                                    DiagnosticSourceKind::Pulled => {
+                                        !changed_buffers.contains(&buffer_uri)
+                                    }
+                                    DiagnosticSourceKind::Other | DiagnosticSourceKind::Pushed => {
+                                        true
+                                    }
+                                }
+                        })
+                },
+                cx,
+            )
+            .log_err();
         }
     }
 }

crates/project/src/lsp_store/clangd_ext.rs 🔗

@@ -1,4 +1,4 @@
-use std::sync::Arc;
+use std::{borrow::Cow, sync::Arc};
 
 use ::serde::{Deserialize, Serialize};
 use gpui::WeakEntity;
@@ -6,7 +6,7 @@ use language::{CachedLspAdapter, Diagnostic, DiagnosticSourceKind};
 use lsp::{LanguageServer, LanguageServerName};
 use util::ResultExt as _;
 
-use crate::LspStore;
+use crate::{LspStore, lsp_store::DocumentDiagnosticsUpdate};
 
 pub const CLANGD_SERVER_NAME: LanguageServerName = LanguageServerName::new_static("clangd");
 const INACTIVE_REGION_MESSAGE: &str = "inactive region";
@@ -81,12 +81,16 @@ pub fn register_notifications(
                         version: params.text_document.version,
                         diagnostics,
                     };
-                    this.merge_diagnostics(
-                        server_id,
-                        mapped_diagnostics,
-                        None,
+                    this.merge_lsp_diagnostics(
                         DiagnosticSourceKind::Pushed,
-                        &adapter.disk_based_diagnostic_sources,
+                        vec![DocumentDiagnosticsUpdate {
+                            server_id,
+                            diagnostics: mapped_diagnostics,
+                            result_id: None,
+                            disk_based_sources: Cow::Borrowed(
+                                &adapter.disk_based_diagnostic_sources,
+                            ),
+                        }],
                         |_, diag, _| !is_inactive_region(diag),
                         cx,
                     )

crates/project/src/project.rs 🔗

@@ -74,9 +74,9 @@ use gpui::{
     Task, WeakEntity, Window,
 };
 use language::{
-    Buffer, BufferEvent, Capability, CodeLabel, CursorShape, DiagnosticSourceKind, Language,
-    LanguageName, LanguageRegistry, PointUtf16, ToOffset, ToPointUtf16, Toolchain, ToolchainList,
-    Transaction, Unclipped, language_settings::InlayHintKind, proto::split_operations,
+    Buffer, BufferEvent, Capability, CodeLabel, CursorShape, Language, LanguageName,
+    LanguageRegistry, PointUtf16, ToOffset, ToPointUtf16, Toolchain, ToolchainList, Transaction,
+    Unclipped, language_settings::InlayHintKind, proto::split_operations,
 };
 use lsp::{
     CodeActionKind, CompletionContext, CompletionItemKind, DocumentHighlightKind, InsertTextMode,
@@ -305,7 +305,7 @@ pub enum Event {
         language_server_id: LanguageServerId,
     },
     DiagnosticsUpdated {
-        path: ProjectPath,
+        paths: Vec<ProjectPath>,
         language_server_id: LanguageServerId,
     },
     RemoteIdChanged(Option<u64>),
@@ -2895,18 +2895,17 @@ impl Project {
         cx: &mut Context<Self>,
     ) {
         match event {
-            LspStoreEvent::DiagnosticsUpdated {
-                language_server_id,
-                path,
-            } => cx.emit(Event::DiagnosticsUpdated {
-                path: path.clone(),
-                language_server_id: *language_server_id,
-            }),
-            LspStoreEvent::LanguageServerAdded(language_server_id, name, worktree_id) => cx.emit(
-                Event::LanguageServerAdded(*language_server_id, name.clone(), *worktree_id),
+            LspStoreEvent::DiagnosticsUpdated { server_id, paths } => {
+                cx.emit(Event::DiagnosticsUpdated {
+                    paths: paths.clone(),
+                    language_server_id: *server_id,
+                })
+            }
+            LspStoreEvent::LanguageServerAdded(server_id, name, worktree_id) => cx.emit(
+                Event::LanguageServerAdded(*server_id, name.clone(), *worktree_id),
             ),
-            LspStoreEvent::LanguageServerRemoved(language_server_id) => {
-                cx.emit(Event::LanguageServerRemoved(*language_server_id))
+            LspStoreEvent::LanguageServerRemoved(server_id) => {
+                cx.emit(Event::LanguageServerRemoved(*server_id))
             }
             LspStoreEvent::LanguageServerLog(server_id, log_type, string) => cx.emit(
                 Event::LanguageServerLog(*server_id, log_type.clone(), string.clone()),
@@ -3829,27 +3828,6 @@ impl Project {
         })
     }
 
-    pub fn update_diagnostics(
-        &mut self,
-        language_server_id: LanguageServerId,
-        source_kind: DiagnosticSourceKind,
-        result_id: Option<String>,
-        params: lsp::PublishDiagnosticsParams,
-        disk_based_sources: &[String],
-        cx: &mut Context<Self>,
-    ) -> Result<(), anyhow::Error> {
-        self.lsp_store.update(cx, |lsp_store, cx| {
-            lsp_store.update_diagnostics(
-                language_server_id,
-                params,
-                result_id,
-                source_kind,
-                disk_based_sources,
-                cx,
-            )
-        })
-    }
-
     pub fn search(&mut self, query: SearchQuery, cx: &mut Context<Self>) -> Receiver<SearchResult> {
         let (result_tx, result_rx) = smol::channel::unbounded();
 

crates/project/src/project_tests.rs 🔗

@@ -20,8 +20,8 @@ use gpui::{App, BackgroundExecutor, SemanticVersion, UpdateGlobal};
 use http_client::Url;
 use itertools::Itertools;
 use language::{
-    Diagnostic, DiagnosticEntry, DiagnosticSet, DiskState, FakeLspAdapter, LanguageConfig,
-    LanguageMatcher, LanguageName, LineEnding, OffsetRangeExt, Point, ToPoint,
+    Diagnostic, DiagnosticEntry, DiagnosticSet, DiagnosticSourceKind, DiskState, FakeLspAdapter,
+    LanguageConfig, LanguageMatcher, LanguageName, LineEnding, OffsetRangeExt, Point, ToPoint,
     language_settings::{AllLanguageSettings, LanguageSettingsContent, language_settings},
     tree_sitter_rust, tree_sitter_typescript,
 };
@@ -1619,7 +1619,7 @@ async fn test_disk_based_diagnostics_progress(cx: &mut gpui::TestAppContext) {
         events.next().await.unwrap(),
         Event::DiagnosticsUpdated {
             language_server_id: LanguageServerId(0),
-            path: (worktree_id, Path::new("a.rs")).into()
+            paths: vec![(worktree_id, Path::new("a.rs")).into()],
         }
     );
 
@@ -1667,7 +1667,7 @@ async fn test_disk_based_diagnostics_progress(cx: &mut gpui::TestAppContext) {
         events.next().await.unwrap(),
         Event::DiagnosticsUpdated {
             language_server_id: LanguageServerId(0),
-            path: (worktree_id, Path::new("a.rs")).into()
+            paths: vec![(worktree_id, Path::new("a.rs")).into()],
         }
     );
 

crates/proto/proto/lsp.proto 🔗

@@ -525,6 +525,7 @@ message UpdateDiagnosticSummary {
     uint64 project_id = 1;
     uint64 worktree_id = 2;
     DiagnosticSummary summary = 3;
+    repeated DiagnosticSummary more_summaries = 4;
 }
 
 message DiagnosticSummary {