1use crate::{
2 db::{self, NewUserParams, UserId},
3 rpc::{CLEANUP_TIMEOUT, RECONNECT_TIMEOUT},
4 tests::{TestClient, TestServer},
5};
6use async_trait::async_trait;
7use futures::StreamExt;
8use gpui::{executor::Deterministic, Task, TestAppContext};
9use parking_lot::Mutex;
10use rand::prelude::*;
11use rpc::RECEIVE_TIMEOUT;
12use serde::{de::DeserializeOwned, Deserialize, Serialize};
13use settings::SettingsStore;
14use std::{
15 env,
16 path::PathBuf,
17 rc::Rc,
18 sync::{
19 atomic::{AtomicBool, Ordering::SeqCst},
20 Arc,
21 },
22};
23
24lazy_static::lazy_static! {
25 static ref PLAN_LOAD_PATH: Option<PathBuf> = path_env_var("LOAD_PLAN");
26 static ref PLAN_SAVE_PATH: Option<PathBuf> = path_env_var("SAVE_PLAN");
27 static ref MAX_PEERS: usize = env::var("MAX_PEERS")
28 .map(|i| i.parse().expect("invalid `MAX_PEERS` variable"))
29 .unwrap_or(3);
30 static ref MAX_OPERATIONS: usize = env::var("OPERATIONS")
31 .map(|i| i.parse().expect("invalid `OPERATIONS` variable"))
32 .unwrap_or(10);
33
34}
35
36static LOADED_PLAN_JSON: Mutex<Option<Vec<u8>>> = Mutex::new(None);
37static LAST_PLAN: Mutex<Option<Box<dyn Send + FnOnce() -> Vec<u8>>>> = Mutex::new(None);
38
39struct TestPlan<T: RandomizedTest> {
40 rng: StdRng,
41 replay: bool,
42 stored_operations: Vec<(StoredOperation<T::Operation>, Arc<AtomicBool>)>,
43 max_operations: usize,
44 operation_ix: usize,
45 users: Vec<UserTestPlan>,
46 next_batch_id: usize,
47 allow_server_restarts: bool,
48 allow_client_reconnection: bool,
49 allow_client_disconnection: bool,
50}
51
52pub struct UserTestPlan {
53 pub user_id: UserId,
54 pub username: String,
55 pub allow_client_reconnection: bool,
56 pub allow_client_disconnection: bool,
57 next_root_id: usize,
58 operation_ix: usize,
59 online: bool,
60}
61
62#[derive(Clone, Debug, Serialize, Deserialize)]
63#[serde(untagged)]
64enum StoredOperation<T> {
65 Server(ServerOperation),
66 Client {
67 user_id: UserId,
68 batch_id: usize,
69 operation: T,
70 },
71}
72
73#[derive(Clone, Debug, Serialize, Deserialize)]
74enum ServerOperation {
75 AddConnection {
76 user_id: UserId,
77 },
78 RemoveConnection {
79 user_id: UserId,
80 },
81 BounceConnection {
82 user_id: UserId,
83 },
84 RestartServer,
85 MutateClients {
86 batch_id: usize,
87 #[serde(skip_serializing)]
88 #[serde(skip_deserializing)]
89 user_ids: Vec<UserId>,
90 quiesce: bool,
91 },
92}
93
94pub enum TestError {
95 Inapplicable,
96 Other(anyhow::Error),
97}
98
99#[async_trait(?Send)]
100pub trait RandomizedTest: 'static + Sized {
101 type Operation: Send + Clone + Serialize + DeserializeOwned;
102
103 fn generate_operation(
104 client: &TestClient,
105 rng: &mut StdRng,
106 plan: &mut UserTestPlan,
107 cx: &TestAppContext,
108 ) -> Self::Operation;
109
110 async fn apply_operation(
111 client: &TestClient,
112 operation: Self::Operation,
113 cx: &mut TestAppContext,
114 ) -> Result<(), TestError>;
115
116 async fn initialize(server: &mut TestServer, users: &[UserTestPlan]);
117
118 async fn on_client_added(client: &Rc<TestClient>, cx: &mut TestAppContext);
119
120 async fn on_quiesce(server: &mut TestServer, client: &mut [(Rc<TestClient>, TestAppContext)]);
121}
122
123pub async fn run_randomized_test<T: RandomizedTest>(
124 cx: &mut TestAppContext,
125 deterministic: Arc<Deterministic>,
126 rng: StdRng,
127) {
128 deterministic.forbid_parking();
129 let mut server = TestServer::start(&deterministic).await;
130 let plan = TestPlan::<T>::new(&mut server, rng).await;
131
132 LAST_PLAN.lock().replace({
133 let plan = plan.clone();
134 Box::new(move || plan.lock().serialize())
135 });
136
137 let mut clients = Vec::new();
138 let mut client_tasks = Vec::new();
139 let mut operation_channels = Vec::new();
140 loop {
141 let Some((next_operation, applied)) = plan.lock().next_server_operation(&clients) else {
142 break;
143 };
144 applied.store(true, SeqCst);
145 let did_apply = TestPlan::apply_server_operation(
146 plan.clone(),
147 deterministic.clone(),
148 &mut server,
149 &mut clients,
150 &mut client_tasks,
151 &mut operation_channels,
152 next_operation,
153 cx,
154 )
155 .await;
156 if !did_apply {
157 applied.store(false, SeqCst);
158 }
159 }
160
161 drop(operation_channels);
162 deterministic.start_waiting();
163 futures::future::join_all(client_tasks).await;
164 deterministic.finish_waiting();
165
166 deterministic.run_until_parked();
167 T::on_quiesce(&mut server, &mut clients).await;
168
169 for (client, mut cx) in clients {
170 cx.update(|cx| {
171 let store = cx.remove_global::<SettingsStore>();
172 cx.clear_globals();
173 cx.set_global(store);
174 drop(client);
175 });
176 }
177 deterministic.run_until_parked();
178
179 if let Some(path) = &*PLAN_SAVE_PATH {
180 eprintln!("saved test plan to path {:?}", path);
181 std::fs::write(path, plan.lock().serialize()).unwrap();
182 }
183}
184
185pub fn save_randomized_test_plan() {
186 if let Some(serialize_plan) = LAST_PLAN.lock().take() {
187 if let Some(path) = &*PLAN_SAVE_PATH {
188 eprintln!("saved test plan to path {:?}", path);
189 std::fs::write(path, serialize_plan()).unwrap();
190 }
191 }
192}
193
194impl<T: RandomizedTest> TestPlan<T> {
195 pub async fn new(server: &mut TestServer, mut rng: StdRng) -> Arc<Mutex<Self>> {
196 let allow_server_restarts = rng.gen_bool(0.7);
197 let allow_client_reconnection = rng.gen_bool(0.7);
198 let allow_client_disconnection = rng.gen_bool(0.1);
199
200 let mut users = Vec::new();
201 for ix in 0..*MAX_PEERS {
202 let username = format!("user-{}", ix + 1);
203 let user_id = server
204 .app_state
205 .db
206 .create_user(
207 &format!("{username}@example.com"),
208 false,
209 NewUserParams {
210 github_login: username.clone(),
211 github_user_id: ix as i32,
212 },
213 )
214 .await
215 .unwrap()
216 .user_id;
217 users.push(UserTestPlan {
218 user_id,
219 username,
220 online: false,
221 next_root_id: 0,
222 operation_ix: 0,
223 allow_client_disconnection,
224 allow_client_reconnection,
225 });
226 }
227
228 T::initialize(server, &users).await;
229
230 let plan = Arc::new(Mutex::new(Self {
231 replay: false,
232 allow_server_restarts,
233 allow_client_reconnection,
234 allow_client_disconnection,
235 stored_operations: Vec::new(),
236 operation_ix: 0,
237 next_batch_id: 0,
238 max_operations: *MAX_OPERATIONS,
239 users,
240 rng,
241 }));
242
243 if let Some(path) = &*PLAN_LOAD_PATH {
244 let json = LOADED_PLAN_JSON
245 .lock()
246 .get_or_insert_with(|| {
247 eprintln!("loaded test plan from path {:?}", path);
248 std::fs::read(path).unwrap()
249 })
250 .clone();
251 plan.lock().deserialize(json);
252 }
253
254 plan
255 }
256
257 fn deserialize(&mut self, json: Vec<u8>) {
258 let stored_operations: Vec<StoredOperation<T::Operation>> =
259 serde_json::from_slice(&json).unwrap();
260 self.replay = true;
261 self.stored_operations = stored_operations
262 .iter()
263 .cloned()
264 .enumerate()
265 .map(|(i, mut operation)| {
266 let did_apply = Arc::new(AtomicBool::new(false));
267 if let StoredOperation::Server(ServerOperation::MutateClients {
268 batch_id: current_batch_id,
269 user_ids,
270 ..
271 }) = &mut operation
272 {
273 assert!(user_ids.is_empty());
274 user_ids.extend(stored_operations[i + 1..].iter().filter_map(|operation| {
275 if let StoredOperation::Client {
276 user_id, batch_id, ..
277 } = operation
278 {
279 if batch_id == current_batch_id {
280 return Some(user_id);
281 }
282 }
283 None
284 }));
285 user_ids.sort_unstable();
286 }
287 (operation, did_apply)
288 })
289 .collect()
290 }
291
292 fn serialize(&mut self) -> Vec<u8> {
293 // Format each operation as one line
294 let mut json = Vec::new();
295 json.push(b'[');
296 for (operation, applied) in &self.stored_operations {
297 if !applied.load(SeqCst) {
298 continue;
299 }
300 if json.len() > 1 {
301 json.push(b',');
302 }
303 json.extend_from_slice(b"\n ");
304 serde_json::to_writer(&mut json, operation).unwrap();
305 }
306 json.extend_from_slice(b"\n]\n");
307 json
308 }
309
310 fn next_server_operation(
311 &mut self,
312 clients: &[(Rc<TestClient>, TestAppContext)],
313 ) -> Option<(ServerOperation, Arc<AtomicBool>)> {
314 if self.replay {
315 while let Some(stored_operation) = self.stored_operations.get(self.operation_ix) {
316 self.operation_ix += 1;
317 if let (StoredOperation::Server(operation), applied) = stored_operation {
318 return Some((operation.clone(), applied.clone()));
319 }
320 }
321 None
322 } else {
323 let operation = self.generate_server_operation(clients)?;
324 let applied = Arc::new(AtomicBool::new(false));
325 self.stored_operations
326 .push((StoredOperation::Server(operation.clone()), applied.clone()));
327 Some((operation, applied))
328 }
329 }
330
331 fn next_client_operation(
332 &mut self,
333 client: &TestClient,
334 current_batch_id: usize,
335 cx: &TestAppContext,
336 ) -> Option<(T::Operation, Arc<AtomicBool>)> {
337 let current_user_id = client.current_user_id(cx);
338 let user_ix = self
339 .users
340 .iter()
341 .position(|user| user.user_id == current_user_id)
342 .unwrap();
343 let user_plan = &mut self.users[user_ix];
344
345 if self.replay {
346 while let Some(stored_operation) = self.stored_operations.get(user_plan.operation_ix) {
347 user_plan.operation_ix += 1;
348 if let (
349 StoredOperation::Client {
350 user_id, operation, ..
351 },
352 applied,
353 ) = stored_operation
354 {
355 if user_id == ¤t_user_id {
356 return Some((operation.clone(), applied.clone()));
357 }
358 }
359 }
360 None
361 } else {
362 if self.operation_ix == self.max_operations {
363 return None;
364 }
365 self.operation_ix += 1;
366 let operation = T::generate_operation(
367 client,
368 &mut self.rng,
369 self.users
370 .iter_mut()
371 .find(|user| user.user_id == current_user_id)
372 .unwrap(),
373 cx,
374 );
375 let applied = Arc::new(AtomicBool::new(false));
376 self.stored_operations.push((
377 StoredOperation::Client {
378 user_id: current_user_id,
379 batch_id: current_batch_id,
380 operation: operation.clone(),
381 },
382 applied.clone(),
383 ));
384 Some((operation, applied))
385 }
386 }
387
388 fn generate_server_operation(
389 &mut self,
390 clients: &[(Rc<TestClient>, TestAppContext)],
391 ) -> Option<ServerOperation> {
392 if self.operation_ix == self.max_operations {
393 return None;
394 }
395
396 Some(loop {
397 break match self.rng.gen_range(0..100) {
398 0..=29 if clients.len() < self.users.len() => {
399 let user = self
400 .users
401 .iter()
402 .filter(|u| !u.online)
403 .choose(&mut self.rng)
404 .unwrap();
405 self.operation_ix += 1;
406 ServerOperation::AddConnection {
407 user_id: user.user_id,
408 }
409 }
410 30..=34 if clients.len() > 1 && self.allow_client_disconnection => {
411 let (client, cx) = &clients[self.rng.gen_range(0..clients.len())];
412 let user_id = client.current_user_id(cx);
413 self.operation_ix += 1;
414 ServerOperation::RemoveConnection { user_id }
415 }
416 35..=39 if clients.len() > 1 && self.allow_client_reconnection => {
417 let (client, cx) = &clients[self.rng.gen_range(0..clients.len())];
418 let user_id = client.current_user_id(cx);
419 self.operation_ix += 1;
420 ServerOperation::BounceConnection { user_id }
421 }
422 40..=44 if self.allow_server_restarts && clients.len() > 1 => {
423 self.operation_ix += 1;
424 ServerOperation::RestartServer
425 }
426 _ if !clients.is_empty() => {
427 let count = self
428 .rng
429 .gen_range(1..10)
430 .min(self.max_operations - self.operation_ix);
431 let batch_id = util::post_inc(&mut self.next_batch_id);
432 let mut user_ids = (0..count)
433 .map(|_| {
434 let ix = self.rng.gen_range(0..clients.len());
435 let (client, cx) = &clients[ix];
436 client.current_user_id(cx)
437 })
438 .collect::<Vec<_>>();
439 user_ids.sort_unstable();
440 ServerOperation::MutateClients {
441 user_ids,
442 batch_id,
443 quiesce: self.rng.gen_bool(0.7),
444 }
445 }
446 _ => continue,
447 };
448 })
449 }
450
451 async fn apply_server_operation(
452 plan: Arc<Mutex<Self>>,
453 deterministic: Arc<Deterministic>,
454 server: &mut TestServer,
455 clients: &mut Vec<(Rc<TestClient>, TestAppContext)>,
456 client_tasks: &mut Vec<Task<()>>,
457 operation_channels: &mut Vec<futures::channel::mpsc::UnboundedSender<usize>>,
458 operation: ServerOperation,
459 cx: &mut TestAppContext,
460 ) -> bool {
461 match operation {
462 ServerOperation::AddConnection { user_id } => {
463 let username;
464 {
465 let mut plan = plan.lock();
466 let user = plan.user(user_id);
467 if user.online {
468 return false;
469 }
470 user.online = true;
471 username = user.username.clone();
472 };
473 log::info!("adding new connection for {}", username);
474 let next_entity_id = (user_id.0 * 10_000) as usize;
475 let mut client_cx = TestAppContext::new(
476 cx.foreground_platform(),
477 cx.platform(),
478 deterministic.build_foreground(user_id.0 as usize),
479 deterministic.build_background(),
480 cx.font_cache(),
481 cx.leak_detector(),
482 next_entity_id,
483 cx.function_name.clone(),
484 );
485
486 let (operation_tx, operation_rx) = futures::channel::mpsc::unbounded();
487 let client = Rc::new(server.create_client(&mut client_cx, &username).await);
488 operation_channels.push(operation_tx);
489 clients.push((client.clone(), client_cx.clone()));
490 client_tasks.push(client_cx.foreground().spawn(Self::simulate_client(
491 plan.clone(),
492 client,
493 operation_rx,
494 client_cx,
495 )));
496
497 log::info!("added connection for {}", username);
498 }
499
500 ServerOperation::RemoveConnection {
501 user_id: removed_user_id,
502 } => {
503 log::info!("simulating full disconnection of user {}", removed_user_id);
504 let client_ix = clients
505 .iter()
506 .position(|(client, cx)| client.current_user_id(cx) == removed_user_id);
507 let Some(client_ix) = client_ix else {
508 return false;
509 };
510 let user_connection_ids = server
511 .connection_pool
512 .lock()
513 .user_connection_ids(removed_user_id)
514 .collect::<Vec<_>>();
515 assert_eq!(user_connection_ids.len(), 1);
516 let removed_peer_id = user_connection_ids[0].into();
517 let (client, mut client_cx) = clients.remove(client_ix);
518 let client_task = client_tasks.remove(client_ix);
519 operation_channels.remove(client_ix);
520 server.forbid_connections();
521 server.disconnect_client(removed_peer_id);
522 deterministic.advance_clock(RECEIVE_TIMEOUT + RECONNECT_TIMEOUT);
523 deterministic.start_waiting();
524 log::info!("waiting for user {} to exit...", removed_user_id);
525 client_task.await;
526 deterministic.finish_waiting();
527 server.allow_connections();
528
529 for project in client.remote_projects().iter() {
530 project.read_with(&client_cx, |project, _| {
531 assert!(
532 project.is_read_only(),
533 "project {:?} should be read only",
534 project.remote_id()
535 )
536 });
537 }
538
539 for (client, cx) in clients {
540 let contacts = server
541 .app_state
542 .db
543 .get_contacts(client.current_user_id(cx))
544 .await
545 .unwrap();
546 let pool = server.connection_pool.lock();
547 for contact in contacts {
548 if let db::Contact::Accepted { user_id, busy, .. } = contact {
549 if user_id == removed_user_id {
550 assert!(!pool.is_user_online(user_id));
551 assert!(!busy);
552 }
553 }
554 }
555 }
556
557 log::info!("{} removed", client.username);
558 plan.lock().user(removed_user_id).online = false;
559 client_cx.update(|cx| {
560 cx.clear_globals();
561 drop(client);
562 });
563 }
564
565 ServerOperation::BounceConnection { user_id } => {
566 log::info!("simulating temporary disconnection of user {}", user_id);
567 let user_connection_ids = server
568 .connection_pool
569 .lock()
570 .user_connection_ids(user_id)
571 .collect::<Vec<_>>();
572 if user_connection_ids.is_empty() {
573 return false;
574 }
575 assert_eq!(user_connection_ids.len(), 1);
576 let peer_id = user_connection_ids[0].into();
577 server.disconnect_client(peer_id);
578 deterministic.advance_clock(RECEIVE_TIMEOUT + RECONNECT_TIMEOUT);
579 }
580
581 ServerOperation::RestartServer => {
582 log::info!("simulating server restart");
583 server.reset().await;
584 deterministic.advance_clock(RECEIVE_TIMEOUT);
585 server.start().await.unwrap();
586 deterministic.advance_clock(CLEANUP_TIMEOUT);
587 let environment = &server.app_state.config.zed_environment;
588 let (stale_room_ids, _) = server
589 .app_state
590 .db
591 .stale_server_resource_ids(environment, server.id())
592 .await
593 .unwrap();
594 assert_eq!(stale_room_ids, vec![]);
595 }
596
597 ServerOperation::MutateClients {
598 user_ids,
599 batch_id,
600 quiesce,
601 } => {
602 let mut applied = false;
603 for user_id in user_ids {
604 let client_ix = clients
605 .iter()
606 .position(|(client, cx)| client.current_user_id(cx) == user_id);
607 let Some(client_ix) = client_ix else { continue };
608 applied = true;
609 if let Err(err) = operation_channels[client_ix].unbounded_send(batch_id) {
610 log::error!("error signaling user {user_id}: {err}");
611 }
612 }
613
614 if quiesce && applied {
615 deterministic.run_until_parked();
616 T::on_quiesce(server, clients).await;
617 }
618
619 return applied;
620 }
621 }
622 true
623 }
624
625 async fn simulate_client(
626 plan: Arc<Mutex<Self>>,
627 client: Rc<TestClient>,
628 mut operation_rx: futures::channel::mpsc::UnboundedReceiver<usize>,
629 mut cx: TestAppContext,
630 ) {
631 T::on_client_added(&client, &mut cx).await;
632
633 while let Some(batch_id) = operation_rx.next().await {
634 let Some((operation, applied)) =
635 plan.lock().next_client_operation(&client, batch_id, &cx)
636 else {
637 break;
638 };
639 applied.store(true, SeqCst);
640 match T::apply_operation(&client, operation, &mut cx).await {
641 Ok(()) => {}
642 Err(TestError::Inapplicable) => {
643 applied.store(false, SeqCst);
644 log::info!("skipped operation");
645 }
646 Err(TestError::Other(error)) => {
647 log::error!("{} error: {}", client.username, error);
648 }
649 }
650 cx.background().simulate_random_delay().await;
651 }
652 log::info!("{}: done", client.username);
653 }
654
655 fn user(&mut self, user_id: UserId) -> &mut UserTestPlan {
656 self.users
657 .iter_mut()
658 .find(|user| user.user_id == user_id)
659 .unwrap()
660 }
661}
662
663impl UserTestPlan {
664 pub fn next_root_dir_name(&mut self) -> String {
665 let user_id = self.user_id;
666 let root_id = util::post_inc(&mut self.next_root_id);
667 format!("dir-{user_id}-{root_id}")
668 }
669}
670
671impl From<anyhow::Error> for TestError {
672 fn from(value: anyhow::Error) -> Self {
673 Self::Other(value)
674 }
675}
676
677fn path_env_var(name: &str) -> Option<PathBuf> {
678 let value = env::var(name).ok()?;
679 let mut path = PathBuf::from(value);
680 if path.is_relative() {
681 let mut abs_path = PathBuf::from(env!("CARGO_MANIFEST_DIR"));
682 abs_path.pop();
683 abs_path.pop();
684 abs_path.push(path);
685 path = abs_path
686 }
687 Some(path)
688}