Lsp status bugfix (#2959)

Julia created

Release Notes:
- Fixed a case where language server download statuses could be skipped.
- Fixed a case where language server diagnostic progress could get stuck
when restarting a language server.

Change summary

crates/diagnostics/src/items.rs |  3 
crates/language/src/language.rs | 79 ++++++++++++++++++----------------
2 files changed, 44 insertions(+), 38 deletions(-)

Detailed changes

crates/diagnostics/src/items.rs 🔗

@@ -32,7 +32,8 @@ impl DiagnosticIndicator {
                 this.in_progress_checks.insert(*language_server_id);
                 cx.notify();
             }
-            project::Event::DiskBasedDiagnosticsFinished { language_server_id } => {
+            project::Event::DiskBasedDiagnosticsFinished { language_server_id }
+            | project::Event::LanguageServerRemoved(language_server_id) => {
                 this.summary = project.read(cx).diagnostic_summary(cx);
                 this.in_progress_checks.remove(language_server_id);
                 cx.notify();

crates/language/src/language.rs 🔗

@@ -13,7 +13,7 @@ use anyhow::{anyhow, Context, Result};
 use async_trait::async_trait;
 use collections::{HashMap, HashSet};
 use futures::{
-    channel::oneshot,
+    channel::{mpsc, oneshot},
     future::{BoxFuture, Shared},
     FutureExt, TryFutureExt as _,
 };
@@ -48,9 +48,6 @@ use unicase::UniCase;
 use util::{http::HttpClient, paths::PathExt};
 use util::{post_inc, ResultExt, TryFutureExt as _, UnwrapFuture};
 
-#[cfg(any(test, feature = "test-support"))]
-use futures::channel::mpsc;
-
 pub use buffer::Operation;
 pub use buffer::*;
 pub use diagnostic_set::DiagnosticEntry;
@@ -64,6 +61,27 @@ pub fn init(cx: &mut AppContext) {
     language_settings::init(cx);
 }
 
+#[derive(Clone, Default)]
+struct LspBinaryStatusSender {
+    txs: Arc<Mutex<Vec<mpsc::UnboundedSender<(Arc<Language>, LanguageServerBinaryStatus)>>>>,
+}
+
+impl LspBinaryStatusSender {
+    fn subscribe(&self) -> mpsc::UnboundedReceiver<(Arc<Language>, LanguageServerBinaryStatus)> {
+        let (tx, rx) = mpsc::unbounded();
+        self.txs.lock().push(tx);
+        rx
+    }
+
+    fn send(&self, language: Arc<Language>, status: LanguageServerBinaryStatus) {
+        let mut txs = self.txs.lock();
+        txs.retain(|tx| {
+            tx.unbounded_send((language.clone(), status.clone()))
+                .is_ok()
+        });
+    }
+}
+
 thread_local! {
     static PARSER: RefCell<Parser> = RefCell::new(Parser::new());
 }
@@ -594,14 +612,13 @@ struct AvailableLanguage {
 pub struct LanguageRegistry {
     state: RwLock<LanguageRegistryState>,
     language_server_download_dir: Option<Arc<Path>>,
-    lsp_binary_statuses_tx: async_broadcast::Sender<(Arc<Language>, LanguageServerBinaryStatus)>,
-    lsp_binary_statuses_rx: async_broadcast::Receiver<(Arc<Language>, LanguageServerBinaryStatus)>,
     login_shell_env_loaded: Shared<Task<()>>,
     #[allow(clippy::type_complexity)]
     lsp_binary_paths: Mutex<
         HashMap<LanguageServerName, Shared<Task<Result<LanguageServerBinary, Arc<anyhow::Error>>>>>,
     >,
     executor: Option<Arc<Background>>,
+    lsp_binary_status_tx: LspBinaryStatusSender,
 }
 
 struct LanguageRegistryState {
@@ -624,7 +641,6 @@ pub struct PendingLanguageServer {
 
 impl LanguageRegistry {
     pub fn new(login_shell_env_loaded: Task<()>) -> Self {
-        let (lsp_binary_statuses_tx, lsp_binary_statuses_rx) = async_broadcast::broadcast(16);
         Self {
             state: RwLock::new(LanguageRegistryState {
                 next_language_server_id: 0,
@@ -638,11 +654,10 @@ impl LanguageRegistry {
                 reload_count: 0,
             }),
             language_server_download_dir: None,
-            lsp_binary_statuses_tx,
-            lsp_binary_statuses_rx,
             login_shell_env_loaded: login_shell_env_loaded.shared(),
             lsp_binary_paths: Default::default(),
             executor: None,
+            lsp_binary_status_tx: Default::default(),
         }
     }
 
@@ -918,8 +933,8 @@ impl LanguageRegistry {
         let container_dir: Arc<Path> = Arc::from(download_dir.join(adapter.name.0.as_ref()));
         let root_path = root_path.clone();
         let adapter = adapter.clone();
-        let lsp_binary_statuses = self.lsp_binary_statuses_tx.clone();
         let login_shell_env_loaded = self.login_shell_env_loaded.clone();
+        let lsp_binary_statuses = self.lsp_binary_status_tx.clone();
 
         let task = {
             let container_dir = container_dir.clone();
@@ -976,8 +991,8 @@ impl LanguageRegistry {
 
     pub fn language_server_binary_statuses(
         &self,
-    ) -> async_broadcast::Receiver<(Arc<Language>, LanguageServerBinaryStatus)> {
-        self.lsp_binary_statuses_rx.clone()
+    ) -> mpsc::UnboundedReceiver<(Arc<Language>, LanguageServerBinaryStatus)> {
+        self.lsp_binary_status_tx.subscribe()
     }
 
     pub fn delete_server_container(
@@ -1054,7 +1069,7 @@ async fn get_binary(
     language: Arc<Language>,
     delegate: Arc<dyn LspAdapterDelegate>,
     container_dir: Arc<Path>,
-    statuses: async_broadcast::Sender<(Arc<Language>, LanguageServerBinaryStatus)>,
+    statuses: LspBinaryStatusSender,
     mut cx: AsyncAppContext,
 ) -> Result<LanguageServerBinary> {
     if !container_dir.exists() {
@@ -1081,19 +1096,15 @@ async fn get_binary(
             .cached_server_binary(container_dir.to_path_buf(), delegate.as_ref())
             .await
         {
-            statuses
-                .broadcast((language.clone(), LanguageServerBinaryStatus::Cached))
-                .await?;
+            statuses.send(language.clone(), LanguageServerBinaryStatus::Cached);
             return Ok(binary);
         } else {
-            statuses
-                .broadcast((
-                    language.clone(),
-                    LanguageServerBinaryStatus::Failed {
-                        error: format!("{:?}", error),
-                    },
-                ))
-                .await?;
+            statuses.send(
+                language.clone(),
+                LanguageServerBinaryStatus::Failed {
+                    error: format!("{:?}", error),
+                },
+            );
         }
     }
 
@@ -1105,27 +1116,21 @@ async fn fetch_latest_binary(
     language: Arc<Language>,
     delegate: &dyn LspAdapterDelegate,
     container_dir: &Path,
-    lsp_binary_statuses_tx: async_broadcast::Sender<(Arc<Language>, LanguageServerBinaryStatus)>,
+    lsp_binary_statuses_tx: LspBinaryStatusSender,
 ) -> Result<LanguageServerBinary> {
     let container_dir: Arc<Path> = container_dir.into();
-    lsp_binary_statuses_tx
-        .broadcast((
-            language.clone(),
-            LanguageServerBinaryStatus::CheckingForUpdate,
-        ))
-        .await?;
+    lsp_binary_statuses_tx.send(
+        language.clone(),
+        LanguageServerBinaryStatus::CheckingForUpdate,
+    );
 
     let version_info = adapter.fetch_latest_server_version(delegate).await?;
-    lsp_binary_statuses_tx
-        .broadcast((language.clone(), LanguageServerBinaryStatus::Downloading))
-        .await?;
+    lsp_binary_statuses_tx.send(language.clone(), LanguageServerBinaryStatus::Downloading);
 
     let binary = adapter
         .fetch_server_binary(version_info, container_dir.to_path_buf(), delegate)
         .await?;
-    lsp_binary_statuses_tx
-        .broadcast((language.clone(), LanguageServerBinaryStatus::Downloaded))
-        .await?;
+    lsp_binary_statuses_tx.send(language.clone(), LanguageServerBinaryStatus::Downloaded);
 
     Ok(binary)
 }