From 400a2fce585827c713a89cee870c165e25c4243a Mon Sep 17 00:00:00 2001 From: Antonio Scandurra Date: Mon, 28 Feb 2022 15:26:10 +0100 Subject: [PATCH] Don't use a bounded channel for signaling that buffers have been opened Blocking the sender could halt deserialization for no reason if nobody is consuming the notifications. --- crates/project/src/project.rs | 24 +++++++++++++----------- crates/server/src/rpc.rs | 7 ++++--- 2 files changed, 17 insertions(+), 14 deletions(-) diff --git a/crates/project/src/project.rs b/crates/project/src/project.rs index 04d83e63615a04a8e6f3b4342b6f9568a63549d0..0a8e0913c78292029d65e5da007fd4586550a183 100644 --- a/crates/project/src/project.rs +++ b/crates/project/src/project.rs @@ -22,7 +22,7 @@ use language::{ }; use lsp::{DiagnosticSeverity, DocumentHighlightKind, LanguageServer}; use lsp_command::*; -use postage::{broadcast, prelude::Stream, sink::Sink, watch}; +use postage::watch; use rand::prelude::*; use search::SearchQuery; use sha2::{Digest, Sha256}; @@ -58,7 +58,7 @@ pub struct Project { collaborators: HashMap, subscriptions: Vec, language_servers_with_diagnostics_running: isize, - opened_buffer: broadcast::Sender<()>, + opened_buffer: (Rc>>, watch::Receiver<()>), loading_buffers: HashMap< ProjectPath, postage::watch::Receiver, Arc>>>, @@ -248,7 +248,7 @@ impl Project { move |this, mut cx| { async move { let mut status = rpc.status(); - while let Some(status) = status.recv().await { + while let Some(status) = status.next().await { if let Some(this) = this.upgrade(&cx) { let remote_id = if let client::Status::Connected { .. } = status { let response = rpc.request(proto::RegisterProject {}).await?; @@ -283,6 +283,7 @@ impl Project { } }); + let (opened_buffer_tx, opened_buffer_rx) = watch::channel(); Self { worktrees: Default::default(), collaborators: Default::default(), @@ -295,7 +296,7 @@ impl Project { remote_id_rx, _maintain_remote_id_task, }, - opened_buffer: broadcast::channel(1).0, + opened_buffer: (Rc::new(RefCell::new(opened_buffer_tx)), opened_buffer_rx), subscriptions: Vec::new(), active_entry: None, languages, @@ -336,11 +337,12 @@ impl Project { load_task.detach(); } + let (opened_buffer_tx, opened_buffer_rx) = watch::channel(); let this = cx.add_model(|cx| { let mut this = Self { worktrees: Vec::new(), loading_buffers: Default::default(), - opened_buffer: broadcast::channel(1).0, + opened_buffer: (Rc::new(RefCell::new(opened_buffer_tx)), opened_buffer_rx), shared_buffers: Default::default(), active_entry: None, collaborators: Default::default(), @@ -464,7 +466,7 @@ impl Project { if let Some(id) = id { return id; } - watch.recv().await; + watch.next().await; } } } @@ -661,7 +663,7 @@ impl Project { Err(error) => return Err(anyhow!("{}", error)), } } - loading_watch.recv().await; + loading_watch.next().await; } }) } @@ -3228,8 +3230,8 @@ impl Project { ) -> Task>> { let replica_id = self.replica_id(); - let mut opened_buffer_tx = self.opened_buffer.clone(); - let mut opened_buffer_rx = self.opened_buffer.subscribe(); + let opened_buffer_tx = self.opened_buffer.0.clone(); + let mut opened_buffer_rx = self.opened_buffer.1.clone(); cx.spawn(|this, mut cx| async move { match buffer.variant.ok_or_else(|| anyhow!("missing buffer"))? { proto::buffer::Variant::Id(id) => { @@ -3245,7 +3247,7 @@ impl Project { break buffer; } opened_buffer_rx - .recv() + .next() .await .ok_or_else(|| anyhow!("project dropped while waiting for buffer"))?; }; @@ -3278,7 +3280,7 @@ impl Project { this.register_buffer(&buffer, buffer_worktree.as_ref(), cx) })?; - let _ = opened_buffer_tx.send(()).await; + *opened_buffer_tx.borrow_mut().borrow_mut() = (); Ok(buffer) } } diff --git a/crates/server/src/rpc.rs b/crates/server/src/rpc.rs index 0b1f081d170c0a1b3a7b41ee3e03f5ccddad842a..17c67f3195f769fe76a3465d226c8177d13ef33e 100644 --- a/crates/server/src/rpc.rs +++ b/crates/server/src/rpc.rs @@ -4371,7 +4371,7 @@ mod tests { .read_with(guest_cx, |project, cx| { assert!( !project.has_buffered_operations(cx), - "guest {} has buffered operations ", + "guest {} has buffered operations", guest_id, ); }); @@ -4779,8 +4779,9 @@ mod tests { } else { buffer.update(&mut cx, |buffer, cx| { log::info!( - "Host: updating buffer {:?}", - buffer.file().unwrap().full_path(cx) + "Host: updating buffer {:?} ({})", + buffer.file().unwrap().full_path(cx), + buffer.remote_id() ); buffer.randomly_edit(&mut *rng.lock(), 5, cx) });