Debounce language server updates (#8953)

Conrad Irwin created

We'll send at least one every 100ms, but may send more if other messages
are sent on the connection.

Release Notes:

- Fixed some slowness when collaborating with verbose language servers.

Change summary

crates/collab/src/tests/editor_tests.rs |   3 
crates/project/src/project.rs           | 102 +++++++++++++++++++-------
2 files changed, 75 insertions(+), 30 deletions(-)

Detailed changes

crates/collab/src/tests/editor_tests.rs 🔗

@@ -17,6 +17,7 @@ use language::{
     language_settings::{AllLanguageSettings, InlayHintSettings},
     FakeLspAdapter,
 };
+use project::SERVER_PROGRESS_DEBOUNCE_TIMEOUT;
 use rpc::RECEIVE_TIMEOUT;
 use serde_json::json;
 use settings::SettingsStore;
@@ -865,6 +866,7 @@ async fn test_language_server_statuses(cx_a: &mut TestAppContext, cx_b: &mut Tes
             },
         )),
     });
+    executor.advance_clock(SERVER_PROGRESS_DEBOUNCE_TIMEOUT);
     executor.run_until_parked();
 
     project_a.read_with(cx_a, |project, _| {
@@ -898,6 +900,7 @@ async fn test_language_server_statuses(cx_a: &mut TestAppContext, cx_b: &mut Tes
             },
         )),
     });
