Send language server updates via the same task that sends buffer operations

Max Brunsfeld and Julia Risley created

Co-authored-by: Julia Risley <julia@zed.dev>

Change summary

crates/project/src/project.rs | 225 ++++++++++++++++++++++--------------
1 file changed, 137 insertions(+), 88 deletions(-)

Detailed changes

crates/project/src/project.rs 🔗

@@ -142,6 +142,10 @@ enum BufferMessage {
         buffer_id: u64,
         operation: proto::Operation,
     },
+    LanguageServerUpdate {
+        language_server_id: LanguageServerId,
+        message: proto::update_language_server::Variant,
+    },
     Resync,
 }
 
@@ -1791,9 +1795,35 @@ impl Project {
     ) -> Option<()> {
         const MAX_BATCH_SIZE: usize = 128;
 
-        let mut needs_resync_with_host = false;
         let mut operations_by_buffer_id = HashMap::default();
+        async fn flush_operations(
+            this: &ModelHandle<Project>,
+            operations_by_buffer_id: &mut HashMap<u64, Vec<proto::Operation>>,
+            needs_resync_with_host: &mut bool,
+            is_local: bool,
+            cx: &AsyncAppContext,
+        ) {
+            for (buffer_id, operations) in operations_by_buffer_id.drain() {
+                let request = this.read_with(cx, |this, _| {
+                    let project_id = this.remote_id()?;
+                    Some(this.client.request(proto::UpdateBuffer {
+                        buffer_id,
+                        project_id,
+                        operations,
+                    }))
+                });
+                if let Some(request) = request {
+                    if request.await.is_err() && !is_local {
+                        *needs_resync_with_host = true;
+                        break;
+                    }
+                }
+            }
+        }
+
+        let mut needs_resync_with_host = false;
         let mut changes = rx.ready_chunks(MAX_BATCH_SIZE);
+
         while let Some(changes) = changes.next().await {
             let this = this.upgrade(&mut cx)?;
             let is_local = this.read_with(&cx, |this, _| this.is_local());
@@ -1813,6 +1843,7 @@ impl Project {
                             .or_insert(Vec::new())
                             .push(operation);
                     }
+
                     BufferMessage::Resync => {
                         operations_by_buffer_id.clear();
                         if this
@@ -1823,25 +1854,43 @@ impl Project {
                             needs_resync_with_host = false;
                         }
                     }
-                }
-            }
 
-            for (buffer_id, operations) in operations_by_buffer_id.drain() {
-                let request = this.read_with(&cx, |this, _| {
-                    let project_id = this.remote_id()?;
-                    Some(this.client.request(proto::UpdateBuffer {
-                        buffer_id,
-                        project_id,
-                        operations,
-                    }))
-                });
-                if let Some(request) = request {
-                    if request.await.is_err() && !is_local {
-                        needs_resync_with_host = true;
-                        break;
+                    BufferMessage::LanguageServerUpdate {
+                        language_server_id,
+                        message,
+                    } => {
+                        flush_operations(
+                            &this,
+                            &mut operations_by_buffer_id,
+                            &mut needs_resync_with_host,
+                            is_local,
+                            &cx,
+                        )
+                        .await;
+
+                        this.read_with(&cx, |this, _| {
+                            if let Some(project_id) = this.remote_id() {
+                                this.client
+                                    .send(proto::UpdateLanguageServer {
+                                        project_id,
+                                        language_server_id: language_server_id.0 as u64,
+                                        variant: Some(message),
+                                    })
+                                    .log_err();
+                            }
+                        });
                     }
                 }
             }
+
+            flush_operations(
+                &this,
+                &mut operations_by_buffer_id,
+                &mut needs_resync_with_host,
+                is_local,
+                &cx,
+            )
+            .await;
         }
 
         None
@@ -1962,19 +2011,24 @@ impl Project {
                                 Duration::from_secs(1);
 
                             let task = cx.spawn_weak(|this, mut cx| async move {
-                            cx.background().timer(DISK_BASED_DIAGNOSTICS_DEBOUNCE).await;
-                            if let Some(this) = this.upgrade(&cx) {
-                                this.update(&mut cx, |this, cx | {
-                                    this.disk_based_diagnostics_finished(language_server_id, cx);
-                                    this.broadcast_language_server_update(
-                                        language_server_id,
-                                        proto::update_language_server::Variant::DiskBasedDiagnosticsUpdated(
-                                            proto::LspDiskBasedDiagnosticsUpdated {},
-                                        ),
-                                    );
-                                });
-                            }
-                        });
+                                cx.background().timer(DISK_BASED_DIAGNOSTICS_DEBOUNCE).await;
+                                if let Some(this) = this.upgrade(&cx) {
+                                    this.update(&mut cx, |this, cx| {
+                                        this.disk_based_diagnostics_finished(
+                                            language_server_id,
+                                            cx,
+                                        );
+                                        this.buffer_changes_tx
+                                            .unbounded_send(
+                                                BufferMessage::LanguageServerUpdate {
+                                                    language_server_id,
+                                                    message:proto::update_language_server::Variant::DiskBasedDiagnosticsUpdated(Default::default())
+                                                },
+                                            )
+                                            .ok();
+                                    });
+                                }
+                            });
                             *simulate_disk_based_diagnostics_completion = Some(task);
                         }
                     }
