Add channel to notify project when languages are added

Isaac Clayton created

Change summary

crates/language/src/language.rs             |  8 ++++
crates/project/src/project.rs               | 36 ++++++++++++++++++++-
crates/project/src/project_tests.rs         | 39 +++++++++++++++++++++-
crates/zed/src/languages/language_plugin.rs |  1 
4 files changed, 80 insertions(+), 4 deletions(-)

Detailed changes

crates/language/src/language.rs 🔗

@@ -18,6 +18,7 @@ use gpui::{MutableAppContext, Task};
 use highlight_map::HighlightMap;
 use lazy_static::lazy_static;
 use parking_lot::{Mutex, RwLock};
+use postage::watch;
 use regex::Regex;
 use serde::{de, Deserialize, Deserializer};
 use serde_json::Value;
@@ -316,6 +317,7 @@ pub struct LanguageRegistry {
             Shared<BoxFuture<'static, Result<PathBuf, Arc<anyhow::Error>>>>,
         >,
     >,
+    subscription: RwLock<(watch::Sender<()>, watch::Receiver<()>)>,
 }
 
 impl LanguageRegistry {
@@ -328,6 +330,7 @@ impl LanguageRegistry {
             lsp_binary_statuses_rx,
             login_shell_env_loaded: login_shell_env_loaded.shared(),
             lsp_binary_paths: Default::default(),
+            subscription: RwLock::new(watch::channel()),
         }
     }
 
@@ -338,6 +341,11 @@ impl LanguageRegistry {
 
     pub fn add(&self, language: Arc<Language>) {
         self.languages.write().push(language.clone());
+        *self.subscription.write().0.borrow_mut() = ();
+    }
+
+    pub fn subscribe(&self) -> watch::Receiver<()> {
+        self.subscription.read().1.clone()
     }
 
     pub fn set_theme(&self, theme: &SyntaxTheme) {

crates/project/src/project.rs 🔗

@@ -125,6 +125,7 @@ pub struct Project {
     buffer_snapshots: HashMap<u64, Vec<(i32, TextBufferSnapshot)>>,
     nonce: u128,
     initialized_persistent_state: bool,
+    _maintain_buffer_languages: Task<()>,
 }
 
 #[derive(Error, Debug)]
@@ -472,6 +473,7 @@ impl Project {
                 opened_buffer: (Rc::new(RefCell::new(opened_buffer_tx)), opened_buffer_rx),
                 client_subscriptions: Vec::new(),
                 _subscriptions: vec![cx.observe_global::<Settings, _>(Self::on_settings_changed)],
+                _maintain_buffer_languages: Self::maintain_buffer_languages(&languages, cx),
                 active_entry: None,
                 languages,
                 client,
@@ -549,6 +551,7 @@ impl Project {
                 loading_local_worktrees: Default::default(),
                 active_entry: None,
                 collaborators: Default::default(),
+                _maintain_buffer_languages: Self::maintain_buffer_languages(&languages, cx),
                 languages,
                 user_store: user_store.clone(),
                 project_store,
@@ -2019,6 +2022,34 @@ impl Project {
             })
     }
 
+    fn maintain_buffer_languages(
+        languages: &LanguageRegistry,
+        cx: &mut ModelContext<Project>,
+    ) -> Task<()> {
+        let mut subscription = languages.subscribe();
+        cx.spawn_weak(|project, mut cx| async move {
+            while let Some(_) = subscription.next().await {
+                if let Some(project) = project.upgrade(&cx) {
+                    project.update(&mut cx, |project, cx| {
+                        let mut buffers_without_language = Vec::new();
+                        for buffer in project.opened_buffers.values() {
+                            if let Some(buffer) = buffer.upgrade(cx) {
+                                if buffer.read(cx).language().is_none() {
+                                    buffers_without_language.push(buffer);
+                                }
+                            }
+                        }
+
+                        for buffer in buffers_without_language {
+                            project.assign_language_to_buffer(&buffer, cx);
+                            project.register_buffer_with_language_server(&buffer, cx);
+                        }
+                    });
+                }
+            }
+        })
+    }
+
     fn assign_language_to_buffer(
         &mut self,
         buffer: &ModelHandle<Buffer>,
@@ -2089,9 +2120,10 @@ impl Project {
                                 move |params, mut cx| {
                                     if let Some(this) = this.upgrade(&cx) {
                                         this.update(&mut cx, |this, cx| {
-                                            this.on_lsp_diagnostics_published(
+                                            // TODO(isaac): remove block on
+                                            smol::block_on(this.on_lsp_diagnostics_published(
                                                 server_id, params, &adapter, cx,
-                                            );
+                                            ));
                                         });
                                     }
                                 }

crates/project/src/project_tests.rs 🔗

@@ -122,11 +122,46 @@ async fn test_managing_language_servers(cx: &mut gpui::TestAppContext) {
     .await;
 
     let project = Project::test(fs.clone(), ["/the-root".as_ref()], cx).await;
-    project.update(cx, |project, _| {
-        project.languages.add(Arc::new(rust_language));
+
+    // Open a buffer before languages have been added
+    let json_buffer = project
+        .update(cx, |project, cx| {
+            project.open_local_buffer("/the-root/package.json", cx)
+        })
+        .await
+        .unwrap();
+
+    // Assert that this buffer does not have a language
+    assert!(json_buffer.read_with(cx, |buffer, _| { buffer.language().is_none() }));
+
+    // Now we add the languages to the project, and subscribe to the watcher
+    project.update(cx, |project, cx| {
+        // Get a handle to the channel and clear out default item
+        let mut recv = project.languages.subscribe();
+        recv.blocking_recv();
+
+        // Add, then wait to be notified that JSON has been added
         project.languages.add(Arc::new(json_language));
+        recv.blocking_recv();
+
+        // Add, then wait to be notified that Rust has been added
+        project.languages.add(Arc::new(rust_language));
+        recv.blocking_recv();
+        // Uncommenting this would cause the thread to block indefinitely:
+        // recv.blocking_recv();
+
+        // Force the assignment, we know the watcher has been notified
+        // but have no way to wait for the watcher to assign to the project
+        project.assign_language_to_buffer(&json_buffer, cx);
     });
 
+    // Assert that the opened buffer does have a language, and that it is JSON
+    let name = json_buffer.read_with(cx, |buffer, _| buffer.language().map(|l| l.name()));
+    assert_eq!(name, Some("JSON".into()));
+
+    // Close the JSON buffer we opened
+    cx.update(|_| drop(json_buffer));
+
     // Open a buffer without an associated language server.
     let toml_buffer = project
         .update(cx, |project, cx| {

crates/zed/src/languages/language_plugin.rs 🔗

@@ -26,6 +26,7 @@ pub async fn new_json(executor: Arc<Background>) -> Result<PluginLspAdapter> {
             include_bytes!("../../../../plugins/bin/json_language.wasm.pre"),
         )
         .await?;
+    // smol::Timer::after(std::time::Duration::from_secs(5)).await;
     PluginLspAdapter::new(plugin, executor).await
 }