Detailed changes
@@ -1377,290 +1377,275 @@ pub fn decode_worktree_url(url: &str) -> Option<(u64, String)> {
Some((id, access_token.to_string()))
}
-// #[cfg(test)]
-// mod tests {
-// use super::*;
-// use crate::test::FakeServer;
-// use gpui::{executor::Deterministic, TestAppContext};
-// use parking_lot::Mutex;
-// use std::future;
-// use util::http::FakeHttpClient;
-
-// #[gpui::test(iterations = 10)]
-// async fn test_reconnection(cx: &mut TestAppContext) {
-// cx.foreground().forbid_parking();
-
-// let user_id = 5;
-// let client = cx.update(|cx| Client::new(FakeHttpClient::with_404_response(), cx));
-// let server = FakeServer::for_client(user_id, &client, cx).await;
-// let mut status = client.status();
-// assert!(matches!(
-// status.next().await,
-// Some(Status::Connected { .. })
-// ));
-// assert_eq!(server.auth_count(), 1);
-
-// server.forbid_connections();
-// server.disconnect();
-// while !matches!(status.next().await, Some(Status::ReconnectionError { .. })) {}
-
-// server.allow_connections();
-// cx.foreground().advance_clock(Duration::from_secs(10));
-// while !matches!(status.next().await, Some(Status::Connected { .. })) {}
-// assert_eq!(server.auth_count(), 1); // Client reused the cached credentials when reconnecting
-
-// server.forbid_connections();
-// server.disconnect();
-// while !matches!(status.next().await, Some(Status::ReconnectionError { .. })) {}
-
-// // Clear cached credentials after authentication fails
-// server.roll_access_token();
-// server.allow_connections();
-// cx.foreground().advance_clock(Duration::from_secs(10));
-// while !matches!(status.next().await, Some(Status::Connected { .. })) {}
-// assert_eq!(server.auth_count(), 2); // Client re-authenticated due to an invalid token
-// }
-
-// #[gpui::test(iterations = 10)]
-// async fn test_connection_timeout(deterministic: Arc<Deterministic>, cx: &mut TestAppContext) {
-// deterministic.forbid_parking();
-
-// let user_id = 5;
-// let client = cx.update(|cx| Client::new(FakeHttpClient::with_404_response(), cx));
-// let mut status = client.status();
-
-// // Time out when client tries to connect.
-// client.override_authenticate(move |cx| {
-// cx.foreground().spawn(async move {
-// Ok(Credentials {
-// user_id,
-// access_token: "token".into(),
-// })
-// })
-// });
-// client.override_establish_connection(|_, cx| {
-// cx.foreground().spawn(async move {
-// future::pending::<()>().await;
-// unreachable!()
-// })
-// });
-// let auth_and_connect = cx.spawn({
-// let client = client.clone();
-// |cx| async move { client.authenticate_and_connect(false, &cx).await }
-// });
-// deterministic.run_until_parked();
-// assert!(matches!(status.next().await, Some(Status::Connecting)));
-
-// deterministic.advance_clock(CONNECTION_TIMEOUT);
-// assert!(matches!(
-// status.next().await,
-// Some(Status::ConnectionError { .. })
-// ));
-// auth_and_connect.await.unwrap_err();
-
-// // Allow the connection to be established.
-// let server = FakeServer::for_client(user_id, &client, cx).await;
-// assert!(matches!(
-// status.next().await,
-// Some(Status::Connected { .. })
-// ));
-
-// // Disconnect client.
-// server.forbid_connections();
-// server.disconnect();
-// while !matches!(status.next().await, Some(Status::ReconnectionError { .. })) {}
-
-// // Time out when re-establishing the connection.
-// server.allow_connections();
-// client.override_establish_connection(|_, cx| {
-// cx.foreground().spawn(async move {
-// future::pending::<()>().await;
-// unreachable!()
-// })
-// });
-// deterministic.advance_clock(2 * INITIAL_RECONNECTION_DELAY);
-// assert!(matches!(
-// status.next().await,
-// Some(Status::Reconnecting { .. })
-// ));
-
-// deterministic.advance_clock(CONNECTION_TIMEOUT);
-// assert!(matches!(
-// status.next().await,
-// Some(Status::ReconnectionError { .. })
-// ));
-// }
-
-// #[gpui::test(iterations = 10)]
-// async fn test_authenticating_more_than_once(
-// cx: &mut TestAppContext,
-// deterministic: Arc<Deterministic>,
-// ) {
-// cx.foreground().forbid_parking();
-
-// let auth_count = Arc::new(Mutex::new(0));
-// let dropped_auth_count = Arc::new(Mutex::new(0));
-// let client = cx.update(|cx| Client::new(FakeHttpClient::with_404_response(), cx));
-// client.override_authenticate({
-// let auth_count = auth_count.clone();
-// let dropped_auth_count = dropped_auth_count.clone();
-// move |cx| {
-// let auth_count = auth_count.clone();
-// let dropped_auth_count = dropped_auth_count.clone();
-// cx.foreground().spawn(async move {
-// *auth_count.lock() += 1;
-// let _drop = util::defer(move || *dropped_auth_count.lock() += 1);
-// future::pending::<()>().await;
-// unreachable!()
-// })
-// }
-// });
-
-// let _authenticate = cx.spawn(|cx| {
-// let client = client.clone();
-// async move { client.authenticate_and_connect(false, &cx).await }
-// });
-// deterministic.run_until_parked();
-// assert_eq!(*auth_count.lock(), 1);
-// assert_eq!(*dropped_auth_count.lock(), 0);
-
-// let _authenticate = cx.spawn(|cx| {
-// let client = client.clone();
-// async move { client.authenticate_and_connect(false, &cx).await }
-// });
-// deterministic.run_until_parked();
-// assert_eq!(*auth_count.lock(), 2);
-// assert_eq!(*dropped_auth_count.lock(), 1);
-// }
-
-// #[test]
-// fn test_encode_and_decode_worktree_url() {
-// let url = encode_worktree_url(5, "deadbeef");
-// assert_eq!(decode_worktree_url(&url), Some((5, "deadbeef".to_string())));
-// assert_eq!(
-// decode_worktree_url(&format!("\n {}\t", url)),
-// Some((5, "deadbeef".to_string()))
-// );
-// assert_eq!(decode_worktree_url("not://the-right-format"), None);
-// }
-
-// #[gpui::test]
-// async fn test_subscribing_to_entity(cx: &mut TestAppContext) {
-// cx.foreground().forbid_parking();
-
-// let user_id = 5;
-// let client = cx.update(|cx| Client::new(FakeHttpClient::with_404_response(), cx));
-// let server = FakeServer::for_client(user_id, &client, cx).await;
-
-// let (done_tx1, mut done_rx1) = smol::channel::unbounded();
-// let (done_tx2, mut done_rx2) = smol::channel::unbounded();
-// client.add_model_message_handler(
-// move |model: ModelHandle<Model>, _: TypedEnvelope<proto::JoinProject>, _, cx| {
-// match model.read_with(&cx, |model, _| model.id) {
-// 1 => done_tx1.try_send(()).unwrap(),
-// 2 => done_tx2.try_send(()).unwrap(),
-// _ => unreachable!(),
-// }
-// async { Ok(()) }
-// },
-// );
-// let model1 = cx.add_model(|_| Model {
-// id: 1,
-// subscription: None,
-// });
-// let model2 = cx.add_model(|_| Model {
-// id: 2,
-// subscription: None,
-// });
-// let model3 = cx.add_model(|_| Model {
-// id: 3,
-// subscription: None,
-// });
-
-// let _subscription1 = client
-// .subscribe_to_entity(1)
-// .unwrap()
-// .set_model(&model1, &mut cx.to_async());
-// let _subscription2 = client
-// .subscribe_to_entity(2)
-// .unwrap()
-// .set_model(&model2, &mut cx.to_async());
-// // Ensure dropping a subscription for the same entity type still allows receiving of
-// // messages for other entity IDs of the same type.
-// let subscription3 = client
-// .subscribe_to_entity(3)
-// .unwrap()
-// .set_model(&model3, &mut cx.to_async());
-// drop(subscription3);
-
-// server.send(proto::JoinProject { project_id: 1 });
-// server.send(proto::JoinProject { project_id: 2 });
-// done_rx1.next().await.unwrap();
-// done_rx2.next().await.unwrap();
-// }
-
-// #[gpui::test]
-// async fn test_subscribing_after_dropping_subscription(cx: &mut TestAppContext) {
-// cx.foreground().forbid_parking();
-
-// let user_id = 5;
-// let client = cx.update(|cx| Client::new(FakeHttpClient::with_404_response(), cx));
-// let server = FakeServer::for_client(user_id, &client, cx).await;
-
-// let model = cx.add_model(|_| Model::default());
-// let (done_tx1, _done_rx1) = smol::channel::unbounded();
-// let (done_tx2, mut done_rx2) = smol::channel::unbounded();
-// let subscription1 = client.add_message_handler(
-// model.clone(),
-// move |_, _: TypedEnvelope<proto::Ping>, _, _| {
-// done_tx1.try_send(()).unwrap();
-// async { Ok(()) }
-// },
-// );
-// drop(subscription1);
-// let _subscription2 = client.add_message_handler(
-// model.clone(),
-// move |_, _: TypedEnvelope<proto::Ping>, _, _| {
-// done_tx2.try_send(()).unwrap();
-// async { Ok(()) }
-// },
-// );
-// server.send(proto::Ping {});
-// done_rx2.next().await.unwrap();
-// }
-
-// #[gpui::test]
-// async fn test_dropping_subscription_in_handler(cx: &mut TestAppContext) {
-// cx.foreground().forbid_parking();
-
-// let user_id = 5;
-// let client = cx.update(|cx| Client::new(FakeHttpClient::with_404_response(), cx));
-// let server = FakeServer::for_client(user_id, &client, cx).await;
-
-// let model = cx.add_model(|_| Model::default());
-// let (done_tx, mut done_rx) = smol::channel::unbounded();
-// let subscription = client.add_message_handler(
-// model.clone(),
-// move |model, _: TypedEnvelope<proto::Ping>, _, mut cx| {
-// model.update(&mut cx, |model, _| model.subscription.take());
-// done_tx.try_send(()).unwrap();
-// async { Ok(()) }
-// },
-// );
-// model.update(cx, |model, _| {
-// model.subscription = Some(subscription);
-// });
-// server.send(proto::Ping {});
-// done_rx.next().await.unwrap();
-// }
-
-// #[derive(Default)]
-// struct Model {
-// id: usize,
-// subscription: Option<Subscription>,
-// }
-
-// impl Entity for Model {
-// type Event = ();
-// }
-// }
+#[cfg(test)]
+mod tests {
+ use super::*;
+ use crate::test::FakeServer;
+
+ use gpui2::{Context, Executor, TestAppContext};
+ use parking_lot::Mutex;
+ use std::future;
+ use util::http::FakeHttpClient;
+
+ #[gpui2::test(iterations = 10)]
+ async fn test_reconnection(cx: &mut TestAppContext) {
+ let user_id = 5;
+ let client = cx.update(|cx| Client::new(FakeHttpClient::with_404_response(), cx));
+ let server = FakeServer::for_client(user_id, &client, cx).await;
+ let mut status = client.status();
+ assert!(matches!(
+ status.next().await,
+ Some(Status::Connected { .. })
+ ));
+ assert_eq!(server.auth_count(), 1);
+
+ server.forbid_connections();
+ server.disconnect();
+ while !matches!(status.next().await, Some(Status::ReconnectionError { .. })) {}
+
+ server.allow_connections();
+ cx.executor().advance_clock(Duration::from_secs(10));
+ while !matches!(status.next().await, Some(Status::Connected { .. })) {}
+ assert_eq!(server.auth_count(), 1); // Client reused the cached credentials when reconnecting
+
+ server.forbid_connections();
+ server.disconnect();
+ while !matches!(status.next().await, Some(Status::ReconnectionError { .. })) {}
+
+ // Clear cached credentials after authentication fails
+ server.roll_access_token();
+ server.allow_connections();
+ cx.executor().run_until_parked();
+ cx.executor().advance_clock(Duration::from_secs(10));
+ while !matches!(status.next().await, Some(Status::Connected { .. })) {}
+ assert_eq!(server.auth_count(), 2); // Client re-authenticated due to an invalid token
+ }
+
+ #[gpui2::test(iterations = 10)]
+ async fn test_connection_timeout(executor: Executor, cx: &mut TestAppContext) {
+ let user_id = 5;
+ let client = cx.update(|cx| Client::new(FakeHttpClient::with_404_response(), cx));
+ let mut status = client.status();
+
+ // Time out when client tries to connect.
+ client.override_authenticate(move |cx| {
+ cx.executor().spawn(async move {
+ Ok(Credentials {
+ user_id,
+ access_token: "token".into(),
+ })
+ })
+ });
+ client.override_establish_connection(|_, cx| {
+ cx.executor().spawn(async move {
+ future::pending::<()>().await;
+ unreachable!()
+ })
+ });
+ let auth_and_connect = cx.spawn({
+ let client = client.clone();
+ |cx| async move { client.authenticate_and_connect(false, &cx).await }
+ });
+ executor.run_until_parked();
+ assert!(matches!(status.next().await, Some(Status::Connecting)));
+
+ executor.advance_clock(CONNECTION_TIMEOUT);
+ assert!(matches!(
+ status.next().await,
+ Some(Status::ConnectionError { .. })
+ ));
+ auth_and_connect.await.unwrap_err();
+
+ // Allow the connection to be established.
+ let server = FakeServer::for_client(user_id, &client, cx).await;
+ assert!(matches!(
+ status.next().await,
+ Some(Status::Connected { .. })
+ ));
+
+ // Disconnect client.
+ server.forbid_connections();
+ server.disconnect();
+ while !matches!(status.next().await, Some(Status::ReconnectionError { .. })) {}
+
+ // Time out when re-establishing the connection.
+ server.allow_connections();
+ client.override_establish_connection(|_, cx| {
+ cx.executor().spawn(async move {
+ future::pending::<()>().await;
+ unreachable!()
+ })
+ });
+ executor.advance_clock(2 * INITIAL_RECONNECTION_DELAY);
+ assert!(matches!(
+ status.next().await,
+ Some(Status::Reconnecting { .. })
+ ));
+
+ executor.advance_clock(CONNECTION_TIMEOUT);
+ assert!(matches!(
+ status.next().await,
+ Some(Status::ReconnectionError { .. })
+ ));
+ }
+
+ #[gpui2::test(iterations = 10)]
+ async fn test_authenticating_more_than_once(cx: &mut TestAppContext, executor: Executor) {
+ let auth_count = Arc::new(Mutex::new(0));
+ let dropped_auth_count = Arc::new(Mutex::new(0));
+ let client = cx.update(|cx| Client::new(FakeHttpClient::with_404_response(), cx));
+ client.override_authenticate({
+ let auth_count = auth_count.clone();
+ let dropped_auth_count = dropped_auth_count.clone();
+ move |cx| {
+ let auth_count = auth_count.clone();
+ let dropped_auth_count = dropped_auth_count.clone();
+ cx.executor().spawn(async move {
+ *auth_count.lock() += 1;
+ let _drop = util::defer(move || *dropped_auth_count.lock() += 1);
+ future::pending::<()>().await;
+ unreachable!()
+ })
+ }
+ });
+
+ let _authenticate = cx.spawn({
+ let client = client.clone();
+ move |cx| async move { client.authenticate_and_connect(false, &cx).await }
+ });
+ executor.run_until_parked();
+ assert_eq!(*auth_count.lock(), 1);
+ assert_eq!(*dropped_auth_count.lock(), 0);
+
+ let _authenticate = cx.spawn({
+ let client = client.clone();
+ |cx| async move { client.authenticate_and_connect(false, &cx).await }
+ });
+ executor.run_until_parked();
+ assert_eq!(*auth_count.lock(), 2);
+ assert_eq!(*dropped_auth_count.lock(), 1);
+ }
+
+ #[test]
+ fn test_encode_and_decode_worktree_url() {
+ let url = encode_worktree_url(5, "deadbeef");
+ assert_eq!(decode_worktree_url(&url), Some((5, "deadbeef".to_string())));
+ assert_eq!(
+ decode_worktree_url(&format!("\n {}\t", url)),
+ Some((5, "deadbeef".to_string()))
+ );
+ assert_eq!(decode_worktree_url("not://the-right-format"), None);
+ }
+
+ #[gpui2::test]
+ async fn test_subscribing_to_entity(cx: &mut TestAppContext) {
+ let user_id = 5;
+ let client = cx.update(|cx| Client::new(FakeHttpClient::with_404_response(), cx));
+ let server = FakeServer::for_client(user_id, &client, cx).await;
+
+ let (done_tx1, mut done_rx1) = smol::channel::unbounded();
+ let (done_tx2, mut done_rx2) = smol::channel::unbounded();
+ client.add_model_message_handler(
+ move |model: Handle<Model>, _: TypedEnvelope<proto::JoinProject>, _, mut cx| {
+ match model.update(&mut cx, |model, _| model.id).unwrap() {
+ 1 => done_tx1.try_send(()).unwrap(),
+ 2 => done_tx2.try_send(()).unwrap(),
+ _ => unreachable!(),
+ }
+ async { Ok(()) }
+ },
+ );
+ let model1 = cx.entity(|_| Model {
+ id: 1,
+ subscription: None,
+ });
+ let model2 = cx.entity(|_| Model {
+ id: 2,
+ subscription: None,
+ });
+ let model3 = cx.entity(|_| Model {
+ id: 3,
+ subscription: None,
+ });
+
+ let _subscription1 = client
+ .subscribe_to_entity(1)
+ .unwrap()
+ .set_model(&model1, &mut cx.to_async());
+ let _subscription2 = client
+ .subscribe_to_entity(2)
+ .unwrap()
+ .set_model(&model2, &mut cx.to_async());
+ // Ensure dropping a subscription for the same entity type still allows receiving of
+ // messages for other entity IDs of the same type.
+ let subscription3 = client
+ .subscribe_to_entity(3)
+ .unwrap()
+ .set_model(&model3, &mut cx.to_async());
+ drop(subscription3);
+
+ server.send(proto::JoinProject { project_id: 1 });
+ server.send(proto::JoinProject { project_id: 2 });
+ done_rx1.next().await.unwrap();
+ done_rx2.next().await.unwrap();
+ }
+
+ #[gpui2::test]
+ async fn test_subscribing_after_dropping_subscription(cx: &mut TestAppContext) {
+ let user_id = 5;
+ let client = cx.update(|cx| Client::new(FakeHttpClient::with_404_response(), cx));
+ let server = FakeServer::for_client(user_id, &client, cx).await;
+
+ let model = cx.entity(|_| Model::default());
+ let (done_tx1, _done_rx1) = smol::channel::unbounded();
+ let (done_tx2, mut done_rx2) = smol::channel::unbounded();
+ let subscription1 = client.add_message_handler(
+ model.downgrade(),
+ move |_, _: TypedEnvelope<proto::Ping>, _, _| {
+ done_tx1.try_send(()).unwrap();
+ async { Ok(()) }
+ },
+ );
+ drop(subscription1);
+ let _subscription2 = client.add_message_handler(
+ model.downgrade(),
+ move |_, _: TypedEnvelope<proto::Ping>, _, _| {
+ done_tx2.try_send(()).unwrap();
+ async { Ok(()) }
+ },
+ );
+ server.send(proto::Ping {});
+ done_rx2.next().await.unwrap();
+ }
+
+ #[gpui2::test]
+ async fn test_dropping_subscription_in_handler(cx: &mut TestAppContext) {
+ let user_id = 5;
+ let client = cx.update(|cx| Client::new(FakeHttpClient::with_404_response(), cx));
+ let server = FakeServer::for_client(user_id, &client, cx).await;
+
+ let model = cx.entity(|_| Model::default());
+ let (done_tx, mut done_rx) = smol::channel::unbounded();
+ let subscription = client.add_message_handler(
+ model.clone().downgrade(),
+ move |model: Handle<Model>, _: TypedEnvelope<proto::Ping>, _, mut cx| {
+ model
+ .update(&mut cx, |model, _| model.subscription.take())
+ .unwrap();
+ done_tx.try_send(()).unwrap();
+ async { Ok(()) }
+ },
+ );
+ model.update(cx, |model, _| {
+ model.subscription = Some(subscription);
+ });
+ server.send(proto::Ping {});
+ done_rx.next().await.unwrap();
+ }
+
+ #[derive(Default)]
+ struct Model {
+ id: usize,
+ subscription: Option<Subscription>,
+ }
+}
@@ -1,215 +1,216 @@
-// use crate::{Client, Connection, Credentials, EstablishConnectionError, UserStore};
-// use anyhow::{anyhow, Result};
-// use futures::{stream::BoxStream, StreamExt};
-// use gpui2::{Executor, Handle, TestAppContext};
-// use parking_lot::Mutex;
-// use rpc::{
-// proto::{self, GetPrivateUserInfo, GetPrivateUserInfoResponse},
-// ConnectionId, Peer, Receipt, TypedEnvelope,
-// };
-// use std::{rc::Rc, sync::Arc};
-// use util::http::FakeHttpClient;
-
-// pub struct FakeServer {
-// peer: Arc<Peer>,
-// state: Arc<Mutex<FakeServerState>>,
-// user_id: u64,
-// executor: Executor,
-// }
-
-// #[derive(Default)]
-// struct FakeServerState {
-// incoming: Option<BoxStream<'static, Box<dyn proto::AnyTypedEnvelope>>>,
-// connection_id: Option<ConnectionId>,
-// forbid_connections: bool,
-// auth_count: usize,
-// access_token: usize,
-// }
-
-// impl FakeServer {
-// pub async fn for_client(
-// client_user_id: u64,
-// client: &Arc<Client>,
-// cx: &TestAppContext,
-// ) -> Self {
-// let server = Self {
-// peer: Peer::new(0),
-// state: Default::default(),
-// user_id: client_user_id,
-// executor: cx.foreground(),
-// };
-
-// client
-// .override_authenticate({
-// let state = Arc::downgrade(&server.state);
-// move |cx| {
-// let state = state.clone();
-// cx.spawn(move |_| async move {
-// let state = state.upgrade().ok_or_else(|| anyhow!("server dropped"))?;
-// let mut state = state.lock();
-// state.auth_count += 1;
-// let access_token = state.access_token.to_string();
-// Ok(Credentials {
-// user_id: client_user_id,
-// access_token,
-// })
-// })
-// }
-// })
-// .override_establish_connection({
-// let peer = Arc::downgrade(&server.peer);
-// let state = Arc::downgrade(&server.state);
-// move |credentials, cx| {
-// let peer = peer.clone();
-// let state = state.clone();
-// let credentials = credentials.clone();
-// cx.spawn(move |cx| async move {
-// let state = state.upgrade().ok_or_else(|| anyhow!("server dropped"))?;
-// let peer = peer.upgrade().ok_or_else(|| anyhow!("server dropped"))?;
-// if state.lock().forbid_connections {
-// Err(EstablishConnectionError::Other(anyhow!(
-// "server is forbidding connections"
-// )))?
-// }
-
-// assert_eq!(credentials.user_id, client_user_id);
-
-// if credentials.access_token != state.lock().access_token.to_string() {
-// Err(EstablishConnectionError::Unauthorized)?
-// }
-
-// let (client_conn, server_conn, _) = Connection::in_memory(cx.background());
-// let (connection_id, io, incoming) =
-// peer.add_test_connection(server_conn, cx.background());
-// cx.background().spawn(io).detach();
-// {
-// let mut state = state.lock();
-// state.connection_id = Some(connection_id);
-// state.incoming = Some(incoming);
-// }
-// peer.send(
-// connection_id,
-// proto::Hello {
-// peer_id: Some(connection_id.into()),
-// },
-// )
-// .unwrap();
-
-// Ok(client_conn)
-// })
-// }
-// });
-
-// client
-// .authenticate_and_connect(false, &cx.to_async())
-// .await
-// .unwrap();
-
-// server
-// }
-
-// pub fn disconnect(&self) {
-// if self.state.lock().connection_id.is_some() {
-// self.peer.disconnect(self.connection_id());
-// let mut state = self.state.lock();
-// state.connection_id.take();
-// state.incoming.take();
-// }
-// }
-
-// pub fn auth_count(&self) -> usize {
-// self.state.lock().auth_count
-// }
-
-// pub fn roll_access_token(&self) {
-// self.state.lock().access_token += 1;
-// }
-
-// pub fn forbid_connections(&self) {
-// self.state.lock().forbid_connections = true;
-// }
-
-// pub fn allow_connections(&self) {
-// self.state.lock().forbid_connections = false;
-// }
-
-// pub fn send<T: proto::EnvelopedMessage>(&self, message: T) {
-// self.peer.send(self.connection_id(), message).unwrap();
-// }
-
-// #[allow(clippy::await_holding_lock)]
-// pub async fn receive<M: proto::EnvelopedMessage>(&self) -> Result<TypedEnvelope<M>> {
-// self.executor.start_waiting();
-
-// loop {
-// let message = self
-// .state
-// .lock()
-// .incoming
-// .as_mut()
-// .expect("not connected")
-// .next()
-// .await
-// .ok_or_else(|| anyhow!("other half hung up"))?;
-// self.executor.finish_waiting();
-// let type_name = message.payload_type_name();
-// let message = message.into_any();
-
-// if message.is::<TypedEnvelope<M>>() {
-// return Ok(*message.downcast().unwrap());
-// }
-
-// if message.is::<TypedEnvelope<GetPrivateUserInfo>>() {
-// self.respond(
-// message
-// .downcast::<TypedEnvelope<GetPrivateUserInfo>>()
-// .unwrap()
-// .receipt(),
-// GetPrivateUserInfoResponse {
-// metrics_id: "the-metrics-id".into(),
-// staff: false,
-// flags: Default::default(),
-// },
-// );
-// continue;
-// }
-
-// panic!(
-// "fake server received unexpected message type: {:?}",
-// type_name
-// );
-// }
-// }
-
-// pub fn respond<T: proto::RequestMessage>(&self, receipt: Receipt<T>, response: T::Response) {
-// self.peer.respond(receipt, response).unwrap()
-// }
-
-// fn connection_id(&self) -> ConnectionId {
-// self.state.lock().connection_id.expect("not connected")
-// }
-
-// pub async fn build_user_store(
-// &self,
-// client: Arc<Client>,
-// cx: &mut TestAppContext,
-// ) -> ModelHandle<UserStore> {
-// let http_client = FakeHttpClient::with_404_response();
-// let user_store = cx.add_model(|cx| UserStore::new(client, http_client, cx));
-// assert_eq!(
-// self.receive::<proto::GetUsers>()
-// .await
-// .unwrap()
-// .payload
-// .user_ids,
-// &[self.user_id]
-// );
-// user_store
-// }
-// }
-
-// impl Drop for FakeServer {
-// fn drop(&mut self) {
-// self.disconnect();
-// }
-// }
+use crate::{Client, Connection, Credentials, EstablishConnectionError, UserStore};
+use anyhow::{anyhow, Result};
+use futures::{stream::BoxStream, StreamExt};
+use gpui2::{Context, Executor, Handle, TestAppContext};
+use parking_lot::Mutex;
+use rpc2::{
+ proto::{self, GetPrivateUserInfo, GetPrivateUserInfoResponse},
+ ConnectionId, Peer, Receipt, TypedEnvelope,
+};
+use std::sync::Arc;
+use util::http::FakeHttpClient;
+
+pub struct FakeServer {
+ peer: Arc<Peer>,
+ state: Arc<Mutex<FakeServerState>>,
+ user_id: u64,
+ executor: Executor,
+}
+
+#[derive(Default)]
+struct FakeServerState {
+ incoming: Option<BoxStream<'static, Box<dyn proto::AnyTypedEnvelope>>>,
+ connection_id: Option<ConnectionId>,
+ forbid_connections: bool,
+ auth_count: usize,
+ access_token: usize,
+}
+
+impl FakeServer {
+ pub async fn for_client(
+ client_user_id: u64,
+ client: &Arc<Client>,
+ cx: &TestAppContext,
+ ) -> Self {
+ let server = Self {
+ peer: Peer::new(0),
+ state: Default::default(),
+ user_id: client_user_id,
+ executor: cx.executor().clone(),
+ };
+
+ client
+ .override_authenticate({
+ let state = Arc::downgrade(&server.state);
+ move |cx| {
+ let state = state.clone();
+ cx.spawn(move |_| async move {
+ let state = state.upgrade().ok_or_else(|| anyhow!("server dropped"))?;
+ let mut state = state.lock();
+ state.auth_count += 1;
+ let access_token = state.access_token.to_string();
+ Ok(Credentials {
+ user_id: client_user_id,
+ access_token,
+ })
+ })
+ }
+ })
+ .override_establish_connection({
+ let peer = Arc::downgrade(&server.peer);
+ let state = Arc::downgrade(&server.state);
+ move |credentials, cx| {
+ let peer = peer.clone();
+ let state = state.clone();
+ let credentials = credentials.clone();
+ cx.spawn(move |cx| async move {
+ let state = state.upgrade().ok_or_else(|| anyhow!("server dropped"))?;
+ let peer = peer.upgrade().ok_or_else(|| anyhow!("server dropped"))?;
+ if state.lock().forbid_connections {
+ Err(EstablishConnectionError::Other(anyhow!(
+ "server is forbidding connections"
+ )))?
+ }
+
+ assert_eq!(credentials.user_id, client_user_id);
+
+ if credentials.access_token != state.lock().access_token.to_string() {
+ Err(EstablishConnectionError::Unauthorized)?
+ }
+
+ let (client_conn, server_conn, _) =
+ Connection::in_memory(cx.executor().clone());
+ let (connection_id, io, incoming) =
+ peer.add_test_connection(server_conn, cx.executor().clone());
+ cx.executor().spawn(io).detach();
+ {
+ let mut state = state.lock();
+ state.connection_id = Some(connection_id);
+ state.incoming = Some(incoming);
+ }
+ peer.send(
+ connection_id,
+ proto::Hello {
+ peer_id: Some(connection_id.into()),
+ },
+ )
+ .unwrap();
+
+ Ok(client_conn)
+ })
+ }
+ });
+
+ client
+ .authenticate_and_connect(false, &cx.to_async())
+ .await
+ .unwrap();
+
+ server
+ }
+
+ pub fn disconnect(&self) {
+ if self.state.lock().connection_id.is_some() {
+ self.peer.disconnect(self.connection_id());
+ let mut state = self.state.lock();
+ state.connection_id.take();
+ state.incoming.take();
+ }
+ }
+
+ pub fn auth_count(&self) -> usize {
+ self.state.lock().auth_count
+ }
+
+ pub fn roll_access_token(&self) {
+ self.state.lock().access_token += 1;
+ }
+
+ pub fn forbid_connections(&self) {
+ self.state.lock().forbid_connections = true;
+ }
+
+ pub fn allow_connections(&self) {
+ self.state.lock().forbid_connections = false;
+ }
+
+ pub fn send<T: proto::EnvelopedMessage>(&self, message: T) {
+ self.peer.send(self.connection_id(), message).unwrap();
+ }
+
+ #[allow(clippy::await_holding_lock)]
+ pub async fn receive<M: proto::EnvelopedMessage>(&self) -> Result<TypedEnvelope<M>> {
+ self.executor.start_waiting();
+
+ loop {
+ let message = self
+ .state
+ .lock()
+ .incoming
+ .as_mut()
+ .expect("not connected")
+ .next()
+ .await
+ .ok_or_else(|| anyhow!("other half hung up"))?;
+ self.executor.finish_waiting();
+ let type_name = message.payload_type_name();
+ let message = message.into_any();
+
+ if message.is::<TypedEnvelope<M>>() {
+ return Ok(*message.downcast().unwrap());
+ }
+
+ if message.is::<TypedEnvelope<GetPrivateUserInfo>>() {
+ self.respond(
+ message
+ .downcast::<TypedEnvelope<GetPrivateUserInfo>>()
+ .unwrap()
+ .receipt(),
+ GetPrivateUserInfoResponse {
+ metrics_id: "the-metrics-id".into(),
+ staff: false,
+ flags: Default::default(),
+ },
+ );
+ continue;
+ }
+
+ panic!(
+ "fake server received unexpected message type: {:?}",
+ type_name
+ );
+ }
+ }
+
+ pub fn respond<T: proto::RequestMessage>(&self, receipt: Receipt<T>, response: T::Response) {
+ self.peer.respond(receipt, response).unwrap()
+ }
+
+ fn connection_id(&self) -> ConnectionId {
+ self.state.lock().connection_id.expect("not connected")
+ }
+
+ pub async fn build_user_store(
+ &self,
+ client: Arc<Client>,
+ cx: &mut TestAppContext,
+ ) -> Handle<UserStore> {
+ let http_client = FakeHttpClient::with_404_response();
+ let user_store = cx.entity(|cx| UserStore::new(client, http_client, cx));
+ assert_eq!(
+ self.receive::<proto::GetUsers>()
+ .await
+ .unwrap()
+ .payload
+ .user_ids,
+ &[self.user_id]
+ );
+ user_store
+ }
+}
+
+impl Drop for FakeServer {
+ fn drop(&mut self) {
+ self.disconnect();
+ }
+}
@@ -143,7 +143,7 @@ impl TestAppContext {
lock.update_global(update)
}
- fn to_async(&self) -> AsyncAppContext {
+ pub fn to_async(&self) -> AsyncAppContext {
AsyncAppContext {
app: Arc::downgrade(&self.app),
executor: self.executor.clone(),
@@ -146,7 +146,10 @@ impl Executor {
Poll::Ready(result) => return result,
Poll::Pending => {
if !self.dispatcher.poll() {
- // todo!("forbid_parking")
+ #[cfg(any(test, feature = "test-support"))]
+ if let Some(_) = self.dispatcher.as_test() {
+ panic!("blocked with nothing left to run")
+ }
parker.park();
}
}
@@ -206,11 +209,26 @@ impl Executor {
todo!("start_waiting")
}
+ #[cfg(any(test, feature = "test-support"))]
+ pub fn finish_waiting(&self) {
+ todo!("finish_waiting")
+ }
+
#[cfg(any(test, feature = "test-support"))]
pub fn simulate_random_delay(&self) -> impl Future<Output = ()> {
self.dispatcher.as_test().unwrap().simulate_random_delay()
}
+ #[cfg(any(test, feature = "test-support"))]
+ pub fn advance_clock(&self, duration: Duration) {
+ self.dispatcher.as_test().unwrap().advance_clock(duration)
+ }
+
+ #[cfg(any(test, feature = "test-support"))]
+ pub fn run_until_parked(&self) {
+ self.dispatcher.as_test().unwrap().run_until_parked()
+ }
+
pub fn num_cpus(&self) -> usize {
num_cpus::get()
}
@@ -8,7 +8,7 @@ use std::{
pin::Pin,
sync::Arc,
task::{Context, Poll},
- time::{Duration, Instant},
+ time::Duration,
};
use util::post_inc;
@@ -24,8 +24,8 @@ struct TestDispatcherState {
random: StdRng,
foreground: HashMap<TestDispatcherId, VecDeque<Runnable>>,
background: Vec<Runnable>,
- delayed: Vec<(Instant, Runnable)>,
- time: Instant,
+ delayed: Vec<(Duration, Runnable)>,
+ time: Duration,
is_main_thread: bool,
next_id: TestDispatcherId,
}
@@ -37,7 +37,7 @@ impl TestDispatcher {
foreground: HashMap::default(),
background: Vec::new(),
delayed: Vec::new(),
- time: Instant::now(),
+ time: Duration::ZERO,
is_main_thread: true,
next_id: TestDispatcherId(1),
};
@@ -49,7 +49,21 @@ impl TestDispatcher {
}
pub fn advance_clock(&self, by: Duration) {
- self.state.lock().time += by;
+ let new_now = self.state.lock().time + by;
+ loop {
+ self.run_until_parked();
+ let state = self.state.lock();
+ let next_due_time = state.delayed.first().map(|(time, _)| *time);
+ drop(state);
+ if let Some(due_time) = next_due_time {
+ if due_time <= new_now {
+ self.state.lock().time = due_time;
+ continue;
+ }
+ }
+ break;
+ }
+ self.state.lock().time = new_now;
}
pub fn simulate_random_delay(&self) -> impl Future<Output = ()> {
@@ -137,10 +151,8 @@ impl PlatformDispatcher for TestDispatcher {
let background_len = state.background.len();
if foreground_len == 0 && background_len == 0 {
- eprintln!("no runnables to poll");
return false;
}
- eprintln!("runnables {} {}", foreground_len, background_len);
let main_thread = state.random.gen_ratio(
foreground_len as u32,
@@ -150,7 +162,6 @@ impl PlatformDispatcher for TestDispatcher {
state.is_main_thread = main_thread;
let runnable = if main_thread {
- eprintln!("running next main thread");
let state = &mut *state;
let runnables = state
.foreground
@@ -161,7 +172,6 @@ impl PlatformDispatcher for TestDispatcher {
runnables.pop_front().unwrap()
} else {
let ix = state.random.gen_range(0..background_len);
- eprintln!("running background thread {ix}");
state.background.swap_remove(ix)
};
@@ -173,14 +173,14 @@ impl Platform for TestPlatform {
}
fn write_credentials(&self, _url: &str, _username: &str, _password: &[u8]) -> Result<()> {
- unimplemented!()
+ Ok(())
}
fn read_credentials(&self, _url: &str) -> Result<Option<(String, Vec<u8>)>> {
- unimplemented!()
+ Ok(None)
}
fn delete_credentials(&self, _url: &str) -> Result<()> {
- unimplemented!()
+ Ok(())
}
}