From e273198ada7f38b180c9205b45035b6e527560bd Mon Sep 17 00:00:00 2001 From: Conrad Irwin Date: Wed, 6 Mar 2024 15:58:22 -0700 Subject: [PATCH] Debounce language server updates (#8953) We'll send at least one every 100ms, but may send more if other messages are sent on the connection. Release Notes: - Fixed some slowness when collaborating with verbose language servers. --- crates/collab/src/tests/editor_tests.rs | 3 + crates/project/src/project.rs | 102 +++++++++++++++++------- 2 files changed, 75 insertions(+), 30 deletions(-) diff --git a/crates/collab/src/tests/editor_tests.rs b/crates/collab/src/tests/editor_tests.rs index 50a798b5a65dcb1402067a4c837e5f3f218739d3..d379900f9b0dbde73bd74d6463fa30a4024bc1a1 100644 --- a/crates/collab/src/tests/editor_tests.rs +++ b/crates/collab/src/tests/editor_tests.rs @@ -17,6 +17,7 @@ use language::{ language_settings::{AllLanguageSettings, InlayHintSettings}, FakeLspAdapter, }; +use project::SERVER_PROGRESS_DEBOUNCE_TIMEOUT; use rpc::RECEIVE_TIMEOUT; use serde_json::json; use settings::SettingsStore; @@ -865,6 +866,7 @@ async fn test_language_server_statuses(cx_a: &mut TestAppContext, cx_b: &mut Tes }, )), }); + executor.advance_clock(SERVER_PROGRESS_DEBOUNCE_TIMEOUT); executor.run_until_parked(); project_a.read_with(cx_a, |project, _| { @@ -898,6 +900,7 @@ async fn test_language_server_statuses(cx_a: &mut TestAppContext, cx_b: &mut Tes }, )), }); + executor.advance_clock(SERVER_PROGRESS_DEBOUNCE_TIMEOUT); executor.run_until_parked(); project_a.read_with(cx_a, |project, _| { diff --git a/crates/project/src/project.rs b/crates/project/src/project.rs index 1cfd4ee8d535ed54e53eb5287686f4aa2026f0ca..bdca996d945603c8b261a8797893761e6483c507 100644 --- a/crates/project/src/project.rs +++ b/crates/project/src/project.rs @@ -111,6 +111,7 @@ pub use task_inventory::{Inventory, TaskSourceKind}; const MAX_SERVER_REINSTALL_ATTEMPT_COUNT: u64 = 4; const SERVER_REINSTALL_DEBOUNCE_TIMEOUT: Duration = Duration::from_secs(1); const SERVER_LAUNCHING_BEFORE_SHUTDOWN_TIMEOUT: Duration = Duration::from_secs(5); +pub const SERVER_PROGRESS_DEBOUNCE_TIMEOUT: Duration = Duration::from_millis(100); pub trait Item { fn entry_id(&self, cx: &AppContext) -> Option; @@ -121,6 +122,9 @@ pub struct Project { worktrees: Vec, active_entry: Option, buffer_ordered_messages_tx: mpsc::UnboundedSender, + pending_language_server_update: Option, + flush_language_server_update: Option>, + languages: Arc, supplementary_language_servers: HashMap)>, @@ -185,6 +189,7 @@ struct LspBufferSnapshot { } /// Message ordered with respect to buffer operations +#[derive(Debug)] enum BufferOrderedMessage { Operation { buffer_id: BufferId, @@ -562,6 +567,8 @@ impl Project { Self { worktrees: Vec::new(), buffer_ordered_messages_tx: tx, + flush_language_server_update: None, + pending_language_server_update: None, collaborators: Default::default(), next_buffer_id: BufferId::new(1).unwrap(), opened_buffers: Default::default(), @@ -674,6 +681,8 @@ impl Project { let mut this = Self { worktrees: Vec::new(), buffer_ordered_messages_tx: tx, + pending_language_server_update: None, + flush_language_server_update: None, loading_buffers_by_path: Default::default(), next_buffer_id: BufferId::new(1).unwrap(), opened_buffer: watch::channel(), @@ -1544,8 +1553,7 @@ impl Project { ) }) .collect(); - self.buffer_ordered_messages_tx - .unbounded_send(BufferOrderedMessage::Resync) + self.enqueue_buffer_ordered_message(BufferOrderedMessage::Resync) .unwrap(); cx.notify(); Ok(()) @@ -2336,12 +2344,11 @@ impl Project { match event { BufferEvent::Operation(operation) => { - self.buffer_ordered_messages_tx - .unbounded_send(BufferOrderedMessage::Operation { - buffer_id: buffer.read(cx).remote_id(), - operation: language::proto::serialize_operation(operation), - }) - .ok(); + self.enqueue_buffer_ordered_message(BufferOrderedMessage::Operation { + buffer_id: buffer.read(cx).remote_id(), + operation: language::proto::serialize_operation(operation), + }) + .ok(); } BufferEvent::Edited { .. } => { @@ -2488,8 +2495,7 @@ impl Project { language_server_id, cx, ); - this.buffer_ordered_messages_tx - .unbounded_send( + this.enqueue_buffer_ordered_message( BufferOrderedMessage::LanguageServerUpdate { language_server_id, message:proto::update_language_server::Variant::DiskBasedDiagnosticsUpdated(Default::default()) @@ -3663,6 +3669,40 @@ impl Project { .detach(); } + fn enqueue_language_server_progress( + &mut self, + message: BufferOrderedMessage, + cx: &mut ModelContext, + ) { + self.pending_language_server_update.replace(message); + self.flush_language_server_update.get_or_insert_with(|| { + cx.spawn(|this, mut cx| async move { + cx.background_executor() + .timer(SERVER_PROGRESS_DEBOUNCE_TIMEOUT) + .await; + this.update(&mut cx, |this, _| { + this.flush_language_server_update.take(); + if let Some(update) = this.pending_language_server_update.take() { + this.enqueue_buffer_ordered_message(update).ok(); + } + }) + .ok(); + }) + }); + } + + fn enqueue_buffer_ordered_message(&mut self, message: BufferOrderedMessage) -> Result<()> { + if let Some(pending_message) = self.pending_language_server_update.take() { + self.flush_language_server_update.take(); + self.buffer_ordered_messages_tx + .unbounded_send(pending_message) + .map_err(|e| anyhow!(e))?; + } + self.buffer_ordered_messages_tx + .unbounded_send(message) + .map_err(|e| anyhow!(e)) + } + fn on_lsp_progress( &mut self, progress: lsp::ProgressParams, @@ -3700,8 +3740,7 @@ impl Project { if is_disk_based_diagnostics_progress { language_server_status.has_pending_diagnostic_updates = true; self.disk_based_diagnostics_started(language_server_id, cx); - self.buffer_ordered_messages_tx - .unbounded_send(BufferOrderedMessage::LanguageServerUpdate { + self.enqueue_buffer_ordered_message(BufferOrderedMessage::LanguageServerUpdate { language_server_id, message: proto::update_language_server::Variant::DiskBasedDiagnosticsUpdating(Default::default()) }) @@ -3717,8 +3756,8 @@ impl Project { }, cx, ); - self.buffer_ordered_messages_tx - .unbounded_send(BufferOrderedMessage::LanguageServerUpdate { + self.enqueue_buffer_ordered_message( + BufferOrderedMessage::LanguageServerUpdate { language_server_id, message: proto::update_language_server::Variant::WorkStart( proto::LspWorkStart { @@ -3727,8 +3766,9 @@ impl Project { percentage: report.percentage, }, ), - }) - .ok(); + }, + ) + .ok(); } } lsp::WorkDoneProgress::Report(report) => { @@ -3743,8 +3783,8 @@ impl Project { }, cx, ); - self.buffer_ordered_messages_tx - .unbounded_send(BufferOrderedMessage::LanguageServerUpdate { + self.enqueue_language_server_progress( + BufferOrderedMessage::LanguageServerUpdate { language_server_id, message: proto::update_language_server::Variant::WorkProgress( proto::LspWorkProgress { @@ -3753,8 +3793,9 @@ impl Project { percentage: report.percentage, }, ), - }) - .ok(); + }, + cx, + ); } } lsp::WorkDoneProgress::End(_) => { @@ -3763,25 +3804,27 @@ impl Project { if is_disk_based_diagnostics_progress { language_server_status.has_pending_diagnostic_updates = false; self.disk_based_diagnostics_finished(language_server_id, cx); - self.buffer_ordered_messages_tx - .unbounded_send(BufferOrderedMessage::LanguageServerUpdate { + self.enqueue_buffer_ordered_message( + BufferOrderedMessage::LanguageServerUpdate { language_server_id, message: proto::update_language_server::Variant::DiskBasedDiagnosticsUpdated( Default::default(), ), - }) - .ok(); + }, + ) + .ok(); } else { self.on_lsp_work_end(language_server_id, token.clone(), cx); - self.buffer_ordered_messages_tx - .unbounded_send(BufferOrderedMessage::LanguageServerUpdate { + self.enqueue_buffer_ordered_message( + BufferOrderedMessage::LanguageServerUpdate { language_server_id, message: proto::update_language_server::Variant::WorkEnd( proto::LspWorkEnd { token }, ), - }) - .ok(); + }, + ) + .ok(); } } } @@ -7392,8 +7435,7 @@ impl Project { if is_host { this.opened_buffers .retain(|_, buffer| !matches!(buffer, OpenBuffer::Operations(_))); - this.buffer_ordered_messages_tx - .unbounded_send(BufferOrderedMessage::Resync) + this.enqueue_buffer_ordered_message(BufferOrderedMessage::Resync) .unwrap(); }