+    executor.advance_clock(SERVER_PROGRESS_DEBOUNCE_TIMEOUT);
     executor.run_until_parked();
 
     project_a.read_with(cx_a, |project, _| {

crates/project/src/project.rs 🔗

@@ -111,6 +111,7 @@ pub use task_inventory::{Inventory, TaskSourceKind};
 const MAX_SERVER_REINSTALL_ATTEMPT_COUNT: u64 = 4;
 const SERVER_REINSTALL_DEBOUNCE_TIMEOUT: Duration = Duration::from_secs(1);
 const SERVER_LAUNCHING_BEFORE_SHUTDOWN_TIMEOUT: Duration = Duration::from_secs(5);
+pub const SERVER_PROGRESS_DEBOUNCE_TIMEOUT: Duration = Duration::from_millis(100);
 
 pub trait Item {
     fn entry_id(&self, cx: &AppContext) -> Option<ProjectEntryId>;
@@ -121,6 +122,9 @@ pub struct Project {
     worktrees: Vec<WorktreeHandle>,
     active_entry: Option<ProjectEntryId>,
     buffer_ordered_messages_tx: mpsc::UnboundedSender<BufferOrderedMessage>,
+    pending_language_server_update: Option<BufferOrderedMessage>,
+    flush_language_server_update: Option<Task<()>>,
+
     languages: Arc<LanguageRegistry>,
     supplementary_language_servers:
         HashMap<LanguageServerId, (LanguageServerName, Arc<LanguageServer>)>,
@@ -185,6 +189,7 @@ struct LspBufferSnapshot {
 }
 
 /// Message ordered with respect to buffer operations
+#[derive(Debug)]
 enum BufferOrderedMessage {
     Operation {
         buffer_id: BufferId,
@@ -562,6 +567,8 @@ impl Project {
             Self {
                 worktrees: Vec::new(),
                 buffer_ordered_messages_tx: tx,
+                flush_language_server_update: None,
+                pending_language_server_update: None,
                 collaborators: Default::default(),
                 next_buffer_id: BufferId::new(1).unwrap(),
                 opened_buffers: Default::default(),
@@ -674,6 +681,8 @@ impl Project {
             let mut this = Self {
                 worktrees: Vec::new(),
                 buffer_ordered_messages_tx: tx,
+                pending_language_server_update: None,
+                flush_language_server_update: None,
                 loading_buffers_by_path: Default::default(),
                 next_buffer_id: BufferId::new(1).unwrap(),
                 opened_buffer: watch::channel(),
@@ -1544,8 +1553,7 @@ impl Project {
                 )
             })
             .collect();
-        self.buffer_ordered_messages_tx
-            .unbounded_send(BufferOrderedMessage::Resync)
+        self.enqueue_buffer_ordered_message(BufferOrderedMessage::Resync)
             .unwrap();
         cx.notify();
         Ok(())
@@ -2336,12 +2344,11 @@ impl Project {
 
         match event {
             BufferEvent::Operation(operation) => {
-                self.buffer_ordered_messages_tx
-                    .unbounded_send(BufferOrderedMessage::Operation {
-                        buffer_id: buffer.read(cx).remote_id(),
-                        operation: language::proto::serialize_operation(operation),
-                    })
-                    .ok();
+                self.enqueue_buffer_ordered_message(BufferOrderedMessage::Operation {
+                    buffer_id: buffer.read(cx).remote_id(),
+                    operation: language::proto::serialize_operation(operation),
+                })
+                .ok();
             }
 
             BufferEvent::Edited { .. } => {
@@ -2488,8 +2495,7 @@ impl Project {
                                             language_server_id,
                                             cx,
                                         );
-                                        this.buffer_ordered_messages_tx
-                                            .unbounded_send(
+                                        this.enqueue_buffer_ordered_message(
                                                 BufferOrderedMessage::LanguageServerUpdate {
                                                     language_server_id,
                                                     message:proto::update_language_server::Variant::DiskBasedDiagnosticsUpdated(Default::default())
@@ -3663,6 +3669,40 @@ impl Project {
         .detach();
     }
 
+    fn enqueue_language_server_progress(
+        &mut self,
+        message: BufferOrderedMessage,
+        cx: &mut ModelContext<Self>,
+    ) {
+        self.pending_language_server_update.replace(message);
+        self.flush_language_server_update.get_or_insert_with(|| {
+            cx.spawn(|this, mut cx| async move {
+                cx.background_executor()
+                    .timer(SERVER_PROGRESS_DEBOUNCE_TIMEOUT)
+                    .await;
+                this.update(&mut cx, |this, _| {
+                    this.flush_language_server_update.take();
+                    if let Some(update) = this.pending_language_server_update.take() {
+                        this.enqueue_buffer_ordered_message(update).ok();
+                    }
+                })
+                .ok();
+            })
+        });
+    }
+
+    fn enqueue_buffer_ordered_message(&mut self, message: BufferOrderedMessage) -> Result<()> {
+        if let Some(pending_message) = self.pending_language_server_update.take() {
+            self.flush_language_server_update.take();
+            self.buffer_ordered_messages_tx
+                .unbounded_send(pending_message)
+                .map_err(|e| anyhow!(e))?;
+        }
+        self.buffer_ordered_messages_tx
+            .unbounded_send(message)
+            .map_err(|e| anyhow!(e))
+    }
+
     fn on_lsp_progress(
         &mut self,
         progress: lsp::ProgressParams,
@@ -3700,8 +3740,7 @@ impl Project {
                 if is_disk_based_diagnostics_progress {
                     language_server_status.has_pending_diagnostic_updates = true;
                     self.disk_based_diagnostics_started(language_server_id, cx);
-                    self.buffer_ordered_messages_tx
-                        .unbounded_send(BufferOrderedMessage::LanguageServerUpdate {
+                    self.enqueue_buffer_ordered_message(BufferOrderedMessage::LanguageServerUpdate {
                             language_server_id,
                             message: proto::update_language_server::Variant::DiskBasedDiagnosticsUpdating(Default::default())
                         })
@@ -3717,8 +3756,8 @@ impl Project {
                         },
                         cx,
                     );
-                    self.buffer_ordered_messages_tx
-                        .unbounded_send(BufferOrderedMessage::LanguageServerUpdate {
+                    self.enqueue_buffer_ordered_message(
+                        BufferOrderedMessage::LanguageServerUpdate {
                             language_server_id,
                             message: proto::update_language_server::Variant::WorkStart(
                                 proto::LspWorkStart {
@@ -3727,8 +3766,9 @@ impl Project {
                                     percentage: report.percentage,
                                 },
                             ),
-                        })
-                        .ok();
+                        },
+                    )
+                    .ok();
                 }
             }
             lsp::WorkDoneProgress::Report(report) => {
@@ -3743,8 +3783,8 @@ impl Project {
                         },
                         cx,
                     );
-                    self.buffer_ordered_messages_tx
-                        .unbounded_send(BufferOrderedMessage::LanguageServerUpdate {
+                    self.enqueue_language_server_progress(
+                        BufferOrderedMessage::LanguageServerUpdate {
                             language_server_id,
                             message: proto::update_language_server::Variant::WorkProgress(
                                 proto::LspWorkProgress {
@@ -3753,8 +3793,9 @@ impl Project {
                                     percentage: report.percentage,
                                 },
                             ),
-                        })
-                        .ok();
+                        },
+                        cx,
+                    );
                 }
             }
             lsp::WorkDoneProgress::End(_) => {
@@ -3763,25 +3804,27 @@ impl Project {
                 if is_disk_based_diagnostics_progress {
                     language_server_status.has_pending_diagnostic_updates = false;
                     self.disk_based_diagnostics_finished(language_server_id, cx);
-                    self.buffer_ordered_messages_tx
-                        .unbounded_send(BufferOrderedMessage::LanguageServerUpdate {
+                    self.enqueue_buffer_ordered_message(
+                        BufferOrderedMessage::LanguageServerUpdate {
                             language_server_id,
                             message:
                                 proto::update_language_server::Variant::DiskBasedDiagnosticsUpdated(
                                     Default::default(),
                                 ),
-                        })
-                        .ok();
+                        },
+                    )
+                    .ok();
                 } else {
                     self.on_lsp_work_end(language_server_id, token.clone(), cx);
-                    self.buffer_ordered_messages_tx
-                        .unbounded_send(BufferOrderedMessage::LanguageServerUpdate {
+                    self.enqueue_buffer_ordered_message(
+                        BufferOrderedMessage::LanguageServerUpdate {
                             language_server_id,
                             message: proto::update_language_server::Variant::WorkEnd(
                                 proto::LspWorkEnd { token },
                             ),
-                        })
-                        .ok();
+                        },
+                    )
+                    .ok();
                 }
             }
         }
@@ -7392,8 +7435,7 @@ impl Project {
             if is_host {
                 this.opened_buffers
                     .retain(|_, buffer| !matches!(buffer, OpenBuffer::Operations(_)));
-                this.buffer_ordered_messages_tx
-                    .unbounded_send(BufferOrderedMessage::Resync)
+                this.enqueue_buffer_ordered_message(BufferOrderedMessage::Resync)
                     .unwrap();
             }