Listen to all LSP progress notifications and broadcast them to peers

Antonio Scandurra created

Change summary

crates/project/src/project.rs | 286 ++++++++++++++++++++++--------------
crates/rpc/proto/zed.proto    |  30 +++
crates/rpc/src/proto.rs       |   6 
crates/server/src/rpc.rs      |  21 --
4 files changed, 207 insertions(+), 136 deletions(-)

Detailed changes

crates/project/src/project.rs 🔗

@@ -28,7 +28,6 @@ use rand::prelude::*;
 use search::SearchQuery;
 use sha2::{Digest, Sha256};
 use similar::{ChangeTag, TextDiff};
-use smol::block_on;
 use std::{
     cell::RefCell,
     cmp::{self, Ordering},
@@ -115,6 +114,21 @@ pub enum Event {
     DiagnosticsUpdated(ProjectPath),
 }
 
+enum LspEvent {
+    WorkStart {
+        token: String,
+    },
+    WorkProgress {
+        token: String,
+        message: Option<String>,
+        percentage: Option<usize>,
+    },
+    WorkEnd {
+        token: String,
+    },
+    DiagnosticsUpdate(lsp::PublishDiagnosticsParams),
+}
+
 #[derive(Clone, Debug, Eq, PartialEq, Hash, PartialOrd, Ord)]
 pub struct ProjectPath {
     pub worktree_id: WorktreeId,
@@ -203,8 +217,7 @@ impl Project {
         client.add_entity_message_handler(Self::handle_add_collaborator);
         client.add_entity_message_handler(Self::handle_buffer_reloaded);
         client.add_entity_message_handler(Self::handle_buffer_saved);
-        client.add_entity_message_handler(Self::handle_disk_based_diagnostics_updated);
-        client.add_entity_message_handler(Self::handle_disk_based_diagnostics_updating);
+        client.add_entity_message_handler(Self::handle_lsp_event);
         client.add_entity_message_handler(Self::handle_remove_collaborator);
         client.add_entity_message_handler(Self::handle_register_worktree);
         client.add_entity_message_handler(Self::handle_unregister_worktree);
@@ -1155,12 +1168,6 @@ impl Project {
         language: Arc<Language>,
         cx: &mut ModelContext<Self>,
     ) {
-        enum LspEvent {
-            DiagnosticsStart,
-            DiagnosticsUpdate(lsp::PublishDiagnosticsParams),
-            DiagnosticsFinish,
-        }
-
         let key = (worktree_id, language.name());
         self.started_language_servers
             .entry(key.clone())
@@ -1171,76 +1178,50 @@ impl Project {
                     self.client.http_client(),
                     cx,
                 );
-                let rpc = self.client.clone();
                 cx.spawn_weak(|this, mut cx| async move {
                     let mut language_server = language_server?.await.log_err()?;
                     let this = this.upgrade(&cx)?;
+                    let (lsp_events_tx, lsp_events_rx) = smol::channel::unbounded();
 
-                    let disk_based_sources = language
-                        .disk_based_diagnostic_sources()
-                        .cloned()
-                        .unwrap_or_default();
-                    let disk_based_diagnostics_progress_token =
-                        language.disk_based_diagnostics_progress_token().cloned();
-                    let has_disk_based_diagnostic_progress_token =
-                        disk_based_diagnostics_progress_token.is_some();
-                    let (diagnostics_tx, diagnostics_rx) = smol::channel::unbounded();
-
-                    // Listen for `PublishDiagnostics` notifications.
                     language_server
                         .on_notification::<lsp::notification::PublishDiagnostics, _>({
-                            let diagnostics_tx = diagnostics_tx.clone();
+                            let lsp_events_tx = lsp_events_tx.clone();
                             move |params| {
-                                if !has_disk_based_diagnostic_progress_token {
-                                    block_on(diagnostics_tx.send(LspEvent::DiagnosticsStart)).ok();
-                                }
-                                block_on(diagnostics_tx.send(LspEvent::DiagnosticsUpdate(params)))
+                                lsp_events_tx
+                                    .try_send(LspEvent::DiagnosticsUpdate(params))
                                     .ok();
-                                if !has_disk_based_diagnostic_progress_token {
-                                    block_on(diagnostics_tx.send(LspEvent::DiagnosticsFinish)).ok();
-                                }
                             }
                         })
                         .detach();
 
-                    // Listen for `Progress` notifications. Send an event when the language server
-                    // transitions between running jobs and not running any jobs.
-                    let mut running_jobs_for_this_server: i32 = 0;
                     language_server
                         .on_notification::<lsp::notification::Progress, _>(move |params| {
                             let token = match params.token {
-                                lsp::NumberOrString::Number(_) => None,
-                                lsp::NumberOrString::String(token) => Some(token),
+                                lsp::NumberOrString::String(token) => token,
+                                lsp::NumberOrString::Number(token) => {
+                                    log::info!("skipping numeric progress token {}", token);
+                                    return;
+                                }
                             };
 
-                            if token == disk_based_diagnostics_progress_token {
-                                match params.value {
-                                    lsp::ProgressParamsValue::WorkDone(progress) => {
-                                        match progress {
-                                            lsp::WorkDoneProgress::Begin(_) => {
-                                                running_jobs_for_this_server += 1;
-                                                if running_jobs_for_this_server == 1 {
-                                                    block_on(
-                                                        diagnostics_tx
-                                                            .send(LspEvent::DiagnosticsStart),
-                                                    )
-                                                    .ok();
-                                                }
-                                            }
-                                            lsp::WorkDoneProgress::End(_) => {
-                                                running_jobs_for_this_server -= 1;
-                                                if running_jobs_for_this_server == 0 {
-                                                    block_on(
-                                                        diagnostics_tx
-                                                            .send(LspEvent::DiagnosticsFinish),
-                                                    )
-                                                    .ok();
-                                                }
-                                            }
-                                            _ => {}
-                                        }
+                            match params.value {
+                                lsp::ProgressParamsValue::WorkDone(progress) => match progress {
+                                    lsp::WorkDoneProgress::Begin(_) => {
+                                        lsp_events_tx.try_send(LspEvent::WorkStart { token }).ok();
                                     }
-                                }
+                                    lsp::WorkDoneProgress::Report(report) => {
+                                        lsp_events_tx
+                                            .try_send(LspEvent::WorkProgress {
+                                                token,
+                                                message: report.message,
+                                                percentage: report.percentage.map(|p| p as usize),
+                                            })
+                                            .ok();
+                                    }
+                                    lsp::WorkDoneProgress::End(_) => {
+                                        lsp_events_tx.try_send(LspEvent::WorkEnd { token }).ok();
+                                    }
+                                },
                             }
                         })
                         .detach();
@@ -1249,43 +1230,11 @@ impl Project {
                     cx.spawn(|mut cx| {
                         let this = this.downgrade();
                         async move {
-                            while let Ok(message) = diagnostics_rx.recv().await {
+                            while let Ok(event) = lsp_events_rx.recv().await {
                                 let this = this.upgrade(&cx)?;
-                                match message {
-                                    LspEvent::DiagnosticsStart => {
-                                        this.update(&mut cx, |this, cx| {
-                                            this.disk_based_diagnostics_started(cx);
-                                            if let Some(project_id) = this.remote_id() {
-                                                rpc.send(proto::DiskBasedDiagnosticsUpdating {
-                                                    project_id,
-                                                })
-                                                .log_err();
-                                            }
-                                        });
-                                    }
-                                    LspEvent::DiagnosticsUpdate(mut params) => {
-                                        language.process_diagnostics(&mut params);
-                                        this.update(&mut cx, |this, cx| {
-                                            this.update_diagnostics(
-                                                params,
-                                                &disk_based_sources,
-                                                cx,
-                                            )
-                                            .log_err();
-                                        });
-                                    }
-                                    LspEvent::DiagnosticsFinish => {
-                                        this.update(&mut cx, |this, cx| {
-                                            this.disk_based_diagnostics_finished(cx);
-                                            if let Some(project_id) = this.remote_id() {
-                                                rpc.send(proto::DiskBasedDiagnosticsUpdated {
-                                                    project_id,
-                                                })
-                                                .log_err();
-                                            }
-                                        });
-                                    }
-                                }
+                                this.update(&mut cx, |this, cx| {
+                                    this.on_local_lsp_event(event, &language, cx)
+                                });
                             }
                             Some(())
                         }
@@ -1358,6 +1307,107 @@ impl Project {
             });
     }
 
+    fn on_local_lsp_event(
+        &mut self,
+        event: LspEvent,
+        language: &Arc<Language>,
+        cx: &mut ModelContext<Self>,
+    ) {
+        let disk_diagnostics_token = language.disk_based_diagnostics_progress_token();
+        match event {
+            LspEvent::WorkStart { token } => {
+                if Some(&token) == disk_diagnostics_token {
+                    self.disk_based_diagnostics_started(cx);
+                    self.send_lsp_event(proto::lsp_event::Variant::DiskBasedDiagnosticsUpdating(
+                        proto::LspDiskBasedDiagnosticsUpdating {},
+                    ));
+                } else {
+                    self.on_lsp_work_start(token.clone(), cx);
+                    self.send_lsp_event(proto::lsp_event::Variant::WorkStart(
+                        proto::LspWorkStart { token },
+                    ));
+                }
+            }
+            LspEvent::WorkProgress {
+                token,
+                message,
+                percentage,
+            } => {
+                if Some(&token) != disk_diagnostics_token {
+                    self.on_lsp_work_progress(token.clone(), message.clone(), percentage, cx);
+                    self.send_lsp_event(proto::lsp_event::Variant::WorkProgress(
+                        proto::LspWorkProgress {
+                            token,
+                            message,
+                            percentage: percentage.map(|p| p as u32),
+                        },
+                    ));
+                }
+            }
+            LspEvent::WorkEnd { token } => {
+                if Some(&token) == disk_diagnostics_token {
+                    self.disk_based_diagnostics_finished(cx);
+                    self.send_lsp_event(proto::lsp_event::Variant::DiskBasedDiagnosticsUpdated(
+                        proto::LspDiskBasedDiagnosticsUpdated {},
+                    ));
+                } else {
+                    self.on_lsp_work_end(token.clone(), cx);
+                    self.send_lsp_event(proto::lsp_event::Variant::WorkEnd(proto::LspWorkEnd {
+                        token,
+                    }));
+                }
+            }
+            LspEvent::DiagnosticsUpdate(mut params) => {
+                language.process_diagnostics(&mut params);
+
+                if disk_diagnostics_token.is_none() {
+                    self.disk_based_diagnostics_started(cx);
+                    self.send_lsp_event(proto::lsp_event::Variant::DiskBasedDiagnosticsUpdating(
+                        proto::LspDiskBasedDiagnosticsUpdating {},
+                    ));
+                }
+                self.update_diagnostics(
+                    params,
+                    language
+                        .disk_based_diagnostic_sources()
+                        .unwrap_or(&Default::default()),
+                    cx,
+                )
+                .log_err();
+                if disk_diagnostics_token.is_none() {
+                    self.disk_based_diagnostics_finished(cx);
+                    self.send_lsp_event(proto::lsp_event::Variant::DiskBasedDiagnosticsUpdated(
+                        proto::LspDiskBasedDiagnosticsUpdated {},
+                    ));
+                }
+            }
+        }
+    }
+
+    fn on_lsp_work_start(&mut self, token: String, cx: &mut ModelContext<Self>) {}
+
+    fn on_lsp_work_progress(
+        &mut self,
+        token: String,
+        message: Option<String>,
+        percentage: Option<usize>,
+        cx: &mut ModelContext<Self>,
+    ) {
+    }
+
+    fn on_lsp_work_end(&mut self, token: String, cx: &mut ModelContext<Self>) {}
+
+    fn send_lsp_event(&self, event: proto::lsp_event::Variant) {
+        if let Some(project_id) = self.remote_id() {
+            self.client
+                .send(proto::LspEvent {
+                    project_id,
+                    variant: Some(event),
+                })
+                .log_err();
+        }
+    }
+
     pub fn update_diagnostics(
         &mut self,
         params: lsp::PublishDiagnosticsParams,
@@ -3096,23 +3146,41 @@ impl Project {
         })
     }
 
-    async fn handle_disk_based_diagnostics_updating(
+    async fn handle_lsp_event(
         this: ModelHandle<Self>,
-        _: TypedEnvelope<proto::DiskBasedDiagnosticsUpdating>,
+        envelope: TypedEnvelope<proto::LspEvent>,
         _: Arc<Client>,
         mut cx: AsyncAppContext,
     ) -> Result<()> {
-        this.update(&mut cx, |this, cx| this.disk_based_diagnostics_started(cx));
-        Ok(())
-    }
+        match envelope
+            .payload
+            .variant
+            .ok_or_else(|| anyhow!("invalid variant"))?
+        {
+            proto::lsp_event::Variant::WorkStart(payload) => this.update(&mut cx, |this, cx| {
+                this.on_lsp_work_start(payload.token, cx);
+            }),
+            proto::lsp_event::Variant::WorkProgress(payload) => this.update(&mut cx, |this, cx| {
+                this.on_lsp_work_progress(
+                    payload.token,
+                    payload.message,
+                    payload.percentage.map(|p| p as usize),
+                    cx,
+                );
+            }),
+            proto::lsp_event::Variant::WorkEnd(payload) => this.update(&mut cx, |this, cx| {
+                this.on_lsp_work_end(payload.token, cx);
+            }),
+            proto::lsp_event::Variant::DiskBasedDiagnosticsUpdating(_) => {
+                this.update(&mut cx, |this, cx| {
+                    this.disk_based_diagnostics_started(cx);
+                })
+            }
+            proto::lsp_event::Variant::DiskBasedDiagnosticsUpdated(_) => {
+                this.update(&mut cx, |this, cx| this.disk_based_diagnostics_finished(cx));
+            }
+        }
 
-    async fn handle_disk_based_diagnostics_updated(
-        this: ModelHandle<Self>,
-        _: TypedEnvelope<proto::DiskBasedDiagnosticsUpdated>,
-        _: Arc<Client>,
-        mut cx: AsyncAppContext,
-    ) -> Result<()> {
-        this.update(&mut cx, |this, cx| this.disk_based_diagnostics_finished(cx));
         Ok(())
     }
 

crates/rpc/proto/zed.proto 🔗

@@ -37,8 +37,7 @@ message Envelope {
         UnregisterWorktree unregister_worktree = 29;
         UpdateWorktree update_worktree = 31;
         UpdateDiagnosticSummary update_diagnostic_summary = 32;
-        DiskBasedDiagnosticsUpdating disk_based_diagnostics_updating = 33;
-        DiskBasedDiagnosticsUpdated disk_based_diagnostics_updated = 34;
+        LspEvent lsp_event = 33;
 
         OpenBuffer open_buffer = 35;
         OpenBufferResponse open_buffer_response = 36;
@@ -424,14 +423,35 @@ message DiagnosticSummary {
     uint32 hint_count = 5;
 }
 
-message DiskBasedDiagnosticsUpdating {
+message LspEvent {
     uint64 project_id = 1;
+    oneof variant {
+        LspWorkStart work_start = 2;
+        LspWorkProgress work_progress = 3;
+        LspWorkEnd work_end = 4;
+        LspDiskBasedDiagnosticsUpdating disk_based_diagnostics_updating = 5;
+        LspDiskBasedDiagnosticsUpdated disk_based_diagnostics_updated = 6;
+    }
 }
 
-message DiskBasedDiagnosticsUpdated {
-    uint64 project_id = 1;
+message LspWorkStart {
+    string token = 1;
+}
+
+message LspWorkProgress {
+    string token = 1;
+    optional string message = 2;
+    optional uint32 percentage = 3;
 }
 
+message LspWorkEnd {
+    string token = 1;
+}
+
+message LspDiskBasedDiagnosticsUpdating {}
+
+message LspDiskBasedDiagnosticsUpdated {}
+
 message GetChannels {}
 
 message GetChannelsResponse {

crates/rpc/src/proto.rs 🔗

@@ -146,8 +146,6 @@ messages!(
     (BufferReloaded, Foreground),
     (BufferSaved, Foreground),
     (ChannelMessageSent, Foreground),
-    (DiskBasedDiagnosticsUpdated, Background),
-    (DiskBasedDiagnosticsUpdating, Background),
     (Error, Foreground),
     (FormatBuffers, Foreground),
     (FormatBuffersResponse, Foreground),
@@ -175,6 +173,7 @@ messages!(
     (JoinProjectResponse, Foreground),
     (LeaveChannel, Foreground),
     (LeaveProject, Foreground),
+    (LspEvent, Background),
     (OpenBuffer, Background),
     (OpenBufferForSymbol, Background),
     (OpenBufferForSymbolResponse, Background),
@@ -246,8 +245,6 @@ entity_messages!(
     ApplyCompletionAdditionalEdits,
     BufferReloaded,
     BufferSaved,
-    DiskBasedDiagnosticsUpdated,
-    DiskBasedDiagnosticsUpdating,
     FormatBuffers,
     GetCodeActions,
     GetCompletions,
@@ -257,6 +254,7 @@ entity_messages!(
     GetProjectSymbols,
     JoinProject,
     LeaveProject,
+    LspEvent,
     OpenBuffer,
     OpenBufferForSymbol,
     PerformRename,

crates/server/src/rpc.rs 🔗

@@ -84,8 +84,7 @@ impl Server {
             .add_message_handler(Server::unregister_worktree)
             .add_request_handler(Server::update_worktree)
             .add_message_handler(Server::update_diagnostic_summary)
-            .add_message_handler(Server::disk_based_diagnostics_updating)
-            .add_message_handler(Server::disk_based_diagnostics_updated)
+            .add_message_handler(Server::lsp_event)
             .add_request_handler(Server::forward_project_request::<proto::GetDefinition>)
             .add_request_handler(Server::forward_project_request::<proto::GetReferences>)
             .add_request_handler(Server::forward_project_request::<proto::SearchProject>)
@@ -535,23 +534,9 @@ impl Server {
         Ok(())
     }
 
-    async fn disk_based_diagnostics_updating(
+    async fn lsp_event(
         self: Arc<Server>,
-        request: TypedEnvelope<proto::DiskBasedDiagnosticsUpdating>,
-    ) -> tide::Result<()> {
-        let receiver_ids = self
-            .state()
-            .project_connection_ids(request.payload.project_id, request.sender_id)?;
-        broadcast(request.sender_id, receiver_ids, |connection_id| {
-            self.peer
-                .forward_send(request.sender_id, connection_id, request.payload.clone())
-        })?;
-        Ok(())
-    }
-
-    async fn disk_based_diagnostics_updated(
-        self: Arc<Server>,
-        request: TypedEnvelope<proto::DiskBasedDiagnosticsUpdated>,
+        request: TypedEnvelope<proto::LspEvent>,
     ) -> tide::Result<()> {
         let receiver_ids = self
             .state()