diff --git a/Cargo.lock b/Cargo.lock index dda7116de2ffb6dbf8351ccb3ec0752c9e9f3838..8c3174d68d40df5c364bee8327318476b8dd0fa7 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3837,6 +3837,7 @@ dependencies = [ "async-tungstenite", "base64 0.13.0", "futures", + "gpui", "log", "parking_lot", "postage", diff --git a/crates/rpc/Cargo.toml b/crates/rpc/Cargo.toml index f16d7f39c2a1f4f82beba4b6a334402d781d61e9..4be612eec77ae902db19b04cd04dcd3b19adf527 100644 --- a/crates/rpc/Cargo.toml +++ b/crates/rpc/Cargo.toml @@ -30,5 +30,6 @@ zstd = "0.9" prost-build = "0.8" [dev-dependencies] +gpui = { path = "../gpui", features = ["test-support"] } smol = "1.2.5" tempdir = "0.3.7" diff --git a/crates/rpc/src/peer.rs b/crates/rpc/src/peer.rs index 30d754e97de831e22514b60b89e5b473422628ee..ce9680173311ecb42dfef999c6fef7dee09e606f 100644 --- a/crates/rpc/src/peer.rs +++ b/crates/rpc/src/peer.rs @@ -342,201 +342,311 @@ mod tests { use super::*; use crate::TypedEnvelope; use async_tungstenite::tungstenite::Message as WebSocketMessage; + use gpui::TestAppContext; + + #[gpui::test(iterations = 10)] + async fn test_request_response(cx: TestAppContext) { + let executor = cx.foreground(); + + // create 2 clients connected to 1 server + let server = Peer::new(); + let client1 = Peer::new(); + let client2 = Peer::new(); + + let (client1_to_server_conn, server_to_client_1_conn, _) = Connection::in_memory(); + let (client1_conn_id, io_task1, client1_incoming) = + client1.add_connection(client1_to_server_conn).await; + let (_, io_task2, server_incoming1) = server.add_connection(server_to_client_1_conn).await; + + let (client2_to_server_conn, server_to_client_2_conn, _) = Connection::in_memory(); + let (client2_conn_id, io_task3, client2_incoming) = + client2.add_connection(client2_to_server_conn).await; + let (_, io_task4, server_incoming2) = server.add_connection(server_to_client_2_conn).await; + + executor.spawn(io_task1).detach(); + executor.spawn(io_task2).detach(); + executor.spawn(io_task3).detach(); + executor.spawn(io_task4).detach(); + executor + .spawn(handle_messages(server_incoming1, server.clone())) + .detach(); + executor + .spawn(handle_messages(client1_incoming, client1.clone())) + .detach(); + executor + .spawn(handle_messages(server_incoming2, server.clone())) + .detach(); + executor + .spawn(handle_messages(client2_incoming, client2.clone())) + .detach(); - #[test] - fn test_request_response() { - smol::block_on(async move { - // create 2 clients connected to 1 server - let server = Peer::new(); - let client1 = Peer::new(); - let client2 = Peer::new(); - - let (client1_to_server_conn, server_to_client_1_conn, _) = Connection::in_memory(); - let (client1_conn_id, io_task1, client1_incoming) = - client1.add_connection(client1_to_server_conn).await; - let (_, io_task2, server_incoming1) = - server.add_connection(server_to_client_1_conn).await; - - let (client2_to_server_conn, server_to_client_2_conn, _) = Connection::in_memory(); - let (client2_conn_id, io_task3, client2_incoming) = - client2.add_connection(client2_to_server_conn).await; - let (_, io_task4, server_incoming2) = - server.add_connection(server_to_client_2_conn).await; - - smol::spawn(io_task1).detach(); - smol::spawn(io_task2).detach(); - smol::spawn(io_task3).detach(); - smol::spawn(io_task4).detach(); - smol::spawn(handle_messages(server_incoming1, server.clone())).detach(); - smol::spawn(handle_messages(client1_incoming, client1.clone())).detach(); - smol::spawn(handle_messages(server_incoming2, server.clone())).detach(); - smol::spawn(handle_messages(client2_incoming, client2.clone())).detach(); - - assert_eq!( - client1 - .request(client1_conn_id, proto::Ping {},) - .await - .unwrap(), - proto::Ack {} - ); + assert_eq!( + client1 + .request(client1_conn_id, proto::Ping {},) + .await + .unwrap(), + proto::Ack {} + ); - assert_eq!( - client2 - .request(client2_conn_id, proto::Ping {},) + assert_eq!( + client2 + .request(client2_conn_id, proto::Ping {},) + .await + .unwrap(), + proto::Ack {} + ); + + assert_eq!( + client1 + .request( + client1_conn_id, + proto::OpenBuffer { + project_id: 0, + worktree_id: 1, + path: "path/one".to_string(), + }, + ) + .await + .unwrap(), + proto::OpenBufferResponse { + buffer: Some(proto::Buffer { + id: 101, + visible_text: "path/one content".to_string(), + ..Default::default() + }), + } + ); + + assert_eq!( + client2 + .request( + client2_conn_id, + proto::OpenBuffer { + project_id: 0, + worktree_id: 2, + path: "path/two".to_string(), + }, + ) + .await + .unwrap(), + proto::OpenBufferResponse { + buffer: Some(proto::Buffer { + id: 102, + visible_text: "path/two content".to_string(), + ..Default::default() + }), + } + ); + + client1.disconnect(client1_conn_id); + client2.disconnect(client1_conn_id); + + async fn handle_messages( + mut messages: BoxStream<'static, Box>, + peer: Arc, + ) -> Result<()> { + while let Some(envelope) = messages.next().await { + let envelope = envelope.into_any(); + if let Some(envelope) = envelope.downcast_ref::>() { + let receipt = envelope.receipt(); + peer.respond(receipt, proto::Ack {}).await? + } else if let Some(envelope) = + envelope.downcast_ref::>() + { + let message = &envelope.payload; + let receipt = envelope.receipt(); + let response = match message.path.as_str() { + "path/one" => { + assert_eq!(message.worktree_id, 1); + proto::OpenBufferResponse { + buffer: Some(proto::Buffer { + id: 101, + visible_text: "path/one content".to_string(), + ..Default::default() + }), + } + } + "path/two" => { + assert_eq!(message.worktree_id, 2); + proto::OpenBufferResponse { + buffer: Some(proto::Buffer { + id: 102, + visible_text: "path/two content".to_string(), + ..Default::default() + }), + } + } + _ => { + panic!("unexpected path {}", message.path); + } + }; + + peer.respond(receipt, response).await? + } else { + panic!("unknown message type"); + } + } + + Ok(()) + } + } + + #[gpui::test(iterations = 10)] + async fn test_order_of_response_and_incoming(cx: TestAppContext) { + let executor = cx.foreground(); + let server = Peer::new(); + let client = Peer::new(); + + let (client_to_server_conn, server_to_client_conn, _) = Connection::in_memory(); + let (client_to_server_conn_id, io_task1, mut client_incoming) = + client.add_connection(client_to_server_conn).await; + let (server_to_client_conn_id, io_task2, mut server_incoming) = + server.add_connection(server_to_client_conn).await; + + executor.spawn(io_task1).detach(); + executor.spawn(io_task2).detach(); + + executor + .spawn(async move { + let request = server_incoming + .next() .await - .unwrap(), - proto::Ack {} - ); - - assert_eq!( - client1 - .request( - client1_conn_id, - proto::OpenBuffer { - project_id: 0, - worktree_id: 1, - path: "path/one".to_string(), + .unwrap() + .into_any() + .downcast::>() + .unwrap(); + + server + .send( + server_to_client_conn_id, + proto::Error { + message: "message 1".to_string(), }, ) .await - .unwrap(), - proto::OpenBufferResponse { - buffer: Some(proto::Buffer { - id: 101, - visible_text: "path/one content".to_string(), - ..Default::default() - }), - } - ); - - assert_eq!( - client2 - .request( - client2_conn_id, - proto::OpenBuffer { - project_id: 0, - worktree_id: 2, - path: "path/two".to_string(), + .unwrap(); + server + .send( + server_to_client_conn_id, + proto::Error { + message: "message 2".to_string(), }, ) .await - .unwrap(), - proto::OpenBufferResponse { - buffer: Some(proto::Buffer { - id: 102, - visible_text: "path/two content".to_string(), - ..Default::default() - }), - } - ); - - client1.disconnect(client1_conn_id); - client2.disconnect(client1_conn_id); - - async fn handle_messages( - mut messages: BoxStream<'static, Box>, - peer: Arc, - ) -> Result<()> { - while let Some(envelope) = messages.next().await { - let envelope = envelope.into_any(); - if let Some(envelope) = envelope.downcast_ref::>() { - let receipt = envelope.receipt(); - peer.respond(receipt, proto::Ack {}).await? - } else if let Some(envelope) = - envelope.downcast_ref::>() - { - let message = &envelope.payload; - let receipt = envelope.receipt(); - let response = match message.path.as_str() { - "path/one" => { - assert_eq!(message.worktree_id, 1); - proto::OpenBufferResponse { - buffer: Some(proto::Buffer { - id: 101, - visible_text: "path/one content".to_string(), - ..Default::default() - }), - } - } - "path/two" => { - assert_eq!(message.worktree_id, 2); - proto::OpenBufferResponse { - buffer: Some(proto::Buffer { - id: 102, - visible_text: "path/two content".to_string(), - ..Default::default() - }), - } - } - _ => { - panic!("unexpected path {}", message.path); - } - }; + .unwrap(); + server + .respond(request.receipt(), proto::Ack {}) + .await + .unwrap(); - peer.respond(receipt, response).await? - } else { - panic!("unknown message type"); - } - } + // Prevent the connection from being dropped + server_incoming.next().await; + }) + .detach(); + + let events = Arc::new(Mutex::new(Vec::new())); - Ok(()) + let response = client.request(client_to_server_conn_id, proto::Ping {}); + let response_task = executor.spawn({ + let events = events.clone(); + async move { + response.await.unwrap(); + events.lock().push("response".to_string()); } }); + + executor + .spawn({ + let events = events.clone(); + async move { + let incoming1 = client_incoming + .next() + .await + .unwrap() + .into_any() + .downcast::>() + .unwrap(); + events.lock().push(incoming1.payload.message); + let incoming2 = client_incoming + .next() + .await + .unwrap() + .into_any() + .downcast::>() + .unwrap(); + events.lock().push(incoming2.payload.message); + + // Prevent the connection from being dropped + client_incoming.next().await; + } + }) + .detach(); + + response_task.await; + assert_eq!( + &*events.lock(), + &[ + "message 1".to_string(), + "message 2".to_string(), + "response".to_string() + ] + ); } - #[test] - fn test_disconnect() { - smol::block_on(async move { - let (client_conn, mut server_conn, _) = Connection::in_memory(); + #[gpui::test(iterations = 10)] + async fn test_disconnect(cx: TestAppContext) { + let executor = cx.foreground(); + + let (client_conn, mut server_conn, _) = Connection::in_memory(); - let client = Peer::new(); - let (connection_id, io_handler, mut incoming) = - client.add_connection(client_conn).await; + let client = Peer::new(); + let (connection_id, io_handler, mut incoming) = client.add_connection(client_conn).await; - let (mut io_ended_tx, mut io_ended_rx) = postage::barrier::channel(); - smol::spawn(async move { + let (mut io_ended_tx, mut io_ended_rx) = postage::barrier::channel(); + executor + .spawn(async move { io_handler.await.ok(); io_ended_tx.send(()).await.unwrap(); }) .detach(); - let (mut messages_ended_tx, mut messages_ended_rx) = postage::barrier::channel(); - smol::spawn(async move { + let (mut messages_ended_tx, mut messages_ended_rx) = postage::barrier::channel(); + executor + .spawn(async move { incoming.next().await; messages_ended_tx.send(()).await.unwrap(); }) .detach(); - client.disconnect(connection_id); + client.disconnect(connection_id); - io_ended_rx.recv().await; - messages_ended_rx.recv().await; - assert!(server_conn - .send(WebSocketMessage::Binary(vec![])) - .await - .is_err()); - }); + io_ended_rx.recv().await; + messages_ended_rx.recv().await; + assert!(server_conn + .send(WebSocketMessage::Binary(vec![])) + .await + .is_err()); } - #[test] - fn test_io_error() { - smol::block_on(async move { - let (client_conn, mut server_conn, _) = Connection::in_memory(); - - let client = Peer::new(); - let (connection_id, io_handler, mut incoming) = - client.add_connection(client_conn).await; - smol::spawn(io_handler).detach(); - smol::spawn(async move { incoming.next().await }).detach(); - - let response = smol::spawn(client.request(connection_id, proto::Ping {})); - let _request = server_conn.rx.next().await.unwrap().unwrap(); - - drop(server_conn); - assert_eq!( - response.await.unwrap_err().to_string(), - "connection was closed" - ); - }); + #[gpui::test(iterations = 10)] + async fn test_io_error(cx: TestAppContext) { + let executor = cx.foreground(); + let (client_conn, mut server_conn, _) = Connection::in_memory(); + + let client = Peer::new(); + let (connection_id, io_handler, mut incoming) = client.add_connection(client_conn).await; + executor.spawn(io_handler).detach(); + executor + .spawn(async move { incoming.next().await }) + .detach(); + + let response = executor.spawn(client.request(connection_id, proto::Ping {})); + let _request = server_conn.rx.next().await.unwrap().unwrap(); + + drop(server_conn); + assert_eq!( + response.await.unwrap_err().to_string(), + "connection was closed" + ); } }