@@ -322,7 +322,7 @@ async fn apply_server_operation(
server: &mut TestServer,
clients: &mut Vec<(Rc<TestClient>, TestAppContext)>,
client_tasks: &mut Vec<Task<()>>,
- operation_channels: &mut Vec<futures::channel::mpsc::UnboundedSender<()>>,
+ operation_channels: &mut Vec<futures::channel::mpsc::UnboundedSender<usize>>,
next_entity_id: &mut usize,
plan: Arc<Mutex<TestPlan>>,
operation: Operation,
@@ -462,7 +462,11 @@ async fn apply_server_operation(
assert_eq!(stale_room_ids, vec![]);
}
- Operation::MutateClients { user_ids, quiesce } => {
+ Operation::MutateClients {
+ user_ids,
+ batch_id,
+ quiesce,
+ } => {
let mut applied = false;
for user_id in user_ids {
let client_ix = clients
@@ -470,7 +474,7 @@ async fn apply_server_operation(
.position(|(client, cx)| client.current_user_id(cx) == user_id);
let Some(client_ix) = client_ix else { continue };
applied = true;
- if let Err(err) = operation_channels[client_ix].unbounded_send(()) {
+ if let Err(err) = operation_channels[client_ix].unbounded_send(batch_id) {
// panic!("error signaling user {}, client {}", user_id, client_ix);
}
}
@@ -970,6 +974,7 @@ struct TestPlan {
max_operations: usize,
operation_ix: usize,
users: Vec<UserTestPlan>,
+ next_batch_id: usize,
allow_server_restarts: bool,
allow_client_reconnection: bool,
allow_client_disconnection: bool,
@@ -989,6 +994,7 @@ enum StoredOperation {
Server(Operation),
Client {
user_id: UserId,
+ batch_id: usize,
operation: ClientOperation,
},
}
@@ -1006,6 +1012,9 @@ enum Operation {
},
RestartServer,
MutateClients {
+ batch_id: usize,
+ #[serde(skip_serializing)]
+ #[serde(skip_deserializing)]
user_ids: Vec<UserId>,
quiesce: bool,
},
@@ -1103,6 +1112,7 @@ impl TestPlan {
allow_client_disconnection: rng.gen_bool(0.1),
stored_operations: Vec::new(),
operation_ix: 0,
+ next_batch_id: 0,
max_operations,
users,
rng,
@@ -1114,8 +1124,32 @@ impl TestPlan {
self.replay = true;
let stored_operations: Vec<StoredOperation> = serde_json::from_str(&json).unwrap();
self.stored_operations = stored_operations
- .into_iter()
- .map(|operation| (operation, Arc::new(AtomicBool::new(false))))
+ .iter()
+ .cloned()
+ .enumerate()
+ .map(|(i, mut operation)| {
+ if let StoredOperation::Server(Operation::MutateClients {
+ batch_id: current_batch_id,
+ user_ids,
+ ..
+ }) = &mut operation
+ {
+ assert!(user_ids.is_empty());
+ user_ids.extend(stored_operations[i + 1..].iter().filter_map(|operation| {
+ if let StoredOperation::Client {
+ user_id, batch_id, ..
+ } = operation
+ {
+ if batch_id == current_batch_id {
+ return Some(user_id);
+ }
+ }
+ None
+ }));
+ user_ids.sort_unstable();
+ }
+ (operation, Arc::new(AtomicBool::new(false)))
+ })
.collect()
}
@@ -1161,6 +1195,7 @@ impl TestPlan {
fn next_client_operation(
&mut self,
client: &TestClient,
+ current_batch_id: usize,
cx: &TestAppContext,
) -> Option<(ClientOperation, Arc<AtomicBool>)> {
let current_user_id = client.current_user_id(cx);
@@ -1174,7 +1209,12 @@ impl TestPlan {
if self.replay {
while let Some(stored_operation) = self.stored_operations.get(user_plan.operation_ix) {
user_plan.operation_ix += 1;
- if let (StoredOperation::Client { user_id, operation }, skipped) = stored_operation
+ if let (
+ StoredOperation::Client {
+ user_id, operation, ..
+ },
+ skipped,
+ ) = stored_operation
{
if user_id == ¤t_user_id {
return Some((operation.clone(), skipped.clone()));
@@ -1188,6 +1228,7 @@ impl TestPlan {
self.stored_operations.push((
StoredOperation::Client {
user_id: current_user_id,
+ batch_id: current_batch_id,
operation: operation.clone(),
},
skipped.clone(),
@@ -1239,15 +1280,18 @@ impl TestPlan {
.rng
.gen_range(1..10)
.min(self.max_operations - self.operation_ix);
- let user_ids = (0..count)
+ let batch_id = util::post_inc(&mut self.next_batch_id);
+ let mut user_ids = (0..count)
.map(|_| {
let ix = self.rng.gen_range(0..clients.len());
let (client, cx) = &clients[ix];
client.current_user_id(cx)
})
- .collect();
+ .collect::<Vec<_>>();
+ user_ids.sort_unstable();
Operation::MutateClients {
user_ids,
+ batch_id,
quiesce: self.rng.gen_bool(0.7),
}
}
@@ -1625,7 +1669,7 @@ impl TestPlan {
async fn simulate_client(
client: Rc<TestClient>,
- mut operation_rx: futures::channel::mpsc::UnboundedReceiver<()>,
+ mut operation_rx: futures::channel::mpsc::UnboundedReceiver<usize>,
plan: Arc<Mutex<TestPlan>>,
mut cx: TestAppContext,
) {
@@ -1740,8 +1784,8 @@ async fn simulate_client(
.await;
client.language_registry.add(Arc::new(language));
- while operation_rx.next().await.is_some() {
- let Some((operation, skipped)) = plan.lock().next_client_operation(&client, &cx) else { break };
+ while let Some(batch_id) = operation_rx.next().await {
+ let Some((operation, skipped)) = plan.lock().next_client_operation(&client, batch_id, &cx) else { break };
match apply_client_operation(&client, operation, &mut cx).await {
Err(error) => {
log::error!("{} error: {}", client.username, error);