Reduce intensity of refreshing pull diagnostics (#47510)

Conrad Irwin and John Tur created

Before this change we'd spawn N tasks in parallel on every keystroke,
afterwards
we only allow 1 background diagnostic refresh in flight at a time.

This also fixed a bug where we'd send O(n*2) pull diagnostic requests
when
re-opening a workspace with n editors.

Co-authored-by: John Tur <john-tur@outlook.com>

Closes #ISSUE

Release Notes:

- Improved performance when a large number of files were open by making
background diagnostics more efficient

---------

Co-authored-by: John Tur <john-tur@outlook.com>

Change summary

crates/collab/src/tests/editor_tests.rs |  10 +-
crates/editor/src/editor.rs             | 122 +++++---------------------
crates/project/src/lsp_store.rs         | 120 ++++++++++++++++++-------
3 files changed, 113 insertions(+), 139 deletions(-)

Detailed changes

crates/collab/src/tests/editor_tests.rs 🔗

@@ -3342,9 +3342,9 @@ async fn test_lsp_pull_diagnostics(
         editor.handle_input(":", window, cx);
     });
     pull_diagnostics_handle.next().await.unwrap();
-    pull_diagnostics_handle.next().await.unwrap();
+    // pull_diagnostics_handle.next().await.unwrap();
     assert_eq!(
-        5,
+        4,
         diagnostics_pulls_made.load(atomic::Ordering::Acquire),
         "Client lib.rs edits should trigger another diagnostics pull for open buffers"
     );
