From 69e5ecc01542052dffe22725e8b94ee69b4b9841 Mon Sep 17 00:00:00 2001 From: Max Brunsfeld Date: Thu, 26 Oct 2023 14:43:28 +0200 Subject: [PATCH] Enable client tests * implement Executor::advance_clock Co-authored-by: Conrad Co-authored-by: Kyle Co-authored-by: Joseph --- crates/client2/src/client2.rs | 559 +++++++++---------- crates/client2/src/test.rs | 431 +++++++------- crates/gpui2/src/app/test_context.rs | 2 +- crates/gpui2/src/executor.rs | 20 +- crates/gpui2/src/platform/test/dispatcher.rs | 28 +- crates/gpui2/src/platform/test/platform.rs | 6 +- 6 files changed, 530 insertions(+), 516 deletions(-) diff --git a/crates/client2/src/client2.rs b/crates/client2/src/client2.rs index 215fcdf56732e41bc20a710d97e9b5387c49cde4..79b0205c9147018c7e033783a21f63051531a41e 100644 --- a/crates/client2/src/client2.rs +++ b/crates/client2/src/client2.rs @@ -1377,290 +1377,275 @@ pub fn decode_worktree_url(url: &str) -> Option<(u64, String)> { Some((id, access_token.to_string())) } -// #[cfg(test)] -// mod tests { -// use super::*; -// use crate::test::FakeServer; -// use gpui::{executor::Deterministic, TestAppContext}; -// use parking_lot::Mutex; -// use std::future; -// use util::http::FakeHttpClient; - -// #[gpui::test(iterations = 10)] -// async fn test_reconnection(cx: &mut TestAppContext) { -// cx.foreground().forbid_parking(); - -// let user_id = 5; -// let client = cx.update(|cx| Client::new(FakeHttpClient::with_404_response(), cx)); -// let server = FakeServer::for_client(user_id, &client, cx).await; -// let mut status = client.status(); -// assert!(matches!( -// status.next().await, -// Some(Status::Connected { .. }) -// )); -// assert_eq!(server.auth_count(), 1); - -// server.forbid_connections(); -// server.disconnect(); -// while !matches!(status.next().await, Some(Status::ReconnectionError { .. })) {} - -// server.allow_connections(); -// cx.foreground().advance_clock(Duration::from_secs(10)); -// while !matches!(status.next().await, Some(Status::Connected { .. })) {} -// assert_eq!(server.auth_count(), 1); // Client reused the cached credentials when reconnecting - -// server.forbid_connections(); -// server.disconnect(); -// while !matches!(status.next().await, Some(Status::ReconnectionError { .. })) {} - -// // Clear cached credentials after authentication fails -// server.roll_access_token(); -// server.allow_connections(); -// cx.foreground().advance_clock(Duration::from_secs(10)); -// while !matches!(status.next().await, Some(Status::Connected { .. })) {} -// assert_eq!(server.auth_count(), 2); // Client re-authenticated due to an invalid token -// } - -// #[gpui::test(iterations = 10)] -// async fn test_connection_timeout(deterministic: Arc, cx: &mut TestAppContext) { -// deterministic.forbid_parking(); - -// let user_id = 5; -// let client = cx.update(|cx| Client::new(FakeHttpClient::with_404_response(), cx)); -// let mut status = client.status(); - -// // Time out when client tries to connect. -// client.override_authenticate(move |cx| { -// cx.foreground().spawn(async move { -// Ok(Credentials { -// user_id, -// access_token: "token".into(), -// }) -// }) -// }); -// client.override_establish_connection(|_, cx| { -// cx.foreground().spawn(async move { -// future::pending::<()>().await; -// unreachable!() -// }) -// }); -// let auth_and_connect = cx.spawn({ -// let client = client.clone(); -// |cx| async move { client.authenticate_and_connect(false, &cx).await } -// }); -// deterministic.run_until_parked(); -// assert!(matches!(status.next().await, Some(Status::Connecting))); - -// deterministic.advance_clock(CONNECTION_TIMEOUT); -// assert!(matches!( -// status.next().await, -// Some(Status::ConnectionError { .. }) -// )); -// auth_and_connect.await.unwrap_err(); - -// // Allow the connection to be established. -// let server = FakeServer::for_client(user_id, &client, cx).await; -// assert!(matches!( -// status.next().await, -// Some(Status::Connected { .. }) -// )); - -// // Disconnect client. -// server.forbid_connections(); -// server.disconnect(); -// while !matches!(status.next().await, Some(Status::ReconnectionError { .. })) {} - -// // Time out when re-establishing the connection. -// server.allow_connections(); -// client.override_establish_connection(|_, cx| { -// cx.foreground().spawn(async move { -// future::pending::<()>().await; -// unreachable!() -// }) -// }); -// deterministic.advance_clock(2 * INITIAL_RECONNECTION_DELAY); -// assert!(matches!( -// status.next().await, -// Some(Status::Reconnecting { .. }) -// )); - -// deterministic.advance_clock(CONNECTION_TIMEOUT); -// assert!(matches!( -// status.next().await, -// Some(Status::ReconnectionError { .. }) -// )); -// } - -// #[gpui::test(iterations = 10)] -// async fn test_authenticating_more_than_once( -// cx: &mut TestAppContext, -// deterministic: Arc, -// ) { -// cx.foreground().forbid_parking(); - -// let auth_count = Arc::new(Mutex::new(0)); -// let dropped_auth_count = Arc::new(Mutex::new(0)); -// let client = cx.update(|cx| Client::new(FakeHttpClient::with_404_response(), cx)); -// client.override_authenticate({ -// let auth_count = auth_count.clone(); -// let dropped_auth_count = dropped_auth_count.clone(); -// move |cx| { -// let auth_count = auth_count.clone(); -// let dropped_auth_count = dropped_auth_count.clone(); -// cx.foreground().spawn(async move { -// *auth_count.lock() += 1; -// let _drop = util::defer(move || *dropped_auth_count.lock() += 1); -// future::pending::<()>().await; -// unreachable!() -// }) -// } -// }); - -// let _authenticate = cx.spawn(|cx| { -// let client = client.clone(); -// async move { client.authenticate_and_connect(false, &cx).await } -// }); -// deterministic.run_until_parked(); -// assert_eq!(*auth_count.lock(), 1); -// assert_eq!(*dropped_auth_count.lock(), 0); - -// let _authenticate = cx.spawn(|cx| { -// let client = client.clone(); -// async move { client.authenticate_and_connect(false, &cx).await } -// }); -// deterministic.run_until_parked(); -// assert_eq!(*auth_count.lock(), 2); -// assert_eq!(*dropped_auth_count.lock(), 1); -// } - -// #[test] -// fn test_encode_and_decode_worktree_url() { -// let url = encode_worktree_url(5, "deadbeef"); -// assert_eq!(decode_worktree_url(&url), Some((5, "deadbeef".to_string()))); -// assert_eq!( -// decode_worktree_url(&format!("\n {}\t", url)), -// Some((5, "deadbeef".to_string())) -// ); -// assert_eq!(decode_worktree_url("not://the-right-format"), None); -// } - -// #[gpui::test] -// async fn test_subscribing_to_entity(cx: &mut TestAppContext) { -// cx.foreground().forbid_parking(); - -// let user_id = 5; -// let client = cx.update(|cx| Client::new(FakeHttpClient::with_404_response(), cx)); -// let server = FakeServer::for_client(user_id, &client, cx).await; - -// let (done_tx1, mut done_rx1) = smol::channel::unbounded(); -// let (done_tx2, mut done_rx2) = smol::channel::unbounded(); -// client.add_model_message_handler( -// move |model: ModelHandle, _: TypedEnvelope, _, cx| { -// match model.read_with(&cx, |model, _| model.id) { -// 1 => done_tx1.try_send(()).unwrap(), -// 2 => done_tx2.try_send(()).unwrap(), -// _ => unreachable!(), -// } -// async { Ok(()) } -// }, -// ); -// let model1 = cx.add_model(|_| Model { -// id: 1, -// subscription: None, -// }); -// let model2 = cx.add_model(|_| Model { -// id: 2, -// subscription: None, -// }); -// let model3 = cx.add_model(|_| Model { -// id: 3, -// subscription: None, -// }); - -// let _subscription1 = client -// .subscribe_to_entity(1) -// .unwrap() -// .set_model(&model1, &mut cx.to_async()); -// let _subscription2 = client -// .subscribe_to_entity(2) -// .unwrap() -// .set_model(&model2, &mut cx.to_async()); -// // Ensure dropping a subscription for the same entity type still allows receiving of -// // messages for other entity IDs of the same type. -// let subscription3 = client -// .subscribe_to_entity(3) -// .unwrap() -// .set_model(&model3, &mut cx.to_async()); -// drop(subscription3); - -// server.send(proto::JoinProject { project_id: 1 }); -// server.send(proto::JoinProject { project_id: 2 }); -// done_rx1.next().await.unwrap(); -// done_rx2.next().await.unwrap(); -// } - -// #[gpui::test] -// async fn test_subscribing_after_dropping_subscription(cx: &mut TestAppContext) { -// cx.foreground().forbid_parking(); - -// let user_id = 5; -// let client = cx.update(|cx| Client::new(FakeHttpClient::with_404_response(), cx)); -// let server = FakeServer::for_client(user_id, &client, cx).await; - -// let model = cx.add_model(|_| Model::default()); -// let (done_tx1, _done_rx1) = smol::channel::unbounded(); -// let (done_tx2, mut done_rx2) = smol::channel::unbounded(); -// let subscription1 = client.add_message_handler( -// model.clone(), -// move |_, _: TypedEnvelope, _, _| { -// done_tx1.try_send(()).unwrap(); -// async { Ok(()) } -// }, -// ); -// drop(subscription1); -// let _subscription2 = client.add_message_handler( -// model.clone(), -// move |_, _: TypedEnvelope, _, _| { -// done_tx2.try_send(()).unwrap(); -// async { Ok(()) } -// }, -// ); -// server.send(proto::Ping {}); -// done_rx2.next().await.unwrap(); -// } - -// #[gpui::test] -// async fn test_dropping_subscription_in_handler(cx: &mut TestAppContext) { -// cx.foreground().forbid_parking(); - -// let user_id = 5; -// let client = cx.update(|cx| Client::new(FakeHttpClient::with_404_response(), cx)); -// let server = FakeServer::for_client(user_id, &client, cx).await; - -// let model = cx.add_model(|_| Model::default()); -// let (done_tx, mut done_rx) = smol::channel::unbounded(); -// let subscription = client.add_message_handler( -// model.clone(), -// move |model, _: TypedEnvelope, _, mut cx| { -// model.update(&mut cx, |model, _| model.subscription.take()); -// done_tx.try_send(()).unwrap(); -// async { Ok(()) } -// }, -// ); -// model.update(cx, |model, _| { -// model.subscription = Some(subscription); -// }); -// server.send(proto::Ping {}); -// done_rx.next().await.unwrap(); -// } - -// #[derive(Default)] -// struct Model { -// id: usize, -// subscription: Option, -// } - -// impl Entity for Model { -// type Event = (); -// } -// } +#[cfg(test)] +mod tests { + use super::*; + use crate::test::FakeServer; + + use gpui2::{Context, Executor, TestAppContext}; + use parking_lot::Mutex; + use std::future; + use util::http::FakeHttpClient; + + #[gpui2::test(iterations = 10)] + async fn test_reconnection(cx: &mut TestAppContext) { + let user_id = 5; + let client = cx.update(|cx| Client::new(FakeHttpClient::with_404_response(), cx)); + let server = FakeServer::for_client(user_id, &client, cx).await; + let mut status = client.status(); + assert!(matches!( + status.next().await, + Some(Status::Connected { .. }) + )); + assert_eq!(server.auth_count(), 1); + + server.forbid_connections(); + server.disconnect(); + while !matches!(status.next().await, Some(Status::ReconnectionError { .. })) {} + + server.allow_connections(); + cx.executor().advance_clock(Duration::from_secs(10)); + while !matches!(status.next().await, Some(Status::Connected { .. })) {} + assert_eq!(server.auth_count(), 1); // Client reused the cached credentials when reconnecting + + server.forbid_connections(); + server.disconnect(); + while !matches!(status.next().await, Some(Status::ReconnectionError { .. })) {} + + // Clear cached credentials after authentication fails + server.roll_access_token(); + server.allow_connections(); + cx.executor().run_until_parked(); + cx.executor().advance_clock(Duration::from_secs(10)); + while !matches!(status.next().await, Some(Status::Connected { .. })) {} + assert_eq!(server.auth_count(), 2); // Client re-authenticated due to an invalid token + } + + #[gpui2::test(iterations = 10)] + async fn test_connection_timeout(executor: Executor, cx: &mut TestAppContext) { + let user_id = 5; + let client = cx.update(|cx| Client::new(FakeHttpClient::with_404_response(), cx)); + let mut status = client.status(); + + // Time out when client tries to connect. + client.override_authenticate(move |cx| { + cx.executor().spawn(async move { + Ok(Credentials { + user_id, + access_token: "token".into(), + }) + }) + }); + client.override_establish_connection(|_, cx| { + cx.executor().spawn(async move { + future::pending::<()>().await; + unreachable!() + }) + }); + let auth_and_connect = cx.spawn({ + let client = client.clone(); + |cx| async move { client.authenticate_and_connect(false, &cx).await } + }); + executor.run_until_parked(); + assert!(matches!(status.next().await, Some(Status::Connecting))); + + executor.advance_clock(CONNECTION_TIMEOUT); + assert!(matches!( + status.next().await, + Some(Status::ConnectionError { .. }) + )); + auth_and_connect.await.unwrap_err(); + + // Allow the connection to be established. + let server = FakeServer::for_client(user_id, &client, cx).await; + assert!(matches!( + status.next().await, + Some(Status::Connected { .. }) + )); + + // Disconnect client. + server.forbid_connections(); + server.disconnect(); + while !matches!(status.next().await, Some(Status::ReconnectionError { .. })) {} + + // Time out when re-establishing the connection. + server.allow_connections(); + client.override_establish_connection(|_, cx| { + cx.executor().spawn(async move { + future::pending::<()>().await; + unreachable!() + }) + }); + executor.advance_clock(2 * INITIAL_RECONNECTION_DELAY); + assert!(matches!( + status.next().await, + Some(Status::Reconnecting { .. }) + )); + + executor.advance_clock(CONNECTION_TIMEOUT); + assert!(matches!( + status.next().await, + Some(Status::ReconnectionError { .. }) + )); + } + + #[gpui2::test(iterations = 10)] + async fn test_authenticating_more_than_once(cx: &mut TestAppContext, executor: Executor) { + let auth_count = Arc::new(Mutex::new(0)); + let dropped_auth_count = Arc::new(Mutex::new(0)); + let client = cx.update(|cx| Client::new(FakeHttpClient::with_404_response(), cx)); + client.override_authenticate({ + let auth_count = auth_count.clone(); + let dropped_auth_count = dropped_auth_count.clone(); + move |cx| { + let auth_count = auth_count.clone(); + let dropped_auth_count = dropped_auth_count.clone(); + cx.executor().spawn(async move { + *auth_count.lock() += 1; + let _drop = util::defer(move || *dropped_auth_count.lock() += 1); + future::pending::<()>().await; + unreachable!() + }) + } + }); + + let _authenticate = cx.spawn({ + let client = client.clone(); + move |cx| async move { client.authenticate_and_connect(false, &cx).await } + }); + executor.run_until_parked(); + assert_eq!(*auth_count.lock(), 1); + assert_eq!(*dropped_auth_count.lock(), 0); + + let _authenticate = cx.spawn({ + let client = client.clone(); + |cx| async move { client.authenticate_and_connect(false, &cx).await } + }); + executor.run_until_parked(); + assert_eq!(*auth_count.lock(), 2); + assert_eq!(*dropped_auth_count.lock(), 1); + } + + #[test] + fn test_encode_and_decode_worktree_url() { + let url = encode_worktree_url(5, "deadbeef"); + assert_eq!(decode_worktree_url(&url), Some((5, "deadbeef".to_string()))); + assert_eq!( + decode_worktree_url(&format!("\n {}\t", url)), + Some((5, "deadbeef".to_string())) + ); + assert_eq!(decode_worktree_url("not://the-right-format"), None); + } + + #[gpui2::test] + async fn test_subscribing_to_entity(cx: &mut TestAppContext) { + let user_id = 5; + let client = cx.update(|cx| Client::new(FakeHttpClient::with_404_response(), cx)); + let server = FakeServer::for_client(user_id, &client, cx).await; + + let (done_tx1, mut done_rx1) = smol::channel::unbounded(); + let (done_tx2, mut done_rx2) = smol::channel::unbounded(); + client.add_model_message_handler( + move |model: Handle, _: TypedEnvelope, _, mut cx| { + match model.update(&mut cx, |model, _| model.id).unwrap() { + 1 => done_tx1.try_send(()).unwrap(), + 2 => done_tx2.try_send(()).unwrap(), + _ => unreachable!(), + } + async { Ok(()) } + }, + ); + let model1 = cx.entity(|_| Model { + id: 1, + subscription: None, + }); + let model2 = cx.entity(|_| Model { + id: 2, + subscription: None, + }); + let model3 = cx.entity(|_| Model { + id: 3, + subscription: None, + }); + + let _subscription1 = client + .subscribe_to_entity(1) + .unwrap() + .set_model(&model1, &mut cx.to_async()); + let _subscription2 = client + .subscribe_to_entity(2) + .unwrap() + .set_model(&model2, &mut cx.to_async()); + // Ensure dropping a subscription for the same entity type still allows receiving of + // messages for other entity IDs of the same type. + let subscription3 = client + .subscribe_to_entity(3) + .unwrap() + .set_model(&model3, &mut cx.to_async()); + drop(subscription3); + + server.send(proto::JoinProject { project_id: 1 }); + server.send(proto::JoinProject { project_id: 2 }); + done_rx1.next().await.unwrap(); + done_rx2.next().await.unwrap(); + } + + #[gpui2::test] + async fn test_subscribing_after_dropping_subscription(cx: &mut TestAppContext) { + let user_id = 5; + let client = cx.update(|cx| Client::new(FakeHttpClient::with_404_response(), cx)); + let server = FakeServer::for_client(user_id, &client, cx).await; + + let model = cx.entity(|_| Model::default()); + let (done_tx1, _done_rx1) = smol::channel::unbounded(); + let (done_tx2, mut done_rx2) = smol::channel::unbounded(); + let subscription1 = client.add_message_handler( + model.downgrade(), + move |_, _: TypedEnvelope, _, _| { + done_tx1.try_send(()).unwrap(); + async { Ok(()) } + }, + ); + drop(subscription1); + let _subscription2 = client.add_message_handler( + model.downgrade(), + move |_, _: TypedEnvelope, _, _| { + done_tx2.try_send(()).unwrap(); + async { Ok(()) } + }, + ); + server.send(proto::Ping {}); + done_rx2.next().await.unwrap(); + } + + #[gpui2::test] + async fn test_dropping_subscription_in_handler(cx: &mut TestAppContext) { + let user_id = 5; + let client = cx.update(|cx| Client::new(FakeHttpClient::with_404_response(), cx)); + let server = FakeServer::for_client(user_id, &client, cx).await; + + let model = cx.entity(|_| Model::default()); + let (done_tx, mut done_rx) = smol::channel::unbounded(); + let subscription = client.add_message_handler( + model.clone().downgrade(), + move |model: Handle, _: TypedEnvelope, _, mut cx| { + model + .update(&mut cx, |model, _| model.subscription.take()) + .unwrap(); + done_tx.try_send(()).unwrap(); + async { Ok(()) } + }, + ); + model.update(cx, |model, _| { + model.subscription = Some(subscription); + }); + server.send(proto::Ping {}); + done_rx.next().await.unwrap(); + } + + #[derive(Default)] + struct Model { + id: usize, + subscription: Option, + } +} diff --git a/crates/client2/src/test.rs b/crates/client2/src/test.rs index 96c20791eccfad17c33aa06e7337903362c26b35..1b32d35092d04874672fc6775078e0e42b30f612 100644 --- a/crates/client2/src/test.rs +++ b/crates/client2/src/test.rs @@ -1,215 +1,216 @@ -// use crate::{Client, Connection, Credentials, EstablishConnectionError, UserStore}; -// use anyhow::{anyhow, Result}; -// use futures::{stream::BoxStream, StreamExt}; -// use gpui2::{Executor, Handle, TestAppContext}; -// use parking_lot::Mutex; -// use rpc::{ -// proto::{self, GetPrivateUserInfo, GetPrivateUserInfoResponse}, -// ConnectionId, Peer, Receipt, TypedEnvelope, -// }; -// use std::{rc::Rc, sync::Arc}; -// use util::http::FakeHttpClient; - -// pub struct FakeServer { -// peer: Arc, -// state: Arc>, -// user_id: u64, -// executor: Executor, -// } - -// #[derive(Default)] -// struct FakeServerState { -// incoming: Option>>, -// connection_id: Option, -// forbid_connections: bool, -// auth_count: usize, -// access_token: usize, -// } - -// impl FakeServer { -// pub async fn for_client( -// client_user_id: u64, -// client: &Arc, -// cx: &TestAppContext, -// ) -> Self { -// let server = Self { -// peer: Peer::new(0), -// state: Default::default(), -// user_id: client_user_id, -// executor: cx.foreground(), -// }; - -// client -// .override_authenticate({ -// let state = Arc::downgrade(&server.state); -// move |cx| { -// let state = state.clone(); -// cx.spawn(move |_| async move { -// let state = state.upgrade().ok_or_else(|| anyhow!("server dropped"))?; -// let mut state = state.lock(); -// state.auth_count += 1; -// let access_token = state.access_token.to_string(); -// Ok(Credentials { -// user_id: client_user_id, -// access_token, -// }) -// }) -// } -// }) -// .override_establish_connection({ -// let peer = Arc::downgrade(&server.peer); -// let state = Arc::downgrade(&server.state); -// move |credentials, cx| { -// let peer = peer.clone(); -// let state = state.clone(); -// let credentials = credentials.clone(); -// cx.spawn(move |cx| async move { -// let state = state.upgrade().ok_or_else(|| anyhow!("server dropped"))?; -// let peer = peer.upgrade().ok_or_else(|| anyhow!("server dropped"))?; -// if state.lock().forbid_connections { -// Err(EstablishConnectionError::Other(anyhow!( -// "server is forbidding connections" -// )))? -// } - -// assert_eq!(credentials.user_id, client_user_id); - -// if credentials.access_token != state.lock().access_token.to_string() { -// Err(EstablishConnectionError::Unauthorized)? -// } - -// let (client_conn, server_conn, _) = Connection::in_memory(cx.background()); -// let (connection_id, io, incoming) = -// peer.add_test_connection(server_conn, cx.background()); -// cx.background().spawn(io).detach(); -// { -// let mut state = state.lock(); -// state.connection_id = Some(connection_id); -// state.incoming = Some(incoming); -// } -// peer.send( -// connection_id, -// proto::Hello { -// peer_id: Some(connection_id.into()), -// }, -// ) -// .unwrap(); - -// Ok(client_conn) -// }) -// } -// }); - -// client -// .authenticate_and_connect(false, &cx.to_async()) -// .await -// .unwrap(); - -// server -// } - -// pub fn disconnect(&self) { -// if self.state.lock().connection_id.is_some() { -// self.peer.disconnect(self.connection_id()); -// let mut state = self.state.lock(); -// state.connection_id.take(); -// state.incoming.take(); -// } -// } - -// pub fn auth_count(&self) -> usize { -// self.state.lock().auth_count -// } - -// pub fn roll_access_token(&self) { -// self.state.lock().access_token += 1; -// } - -// pub fn forbid_connections(&self) { -// self.state.lock().forbid_connections = true; -// } - -// pub fn allow_connections(&self) { -// self.state.lock().forbid_connections = false; -// } - -// pub fn send(&self, message: T) { -// self.peer.send(self.connection_id(), message).unwrap(); -// } - -// #[allow(clippy::await_holding_lock)] -// pub async fn receive(&self) -> Result> { -// self.executor.start_waiting(); - -// loop { -// let message = self -// .state -// .lock() -// .incoming -// .as_mut() -// .expect("not connected") -// .next() -// .await -// .ok_or_else(|| anyhow!("other half hung up"))?; -// self.executor.finish_waiting(); -// let type_name = message.payload_type_name(); -// let message = message.into_any(); - -// if message.is::>() { -// return Ok(*message.downcast().unwrap()); -// } - -// if message.is::>() { -// self.respond( -// message -// .downcast::>() -// .unwrap() -// .receipt(), -// GetPrivateUserInfoResponse { -// metrics_id: "the-metrics-id".into(), -// staff: false, -// flags: Default::default(), -// }, -// ); -// continue; -// } - -// panic!( -// "fake server received unexpected message type: {:?}", -// type_name -// ); -// } -// } - -// pub fn respond(&self, receipt: Receipt, response: T::Response) { -// self.peer.respond(receipt, response).unwrap() -// } - -// fn connection_id(&self) -> ConnectionId { -// self.state.lock().connection_id.expect("not connected") -// } - -// pub async fn build_user_store( -// &self, -// client: Arc, -// cx: &mut TestAppContext, -// ) -> ModelHandle { -// let http_client = FakeHttpClient::with_404_response(); -// let user_store = cx.add_model(|cx| UserStore::new(client, http_client, cx)); -// assert_eq!( -// self.receive::() -// .await -// .unwrap() -// .payload -// .user_ids, -// &[self.user_id] -// ); -// user_store -// } -// } - -// impl Drop for FakeServer { -// fn drop(&mut self) { -// self.disconnect(); -// } -// } +use crate::{Client, Connection, Credentials, EstablishConnectionError, UserStore}; +use anyhow::{anyhow, Result}; +use futures::{stream::BoxStream, StreamExt}; +use gpui2::{Context, Executor, Handle, TestAppContext}; +use parking_lot::Mutex; +use rpc2::{ + proto::{self, GetPrivateUserInfo, GetPrivateUserInfoResponse}, + ConnectionId, Peer, Receipt, TypedEnvelope, +}; +use std::sync::Arc; +use util::http::FakeHttpClient; + +pub struct FakeServer { + peer: Arc, + state: Arc>, + user_id: u64, + executor: Executor, +} + +#[derive(Default)] +struct FakeServerState { + incoming: Option>>, + connection_id: Option, + forbid_connections: bool, + auth_count: usize, + access_token: usize, +} + +impl FakeServer { + pub async fn for_client( + client_user_id: u64, + client: &Arc, + cx: &TestAppContext, + ) -> Self { + let server = Self { + peer: Peer::new(0), + state: Default::default(), + user_id: client_user_id, + executor: cx.executor().clone(), + }; + + client + .override_authenticate({ + let state = Arc::downgrade(&server.state); + move |cx| { + let state = state.clone(); + cx.spawn(move |_| async move { + let state = state.upgrade().ok_or_else(|| anyhow!("server dropped"))?; + let mut state = state.lock(); + state.auth_count += 1; + let access_token = state.access_token.to_string(); + Ok(Credentials { + user_id: client_user_id, + access_token, + }) + }) + } + }) + .override_establish_connection({ + let peer = Arc::downgrade(&server.peer); + let state = Arc::downgrade(&server.state); + move |credentials, cx| { + let peer = peer.clone(); + let state = state.clone(); + let credentials = credentials.clone(); + cx.spawn(move |cx| async move { + let state = state.upgrade().ok_or_else(|| anyhow!("server dropped"))?; + let peer = peer.upgrade().ok_or_else(|| anyhow!("server dropped"))?; + if state.lock().forbid_connections { + Err(EstablishConnectionError::Other(anyhow!( + "server is forbidding connections" + )))? + } + + assert_eq!(credentials.user_id, client_user_id); + + if credentials.access_token != state.lock().access_token.to_string() { + Err(EstablishConnectionError::Unauthorized)? + } + + let (client_conn, server_conn, _) = + Connection::in_memory(cx.executor().clone()); + let (connection_id, io, incoming) = + peer.add_test_connection(server_conn, cx.executor().clone()); + cx.executor().spawn(io).detach(); + { + let mut state = state.lock(); + state.connection_id = Some(connection_id); + state.incoming = Some(incoming); + } + peer.send( + connection_id, + proto::Hello { + peer_id: Some(connection_id.into()), + }, + ) + .unwrap(); + + Ok(client_conn) + }) + } + }); + + client + .authenticate_and_connect(false, &cx.to_async()) + .await + .unwrap(); + + server + } + + pub fn disconnect(&self) { + if self.state.lock().connection_id.is_some() { + self.peer.disconnect(self.connection_id()); + let mut state = self.state.lock(); + state.connection_id.take(); + state.incoming.take(); + } + } + + pub fn auth_count(&self) -> usize { + self.state.lock().auth_count + } + + pub fn roll_access_token(&self) { + self.state.lock().access_token += 1; + } + + pub fn forbid_connections(&self) { + self.state.lock().forbid_connections = true; + } + + pub fn allow_connections(&self) { + self.state.lock().forbid_connections = false; + } + + pub fn send(&self, message: T) { + self.peer.send(self.connection_id(), message).unwrap(); + } + + #[allow(clippy::await_holding_lock)] + pub async fn receive(&self) -> Result> { + self.executor.start_waiting(); + + loop { + let message = self + .state + .lock() + .incoming + .as_mut() + .expect("not connected") + .next() + .await + .ok_or_else(|| anyhow!("other half hung up"))?; + self.executor.finish_waiting(); + let type_name = message.payload_type_name(); + let message = message.into_any(); + + if message.is::>() { + return Ok(*message.downcast().unwrap()); + } + + if message.is::>() { + self.respond( + message + .downcast::>() + .unwrap() + .receipt(), + GetPrivateUserInfoResponse { + metrics_id: "the-metrics-id".into(), + staff: false, + flags: Default::default(), + }, + ); + continue; + } + + panic!( + "fake server received unexpected message type: {:?}", + type_name + ); + } + } + + pub fn respond(&self, receipt: Receipt, response: T::Response) { + self.peer.respond(receipt, response).unwrap() + } + + fn connection_id(&self) -> ConnectionId { + self.state.lock().connection_id.expect("not connected") + } + + pub async fn build_user_store( + &self, + client: Arc, + cx: &mut TestAppContext, + ) -> Handle { + let http_client = FakeHttpClient::with_404_response(); + let user_store = cx.entity(|cx| UserStore::new(client, http_client, cx)); + assert_eq!( + self.receive::() + .await + .unwrap() + .payload + .user_ids, + &[self.user_id] + ); + user_store + } +} + +impl Drop for FakeServer { + fn drop(&mut self) { + self.disconnect(); + } +} diff --git a/crates/gpui2/src/app/test_context.rs b/crates/gpui2/src/app/test_context.rs index 9133c31f52e6e557da3a6caa37e36492852fb75a..274121b692c526097b3080030233127d3a129749 100644 --- a/crates/gpui2/src/app/test_context.rs +++ b/crates/gpui2/src/app/test_context.rs @@ -143,7 +143,7 @@ impl TestAppContext { lock.update_global(update) } - fn to_async(&self) -> AsyncAppContext { + pub fn to_async(&self) -> AsyncAppContext { AsyncAppContext { app: Arc::downgrade(&self.app), executor: self.executor.clone(), diff --git a/crates/gpui2/src/executor.rs b/crates/gpui2/src/executor.rs index 261912b08576e64dc5f06c3772c388164b3d42cc..28d4b6d1173db01bfbb88856bd0194b33bc1be82 100644 --- a/crates/gpui2/src/executor.rs +++ b/crates/gpui2/src/executor.rs @@ -146,7 +146,10 @@ impl Executor { Poll::Ready(result) => return result, Poll::Pending => { if !self.dispatcher.poll() { - // todo!("forbid_parking") + #[cfg(any(test, feature = "test-support"))] + if let Some(_) = self.dispatcher.as_test() { + panic!("blocked with nothing left to run") + } parker.park(); } } @@ -206,11 +209,26 @@ impl Executor { todo!("start_waiting") } + #[cfg(any(test, feature = "test-support"))] + pub fn finish_waiting(&self) { + todo!("finish_waiting") + } + #[cfg(any(test, feature = "test-support"))] pub fn simulate_random_delay(&self) -> impl Future { self.dispatcher.as_test().unwrap().simulate_random_delay() } + #[cfg(any(test, feature = "test-support"))] + pub fn advance_clock(&self, duration: Duration) { + self.dispatcher.as_test().unwrap().advance_clock(duration) + } + + #[cfg(any(test, feature = "test-support"))] + pub fn run_until_parked(&self) { + self.dispatcher.as_test().unwrap().run_until_parked() + } + pub fn num_cpus(&self) -> usize { num_cpus::get() } diff --git a/crates/gpui2/src/platform/test/dispatcher.rs b/crates/gpui2/src/platform/test/dispatcher.rs index 02873f8e6e38e48bd0e74de4038f2d104a681692..0ed5638a8b411864ea9d1340309f32f4ac1791c0 100644 --- a/crates/gpui2/src/platform/test/dispatcher.rs +++ b/crates/gpui2/src/platform/test/dispatcher.rs @@ -8,7 +8,7 @@ use std::{ pin::Pin, sync::Arc, task::{Context, Poll}, - time::{Duration, Instant}, + time::Duration, }; use util::post_inc; @@ -24,8 +24,8 @@ struct TestDispatcherState { random: StdRng, foreground: HashMap>, background: Vec, - delayed: Vec<(Instant, Runnable)>, - time: Instant, + delayed: Vec<(Duration, Runnable)>, + time: Duration, is_main_thread: bool, next_id: TestDispatcherId, } @@ -37,7 +37,7 @@ impl TestDispatcher { foreground: HashMap::default(), background: Vec::new(), delayed: Vec::new(), - time: Instant::now(), + time: Duration::ZERO, is_main_thread: true, next_id: TestDispatcherId(1), }; @@ -49,7 +49,21 @@ impl TestDispatcher { } pub fn advance_clock(&self, by: Duration) { - self.state.lock().time += by; + let new_now = self.state.lock().time + by; + loop { + self.run_until_parked(); + let state = self.state.lock(); + let next_due_time = state.delayed.first().map(|(time, _)| *time); + drop(state); + if let Some(due_time) = next_due_time { + if due_time <= new_now { + self.state.lock().time = due_time; + continue; + } + } + break; + } + self.state.lock().time = new_now; } pub fn simulate_random_delay(&self) -> impl Future { @@ -137,10 +151,8 @@ impl PlatformDispatcher for TestDispatcher { let background_len = state.background.len(); if foreground_len == 0 && background_len == 0 { - eprintln!("no runnables to poll"); return false; } - eprintln!("runnables {} {}", foreground_len, background_len); let main_thread = state.random.gen_ratio( foreground_len as u32, @@ -150,7 +162,6 @@ impl PlatformDispatcher for TestDispatcher { state.is_main_thread = main_thread; let runnable = if main_thread { - eprintln!("running next main thread"); let state = &mut *state; let runnables = state .foreground @@ -161,7 +172,6 @@ impl PlatformDispatcher for TestDispatcher { runnables.pop_front().unwrap() } else { let ix = state.random.gen_range(0..background_len); - eprintln!("running background thread {ix}"); state.background.swap_remove(ix) }; diff --git a/crates/gpui2/src/platform/test/platform.rs b/crates/gpui2/src/platform/test/platform.rs index 9cc7d959e0cf54119d52e8280ecd769b51c79d03..4d86c434d0e0dce5090036bfa34e6a29bac19415 100644 --- a/crates/gpui2/src/platform/test/platform.rs +++ b/crates/gpui2/src/platform/test/platform.rs @@ -173,14 +173,14 @@ impl Platform for TestPlatform { } fn write_credentials(&self, _url: &str, _username: &str, _password: &[u8]) -> Result<()> { - unimplemented!() + Ok(()) } fn read_credentials(&self, _url: &str) -> Result)>> { - unimplemented!() + Ok(None) } fn delete_credentials(&self, _url: &str) -> Result<()> { - unimplemented!() + Ok(()) } }