@@ -2609,7 +2663,7 @@ impl Project {
     fn on_lsp_progress(
         &mut self,
         progress: lsp::ProgressParams,
-        server_id: LanguageServerId,
+        language_server_id: LanguageServerId,
         disk_based_diagnostics_progress_token: Option<String>,
         cx: &mut ModelContext<Self>,
     ) {
@@ -2622,7 +2676,7 @@ impl Project {
         };
         let lsp::ProgressParamsValue::WorkDone(progress) = progress.value;
         let language_server_status =
-            if let Some(status) = self.language_server_statuses.get_mut(&server_id) {
+            if let Some(status) = self.language_server_statuses.get_mut(&language_server_id) {
                 status
             } else {
                 return;
@@ -2642,16 +2696,16 @@ impl Project {
             lsp::WorkDoneProgress::Begin(report) => {
                 if is_disk_based_diagnostics_progress {
                     language_server_status.has_pending_diagnostic_updates = true;
-                    self.disk_based_diagnostics_started(server_id, cx);
-                    self.broadcast_language_server_update(
-                        server_id,
-                        proto::update_language_server::Variant::DiskBasedDiagnosticsUpdating(
-                            proto::LspDiskBasedDiagnosticsUpdating {},
-                        ),
-                    );
+                    self.disk_based_diagnostics_started(language_server_id, cx);
+                    self.buffer_changes_tx
+                        .unbounded_send(BufferMessage::LanguageServerUpdate {
+                            language_server_id,
+                            message: proto::update_language_server::Variant::DiskBasedDiagnosticsUpdating(Default::default())
+                        })
+                        .ok();
                 } else {
                     self.on_lsp_work_start(
-                        server_id,
+                        language_server_id,
                         token.clone(),
                         LanguageServerProgress {
                             message: report.message.clone(),
@@ -2660,20 +2714,24 @@ impl Project {
                         },
                         cx,
                     );
-                    self.broadcast_language_server_update(
-                        server_id,
-                        proto::update_language_server::Variant::WorkStart(proto::LspWorkStart {
-                            token,
-                            message: report.message,
-                            percentage: report.percentage.map(|p| p as u32),
-                        }),
-                    );
+                    self.buffer_changes_tx
+                        .unbounded_send(BufferMessage::LanguageServerUpdate {
+                            language_server_id,
+                            message: proto::update_language_server::Variant::WorkStart(
+                                proto::LspWorkStart {
+                                    token,
+                                    message: report.message,
+                                    percentage: report.percentage.map(|p| p as u32),
+                                },
+                            ),
+                        })
+                        .ok();
                 }
             }
             lsp::WorkDoneProgress::Report(report) => {
                 if !is_disk_based_diagnostics_progress {
                     self.on_lsp_work_progress(
-                        server_id,
+                        language_server_id,
                         token.clone(),
                         LanguageServerProgress {
                             message: report.message.clone(),
@@ -2682,16 +2740,18 @@ impl Project {
                         },
                         cx,
                     );
-                    self.broadcast_language_server_update(
-                        server_id,
-                        proto::update_language_server::Variant::WorkProgress(
-                            proto::LspWorkProgress {
-                                token,
-                                message: report.message,
-                                percentage: report.percentage.map(|p| p as u32),
-                            },
-                        ),
-                    );
+                    self.buffer_changes_tx
+                        .unbounded_send(BufferMessage::LanguageServerUpdate {
+                            language_server_id,
+                            message: proto::update_language_server::Variant::WorkProgress(
+                                proto::LspWorkProgress {
+                                    token,
+                                    message: report.message,
+                                    percentage: report.percentage.map(|p| p as u32),
+                                },
+                            ),
+                        })
+                        .ok();
                 }
             }
             lsp::WorkDoneProgress::End(_) => {
@@ -2699,21 +2759,26 @@ impl Project {
 
                 if is_disk_based_diagnostics_progress {
                     language_server_status.has_pending_diagnostic_updates = false;
-                    self.disk_based_diagnostics_finished(server_id, cx);
-                    self.broadcast_language_server_update(
-                        server_id,
-                        proto::update_language_server::Variant::DiskBasedDiagnosticsUpdated(
-                            proto::LspDiskBasedDiagnosticsUpdated {},
-                        ),
-                    );
+                    self.disk_based_diagnostics_finished(language_server_id, cx);
+                    self.buffer_changes_tx
+                        .unbounded_send(BufferMessage::LanguageServerUpdate {
+                            language_server_id,
+                            message:
+                                proto::update_language_server::Variant::DiskBasedDiagnosticsUpdated(
+                                    Default::default(),
+                                ),
+                        })
+                        .ok();
                 } else {
-                    self.on_lsp_work_end(server_id, token.clone(), cx);
-                    self.broadcast_language_server_update(
-                        server_id,
-                        proto::update_language_server::Variant::WorkEnd(proto::LspWorkEnd {
-                            token,
-                        }),
-                    );
+                    self.on_lsp_work_end(language_server_id, token.clone(), cx);
+                    self.buffer_changes_tx
+                        .unbounded_send(BufferMessage::LanguageServerUpdate {
+                            language_server_id,
+                            message: proto::update_language_server::Variant::WorkEnd(
+                                proto::LspWorkEnd { token },
+                            ),
+                        })
+                        .ok();
                 }
             }
         }
@@ -2822,22 +2887,6 @@ impl Project {
         })
     }
 
-    fn broadcast_language_server_update(
-        &self,
-        language_server_id: LanguageServerId,
-        event: proto::update_language_server::Variant,
-    ) {
-        if let Some(project_id) = self.remote_id() {
-            self.client
-                .send(proto::UpdateLanguageServer {
-                    project_id,
-                    language_server_id: language_server_id.0 as u64,
-                    variant: Some(event),
-                })
-                .log_err();
-        }
-    }
-
     pub fn language_server_statuses(
         &self,
     ) -> impl DoubleEndedIterator<Item = &LanguageServerStatus> {