Handle Peer responses using a futures::oneshot instead of postage::mpsc

Max Brunsfeld created

Change summary

crates/rpc/src/peer.rs | 22 +++++++---------------
1 file changed, 7 insertions(+), 15 deletions(-)

Detailed changes

crates/rpc/src/peer.rs 🔗

@@ -1,8 +1,7 @@
 use super::proto::{self, AnyTypedEnvelope, EnvelopedMessage, MessageStream, RequestMessage};
 use super::Connection;
 use anyhow::{anyhow, Context, Result};
-use futures::stream::BoxStream;
-use futures::{FutureExt as _, StreamExt};
+use futures::{channel::oneshot, stream::BoxStream, FutureExt as _, StreamExt};
 use parking_lot::{Mutex, RwLock};
 use postage::{
     barrier, mpsc,
@@ -92,7 +91,7 @@ pub struct ConnectionState {
     outgoing_tx: futures::channel::mpsc::UnboundedSender<proto::Envelope>,
     next_message_id: Arc<AtomicU32>,
     response_channels:
-        Arc<Mutex<Option<HashMap<u32, mpsc::Sender<(proto::Envelope, barrier::Sender)>>>>>,
+        Arc<Mutex<Option<HashMap<u32, oneshot::Sender<(proto::Envelope, barrier::Sender)>>>>>,
 }
 
 const WRITE_TIMEOUT: Duration = Duration::from_secs(10);
@@ -177,18 +176,14 @@ impl Peer {
             async move {
                 if let Some(responding_to) = incoming.responding_to {
                     let channel = response_channels.lock().as_mut()?.remove(&responding_to);
-                    if let Some(mut tx) = channel {
+                    if let Some(tx) = channel {
                         let mut requester_resumed = barrier::channel();
-                        if let Err(error) = tx.send((incoming, requester_resumed.0)).await {
+                        if let Err(error) = tx.send((incoming, requester_resumed.0)) {
                             log::debug!(
                                 "received RPC but request future was dropped {:?}",
-                                error.0 .0
+                                error.0
                             );
                         }
-                        // Drop response channel before awaiting on the barrier. This allows the
-                        // barrier to get dropped even if the request's future is dropped before it
-                        // has a chance to observe the response.
-                        drop(tx);
                         requester_resumed.1.recv().await;
                     } else {
                         log::warn!("received RPC response to unknown request {}", responding_to);
@@ -239,7 +234,7 @@ impl Peer {
         receiver_id: ConnectionId,
         request: T,
     ) -> impl Future<Output = Result<T::Response>> {
-        let (tx, mut rx) = mpsc::channel(1);
+        let (tx, rx) = oneshot::channel();
         let send = self.connection_state(receiver_id).and_then(|connection| {
             let message_id = connection.next_message_id.fetch_add(1, SeqCst);
             connection
@@ -260,10 +255,7 @@ impl Peer {
         });
         async move {
             send?;
-            let (response, _barrier) = rx
-                .recv()
-                .await
-                .ok_or_else(|| anyhow!("connection was closed"))?;
+            let (response, _barrier) = rx.await.map_err(|_| anyhow!("connection was closed"))?;
             if let Some(proto::envelope::Payload::Error(error)) = &response.payload {
                 Err(anyhow!("RPC request failed - {}", error.message))
             } else {