Remove logic for preserving RPC message order between peers

Max Brunsfeld and Antonio Scandurra created

* On the server, spawn a separate task for each incoming message
* In the peer, eliminate the barrier that was used to enforce ordering
  of responses with respect to other incoming messages

Co-Authored-By: Antonio Scandurra <me@as-cii.com>

Change summary

crates/rpc/src/peer.rs   | 17 ++++-----------
crates/server/src/rpc.rs | 43 +++++++++++++++++++++++++++++++++--------
2 files changed, 39 insertions(+), 21 deletions(-)

Detailed changes

crates/rpc/src/peer.rs 🔗

@@ -5,7 +5,7 @@ use futures::stream::BoxStream;
 use futures::{FutureExt as _, StreamExt};
 use parking_lot::{Mutex, RwLock};
 use postage::{
-    barrier, mpsc,
+    mpsc,
     prelude::{Sink as _, Stream as _},
 };
 use smol_timeout::TimeoutExt as _;
@@ -91,8 +91,7 @@ pub struct Peer {
 pub struct ConnectionState {
     outgoing_tx: futures::channel::mpsc::UnboundedSender<proto::Envelope>,
     next_message_id: Arc<AtomicU32>,
-    response_channels:
-        Arc<Mutex<Option<HashMap<u32, mpsc::Sender<(proto::Envelope, barrier::Sender)>>>>>,
+    response_channels: Arc<Mutex<Option<HashMap<u32, mpsc::Sender<proto::Envelope>>>>>,
 }
 
 const WRITE_TIMEOUT: Duration = Duration::from_secs(10);
@@ -178,18 +177,12 @@ impl Peer {
                 if let Some(responding_to) = incoming.responding_to {
                     let channel = response_channels.lock().as_mut()?.remove(&responding_to);
                     if let Some(mut tx) = channel {
-                        let mut requester_resumed = barrier::channel();
-                        if let Err(error) = tx.send((incoming, requester_resumed.0)).await {
+                        if let Err(error) = tx.send(incoming).await {
                             log::debug!(
                                 "received RPC but request future was dropped {:?}",
-                                error.0 .0
+                                error.0
                             );
                         }
-                        // Drop response channel before awaiting on the barrier. This allows the
-                        // barrier to get dropped even if the request's future is dropped before it
-                        // has a chance to observe the response.
-                        drop(tx);
-                        requester_resumed.1.recv().await;
                     } else {
                         log::warn!("received RPC response to unknown request {}", responding_to);
                     }
@@ -260,7 +253,7 @@ impl Peer {
         });
         async move {
             send?;
-            let (response, _barrier) = rx
+            let response = rx
                 .recv()
                 .await
                 .ok_or_else(|| anyhow!("connection was closed"))?;

crates/server/src/rpc.rs 🔗

@@ -41,6 +41,12 @@ pub struct Server {
     notifications: Option<mpsc::Sender<()>>,
 }
 
+pub trait Executor {
+    fn spawn_detached<F: 'static + Send + Future<Output = ()>>(&self, future: F);
+}
+
+pub struct RealExecutor;
+
 const MESSAGE_COUNT_PER_PAGE: usize = 100;
 const MAX_MESSAGE_LEN: usize = 1024;
 
@@ -144,12 +150,13 @@ impl Server {
         })
     }
 
-    pub fn handle_connection(
+    pub fn handle_connection<E: Executor>(
         self: &Arc<Self>,
         connection: Connection,
         addr: String,
         user_id: UserId,
         mut send_connection_id: Option<postage::mpsc::Sender<ConnectionId>>,
+        executor: E,
     ) -> impl Future<Output = ()> {
         let mut this = self.clone();
         async move {
@@ -183,12 +190,14 @@ impl Server {
                             let type_name = message.payload_type_name();
                             log::info!("rpc message received. connection:{}, type:{}", connection_id, type_name);
                             if let Some(handler) = this.handlers.get(&message.payload_type_id()) {
-                                if let Err(err) = (handler)(this.clone(), message).await {
-                                    log::error!("rpc message error. connection:{}, type:{}, error:{:?}", connection_id, type_name, err);
-                                } else {
-                                    log::info!("rpc message handled. connection:{}, type:{}, duration:{:?}", connection_id, type_name, start_time.elapsed());
-                                }
-
+                                let handle_message = (handler)(this.clone(), message);
+                                executor.spawn_detached(async move {
+                                    if let Err(err) = handle_message.await {
+                                        log::error!("rpc message error. connection:{}, type:{}, error:{:?}", connection_id, type_name, err);
+                                    } else {
+                                        log::info!("rpc message handled. connection:{}, type:{}, duration:{:?}", connection_id, type_name, start_time.elapsed());
+                                    }
+                                });
                                 if let Some(mut notifications) = this.notifications.clone() {
                                     let _ = notifications.send(()).await;
                                 }
@@ -966,6 +975,12 @@ impl Server {
     }
 }
 
+impl Executor for RealExecutor {
+    fn spawn_detached<F: 'static + Send + Future<Output = ()>>(&self, future: F) {
+        task::spawn(future);
+    }
+}
+
 fn broadcast<F>(
     sender_id: ConnectionId,
     receiver_ids: Vec<ConnectionId>,
@@ -1032,6 +1047,7 @@ pub fn add_routes(app: &mut tide::Server<Arc<AppState>>, rpc: &Arc<Peer>) {
                             addr,
                             user_id,
                             None,
+                            RealExecutor,
                         )
                         .await;
                 }
@@ -1778,10 +1794,12 @@ mod tests {
         let buffer_b = cx_b
             .background()
             .spawn(project_b.update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx)));
-        task::yield_now().await;
 
         // Edit the buffer as client A while client B is still opening it.
-        buffer_a.update(&mut cx_a, |buf, cx| buf.edit([0..0], "z", cx));
+        cx_b.background().simulate_random_delay().await;
+        buffer_a.update(&mut cx_a, |buf, cx| buf.edit([0..0], "X", cx));
+        cx_b.background().simulate_random_delay().await;
+        buffer_a.update(&mut cx_a, |buf, cx| buf.edit([1..1], "Y", cx));
 
         let text = buffer_a.read_with(&cx_a, |buf, _| buf.text());
         let buffer_b = buffer_b.await.unwrap();
@@ -3598,6 +3616,7 @@ mod tests {
                                     client_name,
                                     user_id,
                                     Some(connection_id_tx),
+                                    cx.background(),
                                 ))
                                 .detach();
                             Ok(client_conn)
@@ -3701,6 +3720,12 @@ mod tests {
         }
     }
 
+    impl Executor for Arc<gpui::executor::Background> {
+        fn spawn_detached<F: 'static + Send + Future<Output = ()>>(&self, future: F) {
+            self.spawn(future).detach();
+        }
+    }
+
     fn channel_messages(channel: &Channel) -> Vec<(String, String, bool)> {
         channel
             .messages()