1use crate::{
2 db::{self, NewUserParams, UserId},
3 rpc::{CLEANUP_TIMEOUT, RECONNECT_TIMEOUT},
4 tests::{TestClient, TestServer},
5};
6use async_trait::async_trait;
7use futures::StreamExt;
8use gpui::{BackgroundExecutor, Task, TestAppContext};
9use parking_lot::Mutex;
10use rand::prelude::*;
11use rpc::RECEIVE_TIMEOUT;
12use serde::{de::DeserializeOwned, Deserialize, Serialize};
13use settings::SettingsStore;
14use std::sync::OnceLock;
15use std::{
16 env,
17 path::PathBuf,
18 rc::Rc,
19 sync::{
20 atomic::{AtomicBool, Ordering::SeqCst},
21 Arc,
22 },
23};
24
25fn plan_load_path() -> &'static Option<PathBuf> {
26 static PLAN_LOAD_PATH: OnceLock<Option<PathBuf>> = OnceLock::new();
27 PLAN_LOAD_PATH.get_or_init(|| path_env_var("LOAD_PLAN"))
28}
29
30fn plan_save_path() -> &'static Option<PathBuf> {
31 static PLAN_SAVE_PATH: OnceLock<Option<PathBuf>> = OnceLock::new();
32 PLAN_SAVE_PATH.get_or_init(|| path_env_var("SAVE_PLAN"))
33}
34
/// Number of simulated users per run.
///
/// Read from the `MAX_PEERS` environment variable on first call and cached. Falls back to 3
/// when the variable is unset (or not valid Unicode).
///
/// # Panics
/// Panics if `MAX_PEERS` is set but does not parse as a `usize`.
fn max_peers() -> usize {
    static MAX_PEERS: OnceLock<usize> = OnceLock::new();
    let cached = MAX_PEERS.get_or_init(|| match env::var("MAX_PEERS") {
        Ok(raw) => raw.parse().expect("invalid `MAX_PEERS` variable"),
        Err(_) => 3,
    });
    *cached
}
43
/// Upper bound on the number of counted operations in a generated plan.
///
/// Read from the `OPERATIONS` environment variable on first call and cached. Falls back to
/// 10 when the variable is unset (or not valid Unicode).
///
/// # Panics
/// Panics if `OPERATIONS` is set but does not parse as a `usize`.
fn max_operations() -> usize {
    static MAX_OPERATIONS: OnceLock<usize> = OnceLock::new();
    let cached = MAX_OPERATIONS.get_or_init(|| match env::var("OPERATIONS") {
        Ok(raw) => raw.parse().expect("invalid `OPERATIONS` variable"),
        Err(_) => 10,
    });
    *cached
}
52
53static LOADED_PLAN_JSON: Mutex<Option<Vec<u8>>> = Mutex::new(None);
54static LAST_PLAN: Mutex<Option<Box<dyn Send + FnOnce() -> Vec<u8>>>> = Mutex::new(None);
55
/// Shared state of one randomized test run: the generated (or replayed) operation log plus
/// the per-user bookkeeping needed to produce and apply further operations.
struct TestPlan<T: RandomizedTest> {
    /// Source of randomness for the plan-wide switches below and for generated operations.
    rng: StdRng,
    /// `true` once a plan has been loaded via `deserialize`. Operations are then read from
    /// `stored_operations` instead of being generated.
    replay: bool,
    /// Every generated or loaded operation, each paired with a flag saying whether it was
    /// actually applied. Only applied operations are written out by `serialize`.
    stored_operations: Vec<(StoredOperation<T::Operation>, Arc<AtomicBool>)>,
    /// Limit on `operation_ix` when generating operations.
    max_operations: usize,
    /// In replay mode, the cursor into `stored_operations` for server operations. Otherwise,
    /// the count of generated operations (client operations plus non-`MutateClients` server
    /// operations).
    operation_ix: usize,
    /// One entry per simulated user.
    users: Vec<UserTestPlan>,
    /// Id given to the next generated `MutateClients` batch.
    next_batch_id: usize,
    /// Whether `RestartServer` operations may be generated.
    allow_server_restarts: bool,
    /// Whether `BounceConnection` operations may be generated.
    allow_client_reconnection: bool,
    /// Whether `RemoveConnection` operations may be generated.
    allow_client_disconnection: bool,
}
68
/// Per-user slice of a [`TestPlan`].
pub struct UserTestPlan {
    /// Database id of the user, created when the plan is built.
    pub user_id: UserId,
    /// Login name, `user-N` with N starting at 1.
    pub username: String,
    /// Copy of the plan-wide client-disconnection switch.
    pub allow_client_disconnection: bool,
    /// Counter backing `next_root_dir_name`.
    next_root_id: usize,
    /// Replay cursor into the plan's stored operations, used to find this user's next
    /// client operation.
    operation_ix: usize,
    /// Whether the user currently has a connected client in the test.
    online: bool,
}
77
/// One entry of a saved test plan: either a harness-level server operation or a client
/// operation belonging to a `MutateClients` batch.
// NOTE(review): with `untagged`, serde tries variants in declaration order when
// deserializing, so keep `Server` first and do not reorder these variants.
#[derive(Clone, Debug, Serialize, Deserialize)]
#[serde(untagged)]
enum StoredOperation<T> {
    Server(ServerOperation),
    Client {
        /// User whose client performs the operation.
        user_id: UserId,
        /// Batch (`ServerOperation::MutateClients::batch_id`) that triggered the operation.
        batch_id: usize,
        /// The test-specific operation itself.
        operation: T,
    },
}
88
89#[derive(Clone, Debug, Serialize, Deserialize)]
90enum ServerOperation {
91 AddConnection {
92 user_id: UserId,
93 },
94 RemoveConnection {
95 user_id: UserId,
96 },
97 BounceConnection {
98 user_id: UserId,
99 },
100 RestartServer,
101 MutateClients {
102 batch_id: usize,
103 #[serde(skip_serializing)]
104 #[serde(skip_deserializing)]
105 user_ids: Vec<UserId>,
106 quiesce: bool,
107 },
108}
109
/// Failure reported by [`RandomizedTest::apply_operation`].
pub enum TestError {
    /// The operation does not fit the current state. It is skipped and not recorded as
    /// applied.
    Inapplicable,
    /// Any other failure. It is logged and the run continues.
    Other(anyhow::Error),
}
114
/// A randomized collaboration scenario driven by [`run_randomized_test`].
///
/// Implementors define what a client operation is and how to generate and apply one. The
/// harness decides when clients connect, disconnect and act.
#[async_trait(?Send)]
pub trait RandomizedTest: 'static + Sized {
    /// A single client action. It is stored in the test plan and must round-trip through
    /// serde, so plans can be saved (`SAVE_PLAN`) and replayed (`LOAD_PLAN`).
    type Operation: Send + Clone + Serialize + DeserializeOwned;

    /// Generates the next operation for `client`, using the plan's shared `rng` and the
    /// acting user's `plan` entry. Only called when the plan is not being replayed.
    fn generate_operation(
        client: &TestClient,
        rng: &mut StdRng,
        plan: &mut UserTestPlan,
        cx: &TestAppContext,
    ) -> Self::Operation;

    /// Applies `operation` on behalf of `client`.
    ///
    /// Returning `Err(TestError::Inapplicable)` marks the operation as not applied, which
    /// leaves it out of a saved plan. `Err(TestError::Other(_))` is logged, and the operation
    /// still counts as applied.
    async fn apply_operation(
        client: &TestClient,
        operation: Self::Operation,
        cx: &mut TestAppContext,
    ) -> Result<(), TestError>;

    /// Called once per plan, after the users have been created in the database and before
    /// any operation runs.
    async fn initialize(server: &mut TestServer, users: &[UserTestPlan]);

    /// Called at the start of each client's task, before it handles any batch. The default
    /// does nothing.
    async fn on_client_added(_client: &Rc<TestClient>, _cx: &mut TestAppContext) {}

    /// Called after the executor has run until parked. This happens after a `MutateClients`
    /// batch that requested quiescing and reached at least one client, and once more at the
    /// end of the run after all client tasks have finished.
    async fn on_quiesce(server: &mut TestServer, client: &mut [(Rc<TestClient>, TestAppContext)]);
}
138
139pub async fn run_randomized_test<T: RandomizedTest>(
140 cx: &mut TestAppContext,
141 executor: BackgroundExecutor,
142 rng: StdRng,
143) {
144 let mut server = TestServer::start(executor.clone()).await;
145 let plan = TestPlan::<T>::new(&mut server, rng).await;
146
147 LAST_PLAN.lock().replace({
148 let plan = plan.clone();
149 Box::new(move || plan.lock().serialize())
150 });
151
152 let mut clients = Vec::new();
153 let mut client_tasks = Vec::new();
154 let mut operation_channels = Vec::new();
155 loop {
156 let Some((next_operation, applied)) = plan.lock().next_server_operation(&clients) else {
157 break;
158 };
159 applied.store(true, SeqCst);
160 let did_apply = TestPlan::apply_server_operation(
161 plan.clone(),
162 executor.clone(),
163 &mut server,
164 &mut clients,
165 &mut client_tasks,
166 &mut operation_channels,
167 next_operation,
168 cx,
169 )
170 .await;
171 if !did_apply {
172 applied.store(false, SeqCst);
173 }
174 }
175
176 drop(operation_channels);
177 executor.start_waiting();
178 futures::future::join_all(client_tasks).await;
179 executor.finish_waiting();
180
181 executor.run_until_parked();
182 T::on_quiesce(&mut server, &mut clients).await;
183
184 for (client, cx) in clients {
185 cx.update(|cx| {
186 let store = cx.remove_global::<SettingsStore>();
187 cx.clear_globals();
188 cx.set_global(store);
189 drop(client);
190 });
191 }
192 executor.run_until_parked();
193
194 if let Some(path) = plan_save_path() {
195 eprintln!("saved test plan to path {:?}", path);
196 std::fs::write(path, plan.lock().serialize()).unwrap();
197 }
198}
199
200pub fn save_randomized_test_plan() {
201 if let Some(serialize_plan) = LAST_PLAN.lock().take() {
202 if let Some(path) = plan_save_path() {
203 eprintln!("saved test plan to path {:?}", path);
204 std::fs::write(path, serialize_plan()).unwrap();
205 }
206 }
207}
208
209impl<T: RandomizedTest> TestPlan<T> {
210 pub async fn new(server: &mut TestServer, mut rng: StdRng) -> Arc<Mutex<Self>> {
211 let allow_server_restarts = rng.gen_bool(0.7);
212 let allow_client_reconnection = rng.gen_bool(0.7);
213 let allow_client_disconnection = rng.gen_bool(0.1);
214
215 let mut users = Vec::new();
216 for ix in 0..max_peers() {
217 let username = format!("user-{}", ix + 1);
218 let user_id = server
219 .app_state
220 .db
221 .create_user(
222 &format!("{username}@example.com"),
223 false,
224 NewUserParams {
225 github_login: username.clone(),
226 github_user_id: ix as i32,
227 },
228 )
229 .await
230 .unwrap()
231 .user_id;
232 users.push(UserTestPlan {
233 user_id,
234 username,
235 online: false,
236 next_root_id: 0,
237 operation_ix: 0,
238 allow_client_disconnection,
239 });
240 }
241
242 T::initialize(server, &users).await;
243
244 let plan = Arc::new(Mutex::new(Self {
245 replay: false,
246 allow_server_restarts,
247 allow_client_reconnection,
248 allow_client_disconnection,
249 stored_operations: Vec::new(),
250 operation_ix: 0,
251 next_batch_id: 0,
252 max_operations: max_operations(),
253 users,
254 rng,
255 }));
256
257 if let Some(path) = plan_load_path() {
258 let json = LOADED_PLAN_JSON
259 .lock()
260 .get_or_insert_with(|| {
261 eprintln!("loaded test plan from path {:?}", path);
262 std::fs::read(path).unwrap()
263 })
264 .clone();
265 plan.lock().deserialize(json);
266 }
267
268 plan
269 }
270
271 fn deserialize(&mut self, json: Vec<u8>) {
272 let stored_operations: Vec<StoredOperation<T::Operation>> =
273 serde_json::from_slice(&json).unwrap();
274 self.replay = true;
275 self.stored_operations = stored_operations
276 .iter()
277 .cloned()
278 .enumerate()
279 .map(|(i, mut operation)| {
280 let did_apply = Arc::new(AtomicBool::new(false));
281 if let StoredOperation::Server(ServerOperation::MutateClients {
282 batch_id: current_batch_id,
283 user_ids,
284 ..
285 }) = &mut operation
286 {
287 assert!(user_ids.is_empty());
288 user_ids.extend(stored_operations[i + 1..].iter().filter_map(|operation| {
289 if let StoredOperation::Client {
290 user_id, batch_id, ..
291 } = operation
292 {
293 if batch_id == current_batch_id {
294 return Some(user_id);
295 }
296 }
297 None
298 }));
299 user_ids.sort_unstable();
300 }
301 (operation, did_apply)
302 })
303 .collect()
304 }
305
306 fn serialize(&mut self) -> Vec<u8> {
307 // Format each operation as one line
308 let mut json = Vec::new();
309 json.push(b'[');
310 for (operation, applied) in &self.stored_operations {
311 if !applied.load(SeqCst) {
312 continue;
313 }
314 if json.len() > 1 {
315 json.push(b',');
316 }
317 json.extend_from_slice(b"\n ");
318 serde_json::to_writer(&mut json, operation).unwrap();
319 }
320 json.extend_from_slice(b"\n]\n");
321 json
322 }
323
324 fn next_server_operation(
325 &mut self,
326 clients: &[(Rc<TestClient>, TestAppContext)],
327 ) -> Option<(ServerOperation, Arc<AtomicBool>)> {
328 if self.replay {
329 while let Some(stored_operation) = self.stored_operations.get(self.operation_ix) {
330 self.operation_ix += 1;
331 if let (StoredOperation::Server(operation), applied) = stored_operation {
332 return Some((operation.clone(), applied.clone()));
333 }
334 }
335 None
336 } else {
337 let operation = self.generate_server_operation(clients)?;
338 let applied = Arc::new(AtomicBool::new(false));
339 self.stored_operations
340 .push((StoredOperation::Server(operation.clone()), applied.clone()));
341 Some((operation, applied))
342 }
343 }
344
345 fn next_client_operation(
346 &mut self,
347 client: &TestClient,
348 current_batch_id: usize,
349 cx: &TestAppContext,
350 ) -> Option<(T::Operation, Arc<AtomicBool>)> {
351 let current_user_id = client.current_user_id(cx);
352 let user_ix = self
353 .users
354 .iter()
355 .position(|user| user.user_id == current_user_id)
356 .unwrap();
357 let user_plan = &mut self.users[user_ix];
358
359 if self.replay {
360 while let Some(stored_operation) = self.stored_operations.get(user_plan.operation_ix) {
361 user_plan.operation_ix += 1;
362 if let (
363 StoredOperation::Client {
364 user_id, operation, ..
365 },
366 applied,
367 ) = stored_operation
368 {
369 if user_id == ¤t_user_id {
370 return Some((operation.clone(), applied.clone()));
371 }
372 }
373 }
374 None
375 } else {
376 if self.operation_ix == self.max_operations {
377 return None;
378 }
379 self.operation_ix += 1;
380 let operation = T::generate_operation(
381 client,
382 &mut self.rng,
383 self.users
384 .iter_mut()
385 .find(|user| user.user_id == current_user_id)
386 .unwrap(),
387 cx,
388 );
389 let applied = Arc::new(AtomicBool::new(false));
390 self.stored_operations.push((
391 StoredOperation::Client {
392 user_id: current_user_id,
393 batch_id: current_batch_id,
394 operation: operation.clone(),
395 },
396 applied.clone(),
397 ));
398 Some((operation, applied))
399 }
400 }
401
402 fn generate_server_operation(
403 &mut self,
404 clients: &[(Rc<TestClient>, TestAppContext)],
405 ) -> Option<ServerOperation> {
406 if self.operation_ix == self.max_operations {
407 return None;
408 }
409
410 Some(loop {
411 break match self.rng.gen_range(0..100) {
412 0..=29 if clients.len() < self.users.len() => {
413 let user = self
414 .users
415 .iter()
416 .filter(|u| !u.online)
417 .choose(&mut self.rng)
418 .unwrap();
419 self.operation_ix += 1;
420 ServerOperation::AddConnection {
421 user_id: user.user_id,
422 }
423 }
424 30..=34 if clients.len() > 1 && self.allow_client_disconnection => {
425 let (client, cx) = &clients[self.rng.gen_range(0..clients.len())];
426 let user_id = client.current_user_id(cx);
427 self.operation_ix += 1;
428 ServerOperation::RemoveConnection { user_id }
429 }
430 35..=39 if clients.len() > 1 && self.allow_client_reconnection => {
431 let (client, cx) = &clients[self.rng.gen_range(0..clients.len())];
432 let user_id = client.current_user_id(cx);
433 self.operation_ix += 1;
434 ServerOperation::BounceConnection { user_id }
435 }
436 40..=44 if self.allow_server_restarts && clients.len() > 1 => {
437 self.operation_ix += 1;
438 ServerOperation::RestartServer
439 }
440 _ if !clients.is_empty() => {
441 let count = self
442 .rng
443 .gen_range(1..10)
444 .min(self.max_operations - self.operation_ix);
445 let batch_id = util::post_inc(&mut self.next_batch_id);
446 let mut user_ids = (0..count)
447 .map(|_| {
448 let ix = self.rng.gen_range(0..clients.len());
449 let (client, cx) = &clients[ix];
450 client.current_user_id(cx)
451 })
452 .collect::<Vec<_>>();
453 user_ids.sort_unstable();
454 ServerOperation::MutateClients {
455 user_ids,
456 batch_id,
457 quiesce: self.rng.gen_bool(0.7),
458 }
459 }
460 _ => continue,
461 };
462 })
463 }
464
465 #[allow(clippy::too_many_arguments)]
466 async fn apply_server_operation(
467 plan: Arc<Mutex<Self>>,
468 deterministic: BackgroundExecutor,
469 server: &mut TestServer,
470 clients: &mut Vec<(Rc<TestClient>, TestAppContext)>,
471 client_tasks: &mut Vec<Task<()>>,
472 operation_channels: &mut Vec<futures::channel::mpsc::UnboundedSender<usize>>,
473 operation: ServerOperation,
474 cx: &mut TestAppContext,
475 ) -> bool {
476 match operation {
477 ServerOperation::AddConnection { user_id } => {
478 let username;
479 {
480 let mut plan = plan.lock();
481 let user = plan.user(user_id);
482 if user.online {
483 return false;
484 }
485 user.online = true;
486 username = user.username.clone();
487 };
488 log::info!("adding new connection for {}", username);
489
490 let mut client_cx = cx.new_app();
491
492 let (operation_tx, operation_rx) = futures::channel::mpsc::unbounded();
493 let client = Rc::new(server.create_client(&mut client_cx, &username).await);
494 operation_channels.push(operation_tx);
495 clients.push((client.clone(), client_cx.clone()));
496
497 let foreground_executor = client_cx.foreground_executor().clone();
498 let simulate_client =
499 Self::simulate_client(plan.clone(), client, operation_rx, client_cx);
500 client_tasks.push(foreground_executor.spawn(simulate_client));
501
502 log::info!("added connection for {}", username);
503 }
504
505 ServerOperation::RemoveConnection {
506 user_id: removed_user_id,
507 } => {
508 log::info!("simulating full disconnection of user {}", removed_user_id);
509 let client_ix = clients
510 .iter()
511 .position(|(client, cx)| client.current_user_id(cx) == removed_user_id);
512 let Some(client_ix) = client_ix else {
513 return false;
514 };
515 let user_connection_ids = server
516 .connection_pool
517 .lock()
518 .user_connection_ids(removed_user_id)
519 .collect::<Vec<_>>();
520 assert_eq!(user_connection_ids.len(), 1);
521 let removed_peer_id = user_connection_ids[0].into();
522 let (client, client_cx) = clients.remove(client_ix);
523 let client_task = client_tasks.remove(client_ix);
524 operation_channels.remove(client_ix);
525 server.forbid_connections();
526 server.disconnect_client(removed_peer_id);
527 deterministic.advance_clock(RECEIVE_TIMEOUT + RECONNECT_TIMEOUT);
528 deterministic.start_waiting();
529 log::info!("waiting for user {} to exit...", removed_user_id);
530 client_task.await;
531 deterministic.finish_waiting();
532 server.allow_connections();
533
534 for project in client.dev_server_projects().iter() {
535 project.read_with(&client_cx, |project, cx| {
536 assert!(
537 project.is_disconnected(cx),
538 "project {:?} should be read only",
539 project.remote_id()
540 )
541 });
542 }
543
544 for (client, cx) in clients {
545 let contacts = server
546 .app_state
547 .db
548 .get_contacts(client.current_user_id(cx))
549 .await
550 .unwrap();
551 let pool = server.connection_pool.lock();
552 for contact in contacts {
553 if let db::Contact::Accepted { user_id, busy, .. } = contact {
554 if user_id == removed_user_id {
555 assert!(!pool.is_user_online(user_id));
556 assert!(!busy);
557 }
558 }
559 }
560 }
561
562 log::info!("{} removed", client.username);
563 plan.lock().user(removed_user_id).online = false;
564 client_cx.update(|cx| {
565 cx.clear_globals();
566 drop(client);
567 });
568 }
569
570 ServerOperation::BounceConnection { user_id } => {
571 log::info!("simulating temporary disconnection of user {}", user_id);
572 let user_connection_ids = server
573 .connection_pool
574 .lock()
575 .user_connection_ids(user_id)
576 .collect::<Vec<_>>();
577 if user_connection_ids.is_empty() {
578 return false;
579 }
580 assert_eq!(user_connection_ids.len(), 1);
581 let peer_id = user_connection_ids[0].into();
582 server.disconnect_client(peer_id);
583 deterministic.advance_clock(RECEIVE_TIMEOUT + RECONNECT_TIMEOUT);
584 }
585
586 ServerOperation::RestartServer => {
587 log::info!("simulating server restart");
588 server.reset().await;
589 deterministic.advance_clock(RECEIVE_TIMEOUT);
590 server.start().await.unwrap();
591 deterministic.advance_clock(CLEANUP_TIMEOUT);
592 let environment = &server.app_state.config.zed_environment;
593 let (stale_room_ids, _) = server
594 .app_state
595 .db
596 .stale_server_resource_ids(environment, server.id())
597 .await
598 .unwrap();
599 assert_eq!(stale_room_ids, vec![]);
600 }
601
602 ServerOperation::MutateClients {
603 user_ids,
604 batch_id,
605 quiesce,
606 } => {
607 let mut applied = false;
608 for user_id in user_ids {
609 let client_ix = clients
610 .iter()
611 .position(|(client, cx)| client.current_user_id(cx) == user_id);
612 let Some(client_ix) = client_ix else { continue };
613 applied = true;
614 if let Err(err) = operation_channels[client_ix].unbounded_send(batch_id) {
615 log::error!("error signaling user {user_id}: {err}");
616 }
617 }
618
619 if quiesce && applied {
620 deterministic.run_until_parked();
621 T::on_quiesce(server, clients).await;
622 }
623
624 return applied;
625 }
626 }
627 true
628 }
629
630 async fn simulate_client(
631 plan: Arc<Mutex<Self>>,
632 client: Rc<TestClient>,
633 mut operation_rx: futures::channel::mpsc::UnboundedReceiver<usize>,
634 mut cx: TestAppContext,
635 ) {
636 T::on_client_added(&client, &mut cx).await;
637
638 while let Some(batch_id) = operation_rx.next().await {
639 let Some((operation, applied)) =
640 plan.lock().next_client_operation(&client, batch_id, &cx)
641 else {
642 break;
643 };
644 applied.store(true, SeqCst);
645 match T::apply_operation(&client, operation, &mut cx).await {
646 Ok(()) => {}
647 Err(TestError::Inapplicable) => {
648 applied.store(false, SeqCst);
649 log::info!("skipped operation");
650 }
651 Err(TestError::Other(error)) => {
652 log::error!("{} error: {}", client.username, error);
653 }
654 }
655 cx.executor().simulate_random_delay().await;
656 }
657 log::info!("{}: done", client.username);
658 }
659
660 fn user(&mut self, user_id: UserId) -> &mut UserTestPlan {
661 self.users
662 .iter_mut()
663 .find(|user| user.user_id == user_id)
664 .unwrap()
665 }
666}
667
668impl UserTestPlan {
669 pub fn next_root_dir_name(&mut self) -> String {
670 let user_id = self.user_id;
671 let root_id = util::post_inc(&mut self.next_root_id);
672 format!("dir-{user_id}-{root_id}")
673 }
674}
675
676impl From<anyhow::Error> for TestError {
677 fn from(value: anyhow::Error) -> Self {
678 Self::Other(value)
679 }
680}
681
682fn path_env_var(name: &str) -> Option<PathBuf> {
683 let value = env::var(name).ok()?;
684 let mut path = PathBuf::from(value);
685 if path.is_relative() {
686 let mut abs_path = PathBuf::from(env!("CARGO_MANIFEST_DIR"));
687 abs_path.pop();
688 abs_path.pop();
689 abs_path.push(path);
690 path = abs_path
691 }
692 Some(path)
693}