Don't use a bounded channel for signaling that buffers have been opened

Antonio Scandurra created

Blocking the sender could halt deserialization for no reason if nobody
is consuming the notifications.

Change summary

crates/project/src/project.rs | 24 +++++++++++++-----------
crates/server/src/rpc.rs      |  7 ++++---
2 files changed, 17 insertions(+), 14 deletions(-)

Detailed changes

crates/project/src/project.rs 🔗

@@ -22,7 +22,7 @@ use language::{
 };
 use lsp::{DiagnosticSeverity, DocumentHighlightKind, LanguageServer};
 use lsp_command::*;
-use postage::{broadcast, prelude::Stream, sink::Sink, watch};
+use postage::watch;
 use rand::prelude::*;
 use search::SearchQuery;
 use sha2::{Digest, Sha256};
@@ -58,7 +58,7 @@ pub struct Project {
     collaborators: HashMap<PeerId, Collaborator>,
     subscriptions: Vec<client::Subscription>,
     language_servers_with_diagnostics_running: isize,
-    opened_buffer: broadcast::Sender<()>,
+    opened_buffer: (Rc<RefCell<watch::Sender<()>>>, watch::Receiver<()>),
     loading_buffers: HashMap<
         ProjectPath,
         postage::watch::Receiver<Option<Result<ModelHandle<Buffer>, Arc<anyhow::Error>>>>,
@@ -248,7 +248,7 @@ impl Project {
                 move |this, mut cx| {
                     async move {
                         let mut status = rpc.status();
-                        while let Some(status) = status.recv().await {
+                        while let Some(status) = status.next().await {
                             if let Some(this) = this.upgrade(&cx) {
                                 let remote_id = if let client::Status::Connected { .. } = status {
                                     let response = rpc.request(proto::RegisterProject {}).await?;
@@ -283,6 +283,7 @@ impl Project {
                 }
             });
 
+            let (opened_buffer_tx, opened_buffer_rx) = watch::channel();
             Self {
                 worktrees: Default::default(),
                 collaborators: Default::default(),
@@ -295,7 +296,7 @@ impl Project {
                     remote_id_rx,
                     _maintain_remote_id_task,
                 },
-                opened_buffer: broadcast::channel(1).0,
+                opened_buffer: (Rc::new(RefCell::new(opened_buffer_tx)), opened_buffer_rx),
                 subscriptions: Vec::new(),
                 active_entry: None,
                 languages,
@@ -336,11 +337,12 @@ impl Project {
             load_task.detach();
         }
 
+        let (opened_buffer_tx, opened_buffer_rx) = watch::channel();
         let this = cx.add_model(|cx| {
             let mut this = Self {
                 worktrees: Vec::new(),
                 loading_buffers: Default::default(),
-                opened_buffer: broadcast::channel(1).0,
+                opened_buffer: (Rc::new(RefCell::new(opened_buffer_tx)), opened_buffer_rx),
                 shared_buffers: Default::default(),
                 active_entry: None,
                 collaborators: Default::default(),
@@ -464,7 +466,7 @@ impl Project {
                 if let Some(id) = id {
                     return id;
                 }
-                watch.recv().await;
+                watch.next().await;
             }
         }
     }
@@ -661,7 +663,7 @@ impl Project {
                         Err(error) => return Err(anyhow!("{}", error)),
                     }
                 }
-                loading_watch.recv().await;
+                loading_watch.next().await;
             }
         })
     }
@@ -3228,8 +3230,8 @@ impl Project {
     ) -> Task<Result<ModelHandle<Buffer>>> {
         let replica_id = self.replica_id();
 
-        let mut opened_buffer_tx = self.opened_buffer.clone();
-        let mut opened_buffer_rx = self.opened_buffer.subscribe();
+        let opened_buffer_tx = self.opened_buffer.0.clone();
+        let mut opened_buffer_rx = self.opened_buffer.1.clone();
         cx.spawn(|this, mut cx| async move {
             match buffer.variant.ok_or_else(|| anyhow!("missing buffer"))? {
                 proto::buffer::Variant::Id(id) => {
@@ -3245,7 +3247,7 @@ impl Project {
                             break buffer;
                         }
                         opened_buffer_rx
-                            .recv()
+                            .next()
                             .await
                             .ok_or_else(|| anyhow!("project dropped while waiting for buffer"))?;
                     };
@@ -3278,7 +3280,7 @@ impl Project {
                         this.register_buffer(&buffer, buffer_worktree.as_ref(), cx)
                     })?;
 
-                    let _ = opened_buffer_tx.send(()).await;
+                    *opened_buffer_tx.borrow_mut().borrow_mut() = ();
                     Ok(buffer)
                 }
             }

crates/server/src/rpc.rs 🔗

@@ -4371,7 +4371,7 @@ mod tests {
                 .read_with(guest_cx, |project, cx| {
                     assert!(
                         !project.has_buffered_operations(cx),
-                        "guest {} has buffered operations ",
+                        "guest {} has buffered operations",
                         guest_id,
                     );
                 });
@@ -4779,8 +4779,9 @@ mod tests {
                             } else {
                                 buffer.update(&mut cx, |buffer, cx| {
                                     log::info!(
-                                        "Host: updating buffer {:?}",
-                                        buffer.file().unwrap().full_path(cx)
+                                        "Host: updating buffer {:?} ({})",
+                                        buffer.file().unwrap().full_path(cx),
+                                        buffer.remote_id()
                                     );
                                     buffer.randomly_edit(&mut *rng.lock(), 5, cx)
                                 });