Test ordering of responses with respect to uni-directional messages

Nathan Sobo , Max Brunsfeld , and Antonio Scandurra created

Co-Authored-By: Max Brunsfeld <maxbrunsfeld@gmail.com>
Co-Authored-By: Antonio Scandurra <me@as-cii.com>

Change summary

Cargo.lock             |   1 
crates/rpc/Cargo.toml  |   1 
crates/rpc/src/peer.rs | 442 +++++++++++++++++++++++++++----------------
3 files changed, 278 insertions(+), 166 deletions(-)

Detailed changes

Cargo.lock 🔗

@@ -3837,6 +3837,7 @@ dependencies = [
  "async-tungstenite",
  "base64 0.13.0",
  "futures",
+ "gpui",
  "log",
  "parking_lot",
  "postage",

crates/rpc/Cargo.toml 🔗

@@ -30,5 +30,6 @@ zstd = "0.9"
 prost-build = "0.8"
 
 [dev-dependencies]
+gpui = { path = "../gpui", features = ["test-support"] }
 smol = "1.2.5"
 tempdir = "0.3.7"

crates/rpc/src/peer.rs 🔗

@@ -342,201 +342,311 @@ mod tests {
     use super::*;
     use crate::TypedEnvelope;
     use async_tungstenite::tungstenite::Message as WebSocketMessage;
+    use gpui::TestAppContext;
+
+    #[gpui::test(iterations = 10)]
+    async fn test_request_response(cx: TestAppContext) {
+        let executor = cx.foreground();
+
+        // create 2 clients connected to 1 server
+        let server = Peer::new();
+        let client1 = Peer::new();
+        let client2 = Peer::new();
+
+        let (client1_to_server_conn, server_to_client_1_conn, _) = Connection::in_memory();
+        let (client1_conn_id, io_task1, client1_incoming) =
+            client1.add_connection(client1_to_server_conn).await;
+        let (_, io_task2, server_incoming1) = server.add_connection(server_to_client_1_conn).await;
+
+        let (client2_to_server_conn, server_to_client_2_conn, _) = Connection::in_memory();
+        let (client2_conn_id, io_task3, client2_incoming) =
+            client2.add_connection(client2_to_server_conn).await;
+        let (_, io_task4, server_incoming2) = server.add_connection(server_to_client_2_conn).await;
+
+        executor.spawn(io_task1).detach();
+        executor.spawn(io_task2).detach();
+        executor.spawn(io_task3).detach();
+        executor.spawn(io_task4).detach();
+        executor
+            .spawn(handle_messages(server_incoming1, server.clone()))
+            .detach();
+        executor
+            .spawn(handle_messages(client1_incoming, client1.clone()))
+            .detach();
+        executor
+            .spawn(handle_messages(server_incoming2, server.clone()))
+            .detach();
+        executor
+            .spawn(handle_messages(client2_incoming, client2.clone()))
+            .detach();
 
-    #[test]
-    fn test_request_response() {
-        smol::block_on(async move {
-            // create 2 clients connected to 1 server
-            let server = Peer::new();
-            let client1 = Peer::new();
-            let client2 = Peer::new();
-
-            let (client1_to_server_conn, server_to_client_1_conn, _) = Connection::in_memory();
-            let (client1_conn_id, io_task1, client1_incoming) =
-                client1.add_connection(client1_to_server_conn).await;
-            let (_, io_task2, server_incoming1) =
-                server.add_connection(server_to_client_1_conn).await;
-
-            let (client2_to_server_conn, server_to_client_2_conn, _) = Connection::in_memory();
-            let (client2_conn_id, io_task3, client2_incoming) =
-                client2.add_connection(client2_to_server_conn).await;
-            let (_, io_task4, server_incoming2) =
-                server.add_connection(server_to_client_2_conn).await;
-
-            smol::spawn(io_task1).detach();
-            smol::spawn(io_task2).detach();
-            smol::spawn(io_task3).detach();
-            smol::spawn(io_task4).detach();
-            smol::spawn(handle_messages(server_incoming1, server.clone())).detach();
-            smol::spawn(handle_messages(client1_incoming, client1.clone())).detach();
-            smol::spawn(handle_messages(server_incoming2, server.clone())).detach();
-            smol::spawn(handle_messages(client2_incoming, client2.clone())).detach();
-
-            assert_eq!(
-                client1
-                    .request(client1_conn_id, proto::Ping {},)
-                    .await
-                    .unwrap(),
-                proto::Ack {}
-            );
+        assert_eq!(
+            client1
+                .request(client1_conn_id, proto::Ping {},)
+                .await
+                .unwrap(),
+            proto::Ack {}
+        );
 
-            assert_eq!(
-                client2
-                    .request(client2_conn_id, proto::Ping {},)
+        assert_eq!(
+            client2
+                .request(client2_conn_id, proto::Ping {},)
+                .await
+                .unwrap(),
+            proto::Ack {}
+        );
+
+        assert_eq!(
+            client1
+                .request(
+                    client1_conn_id,
+                    proto::OpenBuffer {
+                        project_id: 0,
+                        worktree_id: 1,
+                        path: "path/one".to_string(),
+                    },
+                )
+                .await
+                .unwrap(),
+            proto::OpenBufferResponse {
+                buffer: Some(proto::Buffer {
+                    id: 101,
+                    visible_text: "path/one content".to_string(),
+                    ..Default::default()
+                }),
+            }
+        );
+
+        assert_eq!(
+            client2
+                .request(
+                    client2_conn_id,
+                    proto::OpenBuffer {
+                        project_id: 0,
+                        worktree_id: 2,
+                        path: "path/two".to_string(),
+                    },
+                )
+                .await
+                .unwrap(),
+            proto::OpenBufferResponse {
+                buffer: Some(proto::Buffer {
+                    id: 102,
+                    visible_text: "path/two content".to_string(),
+                    ..Default::default()
+                }),
+            }
+        );
+
+        client1.disconnect(client1_conn_id);
+        client2.disconnect(client1_conn_id);
+
+        async fn handle_messages(
+            mut messages: BoxStream<'static, Box<dyn AnyTypedEnvelope>>,
+            peer: Arc<Peer>,
+        ) -> Result<()> {
+            while let Some(envelope) = messages.next().await {
+                let envelope = envelope.into_any();
+                if let Some(envelope) = envelope.downcast_ref::<TypedEnvelope<proto::Ping>>() {
+                    let receipt = envelope.receipt();
+                    peer.respond(receipt, proto::Ack {}).await?
+                } else if let Some(envelope) =
+                    envelope.downcast_ref::<TypedEnvelope<proto::OpenBuffer>>()
+                {
+                    let message = &envelope.payload;
+                    let receipt = envelope.receipt();
+                    let response = match message.path.as_str() {
+                        "path/one" => {
+                            assert_eq!(message.worktree_id, 1);
+                            proto::OpenBufferResponse {
+                                buffer: Some(proto::Buffer {
+                                    id: 101,
+                                    visible_text: "path/one content".to_string(),
+                                    ..Default::default()
+                                }),
+                            }
+                        }
+                        "path/two" => {
+                            assert_eq!(message.worktree_id, 2);
+                            proto::OpenBufferResponse {
+                                buffer: Some(proto::Buffer {
+                                    id: 102,
+                                    visible_text: "path/two content".to_string(),
+                                    ..Default::default()
+                                }),
+                            }
+                        }
+                        _ => {
+                            panic!("unexpected path {}", message.path);
+                        }
+                    };
+
+                    peer.respond(receipt, response).await?
+                } else {
+                    panic!("unknown message type");
+                }
+            }
+
+            Ok(())
+        }
+    }
+
+    #[gpui::test(iterations = 10)]
+    async fn test_order_of_response_and_incoming(cx: TestAppContext) {
+        let executor = cx.foreground();
+        let server = Peer::new();
+        let client = Peer::new();
+
+        let (client_to_server_conn, server_to_client_conn, _) = Connection::in_memory();
+        let (client_to_server_conn_id, io_task1, mut client_incoming) =
+            client.add_connection(client_to_server_conn).await;
+        let (server_to_client_conn_id, io_task2, mut server_incoming) =
+            server.add_connection(server_to_client_conn).await;
+
+        executor.spawn(io_task1).detach();
+        executor.spawn(io_task2).detach();
+
+        executor
+            .spawn(async move {
+                let request = server_incoming
+                    .next()
                     .await
-                    .unwrap(),
-                proto::Ack {}
-            );
-
-            assert_eq!(
-                client1
-                    .request(
-                        client1_conn_id,
-                        proto::OpenBuffer {
-                            project_id: 0,
-                            worktree_id: 1,
-                            path: "path/one".to_string(),
+                    .unwrap()
+                    .into_any()
+                    .downcast::<TypedEnvelope<proto::Ping>>()
+                    .unwrap();
+
+                server
+                    .send(
+                        server_to_client_conn_id,
+                        proto::Error {
+                            message: "message 1".to_string(),
                         },
                     )
                     .await
-                    .unwrap(),
-                proto::OpenBufferResponse {
-                    buffer: Some(proto::Buffer {
-                        id: 101,
-                        visible_text: "path/one content".to_string(),
-                        ..Default::default()
-                    }),
-                }
-            );
-
-            assert_eq!(
-                client2
-                    .request(
-                        client2_conn_id,
-                        proto::OpenBuffer {
-                            project_id: 0,
-                            worktree_id: 2,
-                            path: "path/two".to_string(),
+                    .unwrap();
+                server
+                    .send(
+                        server_to_client_conn_id,
+                        proto::Error {
+                            message: "message 2".to_string(),
                         },
                     )
                     .await
-                    .unwrap(),
-                proto::OpenBufferResponse {
-                    buffer: Some(proto::Buffer {
-                        id: 102,
-                        visible_text: "path/two content".to_string(),
-                        ..Default::default()
-                    }),
-                }
-            );
-
-            client1.disconnect(client1_conn_id);
-            client2.disconnect(client1_conn_id);
-
-            async fn handle_messages(
-                mut messages: BoxStream<'static, Box<dyn AnyTypedEnvelope>>,
-                peer: Arc<Peer>,
-            ) -> Result<()> {
-                while let Some(envelope) = messages.next().await {
-                    let envelope = envelope.into_any();
-                    if let Some(envelope) = envelope.downcast_ref::<TypedEnvelope<proto::Ping>>() {
-                        let receipt = envelope.receipt();
-                        peer.respond(receipt, proto::Ack {}).await?
-                    } else if let Some(envelope) =
-                        envelope.downcast_ref::<TypedEnvelope<proto::OpenBuffer>>()
-                    {
-                        let message = &envelope.payload;
-                        let receipt = envelope.receipt();
-                        let response = match message.path.as_str() {
-                            "path/one" => {
-                                assert_eq!(message.worktree_id, 1);
-                                proto::OpenBufferResponse {
-                                    buffer: Some(proto::Buffer {
-                                        id: 101,
-                                        visible_text: "path/one content".to_string(),
-                                        ..Default::default()
-                                    }),
-                                }
-                            }
-                            "path/two" => {
-                                assert_eq!(message.worktree_id, 2);
-                                proto::OpenBufferResponse {
-                                    buffer: Some(proto::Buffer {
-                                        id: 102,
-                                        visible_text: "path/two content".to_string(),
-                                        ..Default::default()
-                                    }),
-                                }
-                            }
-                            _ => {
-                                panic!("unexpected path {}", message.path);
-                            }
-                        };
+                    .unwrap();
+                server
+                    .respond(request.receipt(), proto::Ack {})
+                    .await
+                    .unwrap();
 
-                        peer.respond(receipt, response).await?
-                    } else {
-                        panic!("unknown message type");
-                    }
-                }
+                // Prevent the connection from being dropped
+                server_incoming.next().await;
+            })
+            .detach();
+
+        let events = Arc::new(Mutex::new(Vec::new()));
 
-                Ok(())
+        let response = client.request(client_to_server_conn_id, proto::Ping {});
+        let response_task = executor.spawn({
+            let events = events.clone();
+            async move {
+                response.await.unwrap();
+                events.lock().push("response".to_string());
             }
         });
+
+        executor
+            .spawn({
+                let events = events.clone();
+                async move {
+                    let incoming1 = client_incoming
+                        .next()
+                        .await
+                        .unwrap()
+                        .into_any()
+                        .downcast::<TypedEnvelope<proto::Error>>()
+                        .unwrap();
+                    events.lock().push(incoming1.payload.message);
+                    let incoming2 = client_incoming
+                        .next()
+                        .await
+                        .unwrap()
+                        .into_any()
+                        .downcast::<TypedEnvelope<proto::Error>>()
+                        .unwrap();
+                    events.lock().push(incoming2.payload.message);
+
+                    // Prevent the connection from being dropped
+                    client_incoming.next().await;
+                }
+            })
+            .detach();
+
+        response_task.await;
+        assert_eq!(
+            &*events.lock(),
+            &[
+                "message 1".to_string(),
+                "message 2".to_string(),
+                "response".to_string()
+            ]
+        );
     }
 
-    #[test]
-    fn test_disconnect() {
-        smol::block_on(async move {
-            let (client_conn, mut server_conn, _) = Connection::in_memory();
+    #[gpui::test(iterations = 10)]
+    async fn test_disconnect(cx: TestAppContext) {
+        let executor = cx.foreground();
+
+        let (client_conn, mut server_conn, _) = Connection::in_memory();
 
-            let client = Peer::new();
-            let (connection_id, io_handler, mut incoming) =
-                client.add_connection(client_conn).await;
+        let client = Peer::new();
+        let (connection_id, io_handler, mut incoming) = client.add_connection(client_conn).await;
 
-            let (mut io_ended_tx, mut io_ended_rx) = postage::barrier::channel();
-            smol::spawn(async move {
+        let (mut io_ended_tx, mut io_ended_rx) = postage::barrier::channel();
+        executor
+            .spawn(async move {
                 io_handler.await.ok();
                 io_ended_tx.send(()).await.unwrap();
             })
             .detach();
 
-            let (mut messages_ended_tx, mut messages_ended_rx) = postage::barrier::channel();
-            smol::spawn(async move {
+        let (mut messages_ended_tx, mut messages_ended_rx) = postage::barrier::channel();
+        executor
+            .spawn(async move {
                 incoming.next().await;
                 messages_ended_tx.send(()).await.unwrap();
             })
             .detach();
 
-            client.disconnect(connection_id);
+        client.disconnect(connection_id);
 
-            io_ended_rx.recv().await;
-            messages_ended_rx.recv().await;
-            assert!(server_conn
-                .send(WebSocketMessage::Binary(vec![]))
-                .await
-                .is_err());
-        });
+        io_ended_rx.recv().await;
+        messages_ended_rx.recv().await;
+        assert!(server_conn
+            .send(WebSocketMessage::Binary(vec![]))
+            .await
+            .is_err());
     }
 
-    #[test]
-    fn test_io_error() {
-        smol::block_on(async move {
-            let (client_conn, mut server_conn, _) = Connection::in_memory();
-
-            let client = Peer::new();
-            let (connection_id, io_handler, mut incoming) =
-                client.add_connection(client_conn).await;
-            smol::spawn(io_handler).detach();
-            smol::spawn(async move { incoming.next().await }).detach();
-
-            let response = smol::spawn(client.request(connection_id, proto::Ping {}));
-            let _request = server_conn.rx.next().await.unwrap().unwrap();
-
-            drop(server_conn);
-            assert_eq!(
-                response.await.unwrap_err().to_string(),
-                "connection was closed"
-            );
-        });
+    #[gpui::test(iterations = 10)]
+    async fn test_io_error(cx: TestAppContext) {
+        let executor = cx.foreground();
+        let (client_conn, mut server_conn, _) = Connection::in_memory();
+
+        let client = Peer::new();
+        let (connection_id, io_handler, mut incoming) = client.add_connection(client_conn).await;
+        executor.spawn(io_handler).detach();
+        executor
+            .spawn(async move { incoming.next().await })
+            .detach();
+
+        let response = executor.spawn(client.request(connection_id, proto::Ping {}));
+        let _request = server_conn.rx.next().await.unwrap().unwrap();
+
+        drop(server_conn);
+        assert_eq!(
+            response.await.unwrap_err().to_string(),
+            "connection was closed"
+        );
     }
 }