@@ -3364,7 +3364,7 @@ async fn test_lsp_pull_diagnostics(
     pull_diagnostics_handle.next().await.unwrap();
     pull_diagnostics_handle.next().await.unwrap();
     assert_eq!(
-        8,
+        7,
         diagnostics_pulls_made.load(atomic::Ordering::Acquire),
         "Client main.rs edits should trigger diagnostics pull by both client and host and an extra pull for the client's lib.rs"
     );
@@ -3384,7 +3384,7 @@ async fn test_lsp_pull_diagnostics(
     pull_diagnostics_handle.next().await.unwrap();
     pull_diagnostics_handle.next().await.unwrap();
     assert_eq!(
-        11,
+        10,
         diagnostics_pulls_made.load(atomic::Ordering::Acquire),
         "Host main.rs edits should trigger another diagnostics pull by both client and host and another pull for the client's lib.rs"
     );
@@ -3417,7 +3417,7 @@ async fn test_lsp_pull_diagnostics(
     pull_diagnostics_handle.next().await.unwrap();
     pull_diagnostics_handle.next().await.unwrap();
     assert_eq!(
-        13,
+        12,
         diagnostics_pulls_made.load(atomic::Ordering::Acquire),
         "Workspace refresh should trigger document pulls for all open buffers (main.rs and lib.rs)"
     );

crates/editor/src/editor.rs 🔗

@@ -98,9 +98,8 @@ use edit_prediction_types::{
 use editor_settings::{GoToDefinitionFallback, Minimap as MinimapSettings};
 use element::{AcceptEditPredictionBinding, LineWithInvisibles, PositionMap, layout_line};
 use futures::{
-    FutureExt, StreamExt as _,
+    FutureExt,
     future::{self, Shared, join},
-    stream::FuturesUnordered,
 };
 use fuzzy::{StringMatch, StringMatchCandidate};
 use git::blame::{GitBlame, GlobalBlameRenderer};
@@ -1299,7 +1298,6 @@ pub struct Editor {
     next_review_comment_id: usize,
     hovered_diff_hunk_row: Option<DisplayRow>,
     pull_diagnostics_task: Task<()>,
-    pull_diagnostics_background_task: Task<()>,
     in_project_search: bool,
     previous_search_ranges: Option<Arc<[Range<Anchor>]>>,
     breadcrumb_header: Option<String>,
@@ -2508,7 +2506,6 @@ impl Editor {
                 .unwrap_or_default(),
             tasks_update_task: None,
             pull_diagnostics_task: Task::ready(()),
-            pull_diagnostics_background_task: Task::ready(()),
             colors: None,
             refresh_colors_task: Task::ready(()),
             inlay_hints: None,
@@ -19252,8 +19249,8 @@ impl Editor {
 
     fn pull_diagnostics(
         &mut self,
-        buffer_id: Option<BufferId>,
-        window: &Window,
+        buffer_id: BufferId,
+        _window: &Window,
         cx: &mut Context<Self>,
     ) -> Option<()> {
         if self.ignore_lsp_data() || !self.diagnostics_enabled() {
@@ -19265,102 +19262,27 @@ impl Editor {
         if !pull_diagnostics_settings.enabled {
             return None;
         }
+        let debounce = Duration::from_millis(pull_diagnostics_settings.debounce_ms);
         let project = self.project()?.downgrade();
+        let buffer = self.buffer().read(cx).buffer(buffer_id)?;
 
-        let mut edited_buffer_ids = HashSet::default();
-        let mut edited_worktree_ids = HashSet::default();
-        let edited_buffers = match buffer_id {
-            Some(buffer_id) => {
-                let buffer = self.buffer().read(cx).buffer(buffer_id)?;
-                let worktree_id = buffer.read(cx).file().map(|f| f.worktree_id(cx))?;
-                edited_buffer_ids.insert(buffer.read(cx).remote_id());
-                edited_worktree_ids.insert(worktree_id);
-                vec![buffer]
-            }
-            None => self
-                .buffer()
-                .read(cx)
-                .all_buffers()
-                .into_iter()
-                .filter(|buffer| {
-                    let buffer = buffer.read(cx);
-                    match buffer.file().map(|f| f.worktree_id(cx)) {
-                        Some(worktree_id) => {
-                            edited_buffer_ids.insert(buffer.remote_id());
-                            edited_worktree_ids.insert(worktree_id);
-                            true
-                        }
-                        None => false,
-                    }
+        self.pull_diagnostics_task = cx.spawn(async move |_, cx| {
+            cx.background_executor().timer(debounce).await;
+            if let Ok(task) = project.update(cx, |project, cx| {
+                project.lsp_store().update(cx, |lsp_store, cx| {
+                    lsp_store.pull_diagnostics_for_buffer(buffer, cx)
                 })
-                .collect::<Vec<_>>(),
-        };
-
-        if edited_buffers.is_empty() {
-            self.pull_diagnostics_task = Task::ready(());
-            self.pull_diagnostics_background_task = Task::ready(());
-            return None;
-        }
-
-        let mut already_used_buffers = HashSet::default();
-        let related_open_buffers = self
-            .workspace
-            .as_ref()
-            .and_then(|(workspace, _)| workspace.upgrade())
-            .into_iter()
-            .flat_map(|workspace| workspace.read(cx).panes())
-            .flat_map(|pane| pane.read(cx).items_of_type::<Editor>())
-            .filter(|editor| editor != &cx.entity())
-            .flat_map(|editor| editor.read(cx).buffer().read(cx).all_buffers())
-            .filter(|buffer| {
-                let buffer = buffer.read(cx);
-                let buffer_id = buffer.remote_id();
-                if already_used_buffers.insert(buffer_id) {
-                    if let Some(worktree_id) = buffer.file().map(|f| f.worktree_id(cx)) {
-                        return !edited_buffer_ids.contains(&buffer_id)
-                            && edited_worktree_ids.contains(&worktree_id);
-                    }
-                }
-                false
-            })
-            .collect::<Vec<_>>();
-
-        let debounce = Duration::from_millis(pull_diagnostics_settings.debounce_ms);
-        let make_spawn = |buffers: Vec<Entity<Buffer>>, delay: Duration| {
-            if buffers.is_empty() {
-                return Task::ready(());
+            }) {
+                task.await.log_err();
             }
-            let project_weak = project.clone();
-            cx.spawn_in(window, async move |_, cx| {
-                cx.background_executor().timer(delay).await;
-
-                let Ok(mut pull_diagnostics_tasks) = cx.update(|_, cx| {
-                    buffers
-                        .into_iter()
-                        .filter_map(|buffer| {
-                            project_weak
-                                .update(cx, |project, cx| {
-                                    project.lsp_store().update(cx, |lsp_store, cx| {
-                                        lsp_store.pull_diagnostics_for_buffer(buffer, cx)
-                                    })
-                                })
-                                .ok()
-                        })
-                        .collect::<FuturesUnordered<_>>()
-                }) else {
-                    return;
-                };
-
-                while let Some(pull_task) = pull_diagnostics_tasks.next().await {
-                    if let Err(e) = pull_task {
-                        log::error!("Failed to update project diagnostics: {e:#}");
-                    }
-                }
-            })
-        };
-
-        self.pull_diagnostics_task = make_spawn(edited_buffers, debounce);
-        self.pull_diagnostics_background_task = make_spawn(related_open_buffers, debounce * 2);
+            project
+                .update(cx, |project, cx| {
+                    project.lsp_store().update(cx, |lsp_store, cx| {
+                        lsp_store.pull_document_diagnostics_for_buffer_edit(buffer_id, cx);
+                    })
+                })
+                .log_err();
+        });
 
         Some(())
     }
@@ -24941,7 +24863,9 @@ impl Editor {
         window: &mut Window,
         cx: &mut Context<'_, Self>,
     ) {
-        self.pull_diagnostics(for_buffer, window, cx);
+        if let Some(buffer_id) = for_buffer {
+            self.pull_diagnostics(buffer_id, window, cx);
+        }
         self.refresh_colors_for_visible_range(for_buffer, window, cx);
     }
 

crates/project/src/lsp_store.rs 🔗

@@ -104,7 +104,7 @@ use std::{
     borrow::Cow,
     cell::RefCell,
     cmp::{Ordering, Reverse},
-    collections::hash_map,
+    collections::{VecDeque, hash_map},
     convert::TryInto,
     ffi::OsStr,
     future::ready,
@@ -301,6 +301,10 @@ pub struct LocalLspStore {
         HashMap<Option<SharedString>, HashMap<PathBuf, Option<SharedString>>>,
     >,
     restricted_worktrees_tasks: HashMap<WorktreeId, (Subscription, watch::Receiver<bool>)>,
+
+    buffers_to_refresh_hash_set: HashSet<BufferId>,
+    buffers_to_refresh_queue: VecDeque<BufferId>,
+    _background_diagnostics_worker: Shared<Task<()>>,
 }
 
 impl LocalLspStore {
@@ -1073,7 +1077,7 @@ impl LocalLspStore {
                                 })
                                 .transpose()?;
                             anyhow::Ok(
-                                lsp_store.pull_document_diagnostics_for_server(server_id, cx),
+                                lsp_store.pull_document_diagnostics_for_server(server_id, None, cx),
                             )
                         })??
                         .await;
@@ -4052,6 +4056,9 @@ impl LspStore {
                 language_server_paths_watched_for_rename: Default::default(),
                 language_server_dynamic_registrations: Default::default(),
                 buffers_being_formatted: Default::default(),
+                buffers_to_refresh_hash_set: HashSet::default(),
+                buffers_to_refresh_queue: VecDeque::new(),
+                _background_diagnostics_worker: Task::ready(()).shared(),
                 buffer_snapshots: Default::default(),
                 prettier_store,
                 environment,
@@ -4304,6 +4311,47 @@ impl LspStore {
         Ok(())
     }
 
+    pub fn refresh_background_diagnostics_for_buffers(
+        &mut self,
+        buffers: HashSet<BufferId>,
+        cx: &mut Context<Self>,
+    ) -> Shared<Task<()>> {
+        let Some(local) = self.as_local_mut() else {
+            return Task::ready(()).shared();
+        };
+        for buffer in buffers {
+            if local.buffers_to_refresh_hash_set.insert(buffer) {
+                local.buffers_to_refresh_queue.push_back(buffer);
+                if local.buffers_to_refresh_queue.len() == 1 {
+                    local._background_diagnostics_worker =
+                        Self::background_diagnostics_worker(cx).shared();
+                }
+            }
+        }
+
+        local._background_diagnostics_worker.clone()
+    }
+
+    fn refresh_next_buffer(&mut self, cx: &mut Context<Self>) -> Option<Task<Result<()>>> {
+        let buffer_store = self.buffer_store.clone();
+        let local = self.as_local_mut()?;
+        while let Some(buffer_id) = local.buffers_to_refresh_queue.pop_front() {
+            local.buffers_to_refresh_hash_set.remove(&buffer_id);
+            if let Some(buffer) = buffer_store.read(cx).get(buffer_id) {
+                return Some(self.pull_diagnostics_for_buffer(buffer, cx));
+            }
+        }
+        None
+    }
+
+    fn background_diagnostics_worker(cx: &mut Context<Self>) -> Task<()> {
+        cx.spawn(async move |this, cx| {
+            while let Ok(Some(task)) = this.update(cx, |this, cx| this.refresh_next_buffer(cx)) {
+                task.await.log_err();
+            }
+        })
+    }
+
     pub(crate) fn register_buffer_with_language_servers(
         &mut self,
         buffer: &Entity<Buffer>,
@@ -9290,6 +9338,11 @@ impl LspStore {
                 false,
                 cx,
             );
+            // Pull diagnostics for the buffer even if it was already registered.
+            // This is needed to make test_streamed_lsp_pull_diagnostics pass,
+            // but it's unclear if we need it.
+            this.pull_diagnostics_for_buffer(buffer.clone(), cx)
+                .detach();
             this.buffer_store().update(cx, |buffer_store, _| {
                 buffer_store.register_shared_lsp_handle(peer_id, buffer_id, handle);
             });
@@ -12250,39 +12303,37 @@ impl LspStore {
     pub fn pull_document_diagnostics_for_server(
         &mut self,
         server_id: LanguageServerId,
+        source_buffer_id: Option<BufferId>,
         cx: &mut Context<Self>,
-    ) -> Task<()> {
-        let buffers_to_pull = self
-            .as_local()
-            .into_iter()
-            .flat_map(|local| {
-                self.buffer_store.read(cx).buffers().filter(|buffer| {
-                    let buffer_id = buffer.read(cx).remote_id();
-                    local
-                        .buffers_opened_in_servers
-                        .get(&buffer_id)
-                        .is_some_and(|servers| servers.contains(&server_id))
-                })
-            })
-            .collect::<Vec<_>>();
-
-        let pulls = join_all(buffers_to_pull.into_iter().map(|buffer| {
-            let buffer_path = buffer.read(cx).file().map(|f| f.full_path(cx));
-            let pull_task = self.pull_diagnostics_for_buffer(buffer, cx);
-            async move { (buffer_path, pull_task.await) }
-        }));
-        cx.background_spawn(async move {
-            for (pull_task_path, pull_task_result) in pulls.await {
-                if let Err(e) = pull_task_result {
-                    match pull_task_path {
-                        Some(path) => {
-                            log::error!("Failed to pull diagnostics for buffer {path:?}: {e:#}");
-                        }
-                        None => log::error!("Failed to pull diagnostics: {e:#}"),
-                    }
-                }
+    ) -> Shared<Task<()>> {
+        let Some(local) = self.as_local_mut() else {
+            return Task::ready(()).shared();
+        };
+        let mut buffers_to_refresh = HashSet::default();
+        for (buffer_id, server_ids) in &local.buffers_opened_in_servers {
+            if server_ids.contains(&server_id) && Some(buffer_id) != source_buffer_id.as_ref() {
+                buffers_to_refresh.insert(*buffer_id);
             }
-        })
+        }
+
+        self.refresh_background_diagnostics_for_buffers(buffers_to_refresh, cx)
+    }
+
+    pub fn pull_document_diagnostics_for_buffer_edit(
+        &mut self,
+        buffer_id: BufferId,
+        cx: &mut Context<Self>,
+    ) {
+        let Some(local) = self.as_local_mut() else {
+            return;
+        };
+        let Some(languages_servers) = local.buffers_opened_in_servers.get(&buffer_id).cloned()
+        else {
+            return;
+        };
+        for server_id in languages_servers {
+            let _ = self.pull_document_diagnostics_for_server(server_id, Some(buffer_id), cx);
+        }
     }
 
     fn apply_workspace_diagnostic_report(
@@ -12725,8 +12776,7 @@ impl LspStore {
 
                         notify_server_capabilities_updated(&server, cx);
 
-                        self.pull_document_diagnostics_for_server(server_id, cx)
-                            .detach();
+                        let _ = self.pull_document_diagnostics_for_server(server_id, None, cx);
                     }
                 }
                 "textDocument/documentColor" => {