Simulate random guest disconnection and reconnection

Antonio Scandurra created

Change summary

crates/server/src/rpc.rs | 216 ++++++++++++++++++++++++++---------------
1 file changed, 135 insertions(+), 81 deletions(-)

Detailed changes

crates/server/src/rpc.rs 🔗

@@ -1120,7 +1120,6 @@ mod tests {
         },
         time::Duration,
     };
-    use util::TryFutureExt;
     use workspace::{Item, SplitDirection, ToggleFollow, Workspace, WorkspaceParams};
 
     #[cfg(test)]
@@ -4981,6 +4980,8 @@ mod tests {
         let max_peers = env::var("MAX_PEERS")
             .map(|i| i.parse().expect("invalid `MAX_PEERS` variable"))
             .unwrap_or(5);
+        assert!(max_peers <= 5);
+
         let max_operations = env::var("OPERATIONS")
             .map(|i| i.parse().expect("invalid `OPERATIONS` variable"))
             .unwrap_or(10);
@@ -4994,7 +4995,7 @@ mod tests {
         fs.insert_tree(
             "/_collab",
             json!({
-                ".zed.toml": r#"collaborators = ["guest-1", "guest-2", "guest-3", "guest-4", "guest-5"]"#
+                ".zed.toml": r#"collaborators = ["guest-1", "guest-2", "guest-3", "guest-4"]"#
             }),
         )
         .await;
@@ -5170,7 +5171,6 @@ mod tests {
         });
         host_language_registry.add(Arc::new(language));
 
-        let host_disconnected = Rc::new(AtomicBool::new(false));
         let op_start_signal = futures::channel::mpsc::unbounded();
         user_ids.push(host.current_user_id(&host_cx));
         op_start_signals.push(op_start_signal.0);
@@ -5187,22 +5187,33 @@ mod tests {
         } else {
             max_operations
         };
+        let mut available_guests = vec![
+            "guest-1".to_string(),
+            "guest-2".to_string(),
+            "guest-3".to_string(),
+            "guest-4".to_string(),
+        ];
         let mut operations = 0;
         while operations < max_operations {
             if operations == disconnect_host_at {
-                host_disconnected.store(true, SeqCst);
                 server.disconnect_client(user_ids[0]);
                 cx.foreground().advance_clock(RECEIVE_TIMEOUT);
                 drop(op_start_signals);
                 let mut clients = futures::future::join_all(clients).await;
                 cx.foreground().run_until_parked();
 
-                let (host, mut host_cx) = clients.remove(0);
+                let (host, mut host_cx, host_err) = clients.remove(0);
+                if let Some(host_err) = host_err {
+                    log::error!("host error - {}", host_err);
+                }
                 host.project
                     .as_ref()
                     .unwrap()
                     .read_with(&host_cx, |project, _| assert!(!project.is_shared()));
-                for (guest, mut guest_cx) in clients {
+                for (guest, mut guest_cx, guest_err) in clients {
+                    if let Some(guest_err) = guest_err {
+                        log::error!("{} error - {}", guest.username, guest_err);
+                    }
                     let contacts = server
                         .store
                         .read()
@@ -5225,9 +5236,10 @@ mod tests {
 
             let distribution = rng.lock().gen_range(0..100);
             match distribution {
-                0..=19 if clients.len() < max_peers => {
-                    let guest_id = clients.len();
-                    log::info!("Adding guest {}", guest_id);
+                0..=19 if !available_guests.is_empty() => {
+                    let guest_ix = rng.lock().gen_range(0..available_guests.len());
+                    let guest_username = available_guests.remove(guest_ix);
+                    log::info!("Adding new connection for {}", guest_username);
                     next_entity_id += 100000;
                     let mut guest_cx = TestAppContext::new(
                         cx.foreground_platform(),
@@ -5238,9 +5250,7 @@ mod tests {
                         cx.leak_detector(),
                         next_entity_id,
                     );
-                    let guest = server
-                        .create_client(&mut guest_cx, &format!("guest-{}", guest_id))
-                        .await;
+                    let guest = server.create_client(&mut guest_cx, &guest_username).await;
                     let guest_project = Project::remote(
                         host_project_id,
                         guest.client.clone(),
@@ -5255,15 +5265,54 @@ mod tests {
                     user_ids.push(guest.current_user_id(&guest_cx));
                     op_start_signals.push(op_start_signal.0);
                     clients.push(guest_cx.foreground().spawn(guest.simulate_guest(
-                        guest_id,
+                        guest_username.clone(),
                         guest_project,
                         op_start_signal.1,
                         rng.clone(),
-                        host_disconnected.clone(),
                         guest_cx,
                     )));
 
-                    log::info!("Guest {} added", guest_id);
+                    log::info!("Added connection for {}", guest_username);
+                    operations += 1;
+                }
+                20..=30 if clients.len() > 1 => {
+                    log::info!("Removing guest");
+                    let guest_ix = rng.lock().gen_range(1..clients.len());
+                    let removed_guest_id = user_ids.remove(guest_ix);
+                    let guest = clients.remove(guest_ix);
+                    op_start_signals.remove(guest_ix);
+                    server.disconnect_client(removed_guest_id);
+                    cx.foreground().advance_clock(RECEIVE_TIMEOUT);
+                    let (guest, mut guest_cx, guest_err) = guest.await;
+                    if let Some(guest_err) = guest_err {
+                        log::error!("{} error - {}", guest.username, guest_err);
+                    }
+                    guest
+                        .project
+                        .as_ref()
+                        .unwrap()
+                        .read_with(&guest_cx, |project, _| assert!(project.is_read_only()));
+                    for user_id in &user_ids {
+                        for contact in server.store.read().contacts_for_user(*user_id) {
+                            assert_ne!(
+                                contact.user_id, removed_guest_id.0 as u64,
+                                "removed guest is still a contact of another peer"
+                            );
+                            for project in contact.projects {
+                                for project_guest_id in project.guests {
+                                    assert_ne!(
+                                        project_guest_id, removed_guest_id.0 as u64,
+                                        "removed guest appears as still participating on a project"
+                                    );
+                                }
+                            }
+                        }
+                    }
+
+                    log::info!("{} removed", guest.username);
+                    available_guests.push(guest.username.clone());
+                    guest_cx.update(|_| drop(guest));
+
                     operations += 1;
                 }
                 _ => {
@@ -5287,7 +5336,10 @@ mod tests {
         let mut clients = futures::future::join_all(clients).await;
         cx.foreground().run_until_parked();
 
-        let (host_client, mut host_cx) = clients.remove(0);
+        let (host_client, mut host_cx, host_err) = clients.remove(0);
+        if let Some(host_err) = host_err {
+            panic!("host error - {}", host_err);
+        }
         let host_project = host_client.project.as_ref().unwrap();
         let host_worktree_snapshots = host_project.read_with(&host_cx, |project, cx| {
             project
@@ -5305,8 +5357,10 @@ mod tests {
             .unwrap()
             .read_with(&host_cx, |project, cx| project.check_invariants(cx));
 
-        for (guest_client, mut guest_cx) in clients.into_iter() {
-            let guest_id = guest_client.client.id();
+        for (guest_client, mut guest_cx, guest_err) in clients.into_iter() {
+            if let Some(guest_err) = guest_err {
+                panic!("{} error - {}", guest_client.username, guest_err);
+            }
             let worktree_snapshots =
                 guest_client
                     .project
@@ -5325,23 +5379,23 @@ mod tests {
             assert_eq!(
                 worktree_snapshots.keys().collect::<Vec<_>>(),
                 host_worktree_snapshots.keys().collect::<Vec<_>>(),
-                "guest {} has different worktrees than the host",
-                guest_id
+                "{} has different worktrees than the host",
+                guest_client.username
             );
             for (id, host_snapshot) in &host_worktree_snapshots {
                 let guest_snapshot = &worktree_snapshots[id];
                 assert_eq!(
                     guest_snapshot.root_name(),
                     host_snapshot.root_name(),
-                    "guest {} has different root name than the host for worktree {}",
-                    guest_id,
+                    "{} has different root name than the host for worktree {}",
+                    guest_client.username,
                     id
                 );
                 assert_eq!(
                     guest_snapshot.entries(false).collect::<Vec<_>>(),
                     host_snapshot.entries(false).collect::<Vec<_>>(),
-                    "guest {} has different snapshot than the host for worktree {}",
-                    guest_id,
+                    "{} has different snapshot than the host for worktree {}",
+                    guest_client.username,
                     id
                 );
             }
@@ -5357,7 +5411,7 @@ mod tests {
                 let host_buffer = host_project.read_with(&host_cx, |project, cx| {
                     project.buffer_for_id(buffer_id, cx).expect(&format!(
                         "host does not have buffer for guest:{}, peer:{}, id:{}",
-                        guest_id, guest_client.peer_id, buffer_id
+                        guest_client.username, guest_client.peer_id, buffer_id
                     ))
                 });
                 let path = host_buffer
@@ -5366,16 +5420,16 @@ mod tests {
                 assert_eq!(
                     guest_buffer.read_with(&guest_cx, |buffer, _| buffer.deferred_ops_len()),
                     0,
-                    "guest {}, buffer {}, path {:?} has deferred operations",
-                    guest_id,
+                    "{}, buffer {}, path {:?} has deferred operations",
+                    guest_client.username,
                     buffer_id,
                     path,
                 );
                 assert_eq!(
                     guest_buffer.read_with(&guest_cx, |buffer, _| buffer.text()),
                     host_buffer.read_with(&host_cx, |buffer, _| buffer.text()),
-                    "guest {}, buffer {}, path {:?}, differs from the host's buffer",
-                    guest_id,
+                    "{}, buffer {}, path {:?}, differs from the host's buffer",
+                    guest_client.username,
                     buffer_id,
                     path
                 );
@@ -5495,6 +5549,7 @@ mod tests {
             let client = TestClient {
                 client,
                 peer_id,
+                username: name.to_string(),
                 user_store,
                 language_registry: Arc::new(LanguageRegistry::test()),
                 project: Default::default(),
@@ -5571,6 +5626,7 @@ mod tests {
 
     struct TestClient {
         client: Arc<Client>,
+        username: String,
         pub peer_id: PeerId,
         pub user_store: ModelHandle<UserStore>,
         language_registry: Arc<LanguageRegistry>,
@@ -5682,7 +5738,7 @@ mod tests {
             op_start_signal: futures::channel::mpsc::UnboundedReceiver<()>,
             rng: Arc<Mutex<StdRng>>,
             mut cx: TestAppContext,
-        ) -> (Self, TestAppContext) {
+        ) -> (Self, TestAppContext, Option<anyhow::Error>) {
             async fn simulate_host_internal(
                 client: &mut TestClient,
                 project: ModelHandle<Project>,
@@ -5767,7 +5823,12 @@ mod tests {
                                         buffer.file().unwrap().full_path(cx),
                                         buffer.remote_id()
                                     );
-                                    buffer.randomly_edit(&mut *rng.lock(), 5, cx)
+
+                                    if rng.lock().gen_bool(0.7) {
+                                        buffer.randomly_edit(&mut *rng.lock(), 5, cx);
+                                    } else {
+                                        buffer.randomly_undo_redo(&mut *rng.lock(), cx);
+                                    }
                                 });
                             }
                         }
@@ -5801,7 +5862,7 @@ mod tests {
                 Ok(())
             }
 
-            simulate_host_internal(
+            let result = simulate_host_internal(
                 &mut self,
                 project.clone(),
                 files,
@@ -5809,25 +5870,23 @@ mod tests {
                 rng,
                 &mut cx,
             )
-            .log_err()
             .await;
             log::info!("Host done");
             self.project = Some(project);
-            (self, cx)
+            (self, cx, result.err())
         }
 
         pub async fn simulate_guest(
             mut self,
-            guest_id: usize,
+            guest_username: String,
             project: ModelHandle<Project>,
             op_start_signal: futures::channel::mpsc::UnboundedReceiver<()>,
             rng: Arc<Mutex<StdRng>>,
-            host_disconnected: Rc<AtomicBool>,
             mut cx: TestAppContext,
-        ) -> (Self, TestAppContext) {
+        ) -> (Self, TestAppContext, Option<anyhow::Error>) {
             async fn simulate_guest_internal(
                 client: &mut TestClient,
-                guest_id: usize,
+                guest_username: &str,
                 project: ModelHandle<Project>,
                 mut op_start_signal: futures::channel::mpsc::UnboundedReceiver<()>,
                 rng: Arc<Mutex<StdRng>>,
@@ -5865,8 +5924,8 @@ mod tests {
                                 )
                             });
                         log::info!(
-                            "Guest {}: opening path {:?} in worktree {} ({})",
-                            guest_id,
+                            "{}: opening path {:?} in worktree {} ({})",
+                            guest_username,
                             project_path.1,
                             project_path.0,
                             worktree_root_name,
@@ -5877,8 +5936,8 @@ mod tests {
                             })
                             .await?;
                         log::info!(
-                            "Guest {}: opened path {:?} in worktree {} ({}) with buffer id {}",
-                            guest_id,
+                            "{}: opened path {:?} in worktree {} ({}) with buffer id {}",
+                            guest_username,
                             project_path.1,
                             project_path.0,
                             worktree_root_name,
@@ -5900,8 +5959,8 @@ mod tests {
                         0..=9 => {
                             cx.update(|cx| {
                                 log::info!(
-                                    "Guest {}: dropping buffer {:?}",
-                                    guest_id,
+                                    "{}: dropping buffer {:?}",
+                                    guest_username,
                                     buffer.read(cx).file().unwrap().full_path(cx)
                                 );
                                 client.buffers.remove(&buffer);
@@ -5911,8 +5970,8 @@ mod tests {
                         10..=19 => {
                             let completions = project.update(cx, |project, cx| {
                                 log::info!(
-                                    "Guest {}: requesting completions for buffer {} ({:?})",
-                                    guest_id,
+                                    "{}: requesting completions for buffer {} ({:?})",
+                                    guest_username,
                                     buffer.read(cx).remote_id(),
                                     buffer.read(cx).file().unwrap().full_path(cx)
                                 );
@@ -5925,7 +5984,7 @@ mod tests {
                                     .map_err(|err| anyhow!("completions request failed: {:?}", err))
                             });
                             if rng.lock().gen_bool(0.3) {
-                                log::info!("Guest {}: detaching completions request", guest_id);
+                                log::info!("{}: detaching completions request", guest_username);
                                 cx.update(|cx| completions.detach_and_log_err(cx));
                             } else {
                                 completions.await?;
@@ -5934,8 +5993,8 @@ mod tests {
                         20..=29 => {
                             let code_actions = project.update(cx, |project, cx| {
                                 log::info!(
-                                    "Guest {}: requesting code actions for buffer {} ({:?})",
-                                    guest_id,
+                                    "{}: requesting code actions for buffer {} ({:?})",
+                                    guest_username,
                                     buffer.read(cx).remote_id(),
                                     buffer.read(cx).file().unwrap().full_path(cx)
                                 );
@@ -5948,7 +6007,7 @@ mod tests {
                                 })
                             });
                             if rng.lock().gen_bool(0.3) {
-                                log::info!("Guest {}: detaching code actions request", guest_id);
+                                log::info!("{}: detaching code actions request", guest_username);
                                 cx.update(|cx| code_actions.detach_and_log_err(cx));
                             } else {
                                 code_actions.await?;
@@ -5957,8 +6016,8 @@ mod tests {
                         30..=39 if buffer.read_with(cx, |buffer, _| buffer.is_dirty()) => {
                             let (requested_version, save) = buffer.update(cx, |buffer, cx| {
                                 log::info!(
-                                    "Guest {}: saving buffer {} ({:?})",
-                                    guest_id,
+                                    "{}: saving buffer {} ({:?})",
+                                    guest_username,
                                     buffer.remote_id(),
                                     buffer.file().unwrap().full_path(cx)
                                 );
@@ -5972,7 +6031,7 @@ mod tests {
                                 Ok::<_, anyhow::Error>(())
                             });
                             if rng.lock().gen_bool(0.3) {
-                                log::info!("Guest {}: detaching save request", guest_id);
+                                log::info!("{}: detaching save request", guest_username);
                                 cx.update(|cx| save.detach_and_log_err(cx));
                             } else {
                                 save.await?;
@@ -5981,8 +6040,8 @@ mod tests {
                         40..=44 => {
                             let prepare_rename = project.update(cx, |project, cx| {
                                 log::info!(
-                                    "Guest {}: preparing rename for buffer {} ({:?})",
-                                    guest_id,
+                                    "{}: preparing rename for buffer {} ({:?})",
+                                    guest_username,
                                     buffer.read(cx).remote_id(),
                                     buffer.read(cx).file().unwrap().full_path(cx)
                                 );
@@ -5995,7 +6054,7 @@ mod tests {
                                 })
                             });
                             if rng.lock().gen_bool(0.3) {
-                                log::info!("Guest {}: detaching prepare rename request", guest_id);
+                                log::info!("{}: detaching prepare rename request", guest_username);
                                 cx.update(|cx| prepare_rename.detach_and_log_err(cx));
                             } else {
                                 prepare_rename.await?;
@@ -6004,8 +6063,8 @@ mod tests {
                         45..=49 => {
                             let definitions = project.update(cx, |project, cx| {
                                 log::info!(
-                                    "Guest {}: requesting definitions for buffer {} ({:?})",
-                                    guest_id,
+                                    "{}: requesting definitions for buffer {} ({:?})",
+                                    guest_username,
                                     buffer.read(cx).remote_id(),
                                     buffer.read(cx).file().unwrap().full_path(cx)
                                 );
@@ -6018,7 +6077,7 @@ mod tests {
                                     .map_err(|err| anyhow!("definitions request failed: {:?}", err))
                             });
                             if rng.lock().gen_bool(0.3) {
-                                log::info!("Guest {}: detaching definitions request", guest_id);
+                                log::info!("{}: detaching definitions request", guest_username);
                                 cx.update(|cx| definitions.detach_and_log_err(cx));
                             } else {
                                 client
@@ -6029,8 +6088,8 @@ mod tests {
                         50..=54 => {
                             let highlights = project.update(cx, |project, cx| {
                                 log::info!(
-                                    "Guest {}: requesting highlights for buffer {} ({:?})",
-                                    guest_id,
+                                    "{}: requesting highlights for buffer {} ({:?})",
+                                    guest_username,
                                     buffer.read(cx).remote_id(),
                                     buffer.read(cx).file().unwrap().full_path(cx)
                                 );
@@ -6043,7 +6102,7 @@ mod tests {
                                     .map_err(|err| anyhow!("highlights request failed: {:?}", err))
                             });
                             if rng.lock().gen_bool(0.3) {
-                                log::info!("Guest {}: detaching highlights request", guest_id);
+                                log::info!("{}: detaching highlights request", guest_username);
                                 cx.update(|cx| highlights.detach_and_log_err(cx));
                             } else {
                                 highlights.await?;
@@ -6052,7 +6111,7 @@ mod tests {
                         55..=59 => {
                             let search = project.update(cx, |project, cx| {
                                 let query = rng.lock().gen_range('a'..='z');
-                                log::info!("Guest {}: project-wide search {:?}", guest_id, query);
+                                log::info!("{}: project-wide search {:?}", guest_username, query);
                                 project.search(SearchQuery::text(query, false, false), cx)
                             });
                             let search = cx.background().spawn(async move {
@@ -6061,7 +6120,7 @@ mod tests {
                                     .map_err(|err| anyhow!("search request failed: {:?}", err))
                             });
                             if rng.lock().gen_bool(0.3) {
-                                log::info!("Guest {}: detaching search request", guest_id);
+                                log::info!("{}: detaching search request", guest_username);
                                 cx.update(|cx| search.detach_and_log_err(cx));
                             } else {
                                 client.buffers.extend(search.await?.into_keys());
@@ -6070,12 +6129,16 @@ mod tests {
                         _ => {
                             buffer.update(cx, |buffer, cx| {
                                 log::info!(
-                                    "Guest {}: updating buffer {} ({:?})",
-                                    guest_id,
+                                    "{}: updating buffer {} ({:?})",
+                                    guest_username,
                                     buffer.remote_id(),
                                     buffer.file().unwrap().full_path(cx)
                                 );
-                                buffer.randomly_edit(&mut *rng.lock(), 5, cx)
+                                if rng.lock().gen_bool(0.7) {
+                                    buffer.randomly_edit(&mut *rng.lock(), 5, cx);
+                                } else {
+                                    buffer.randomly_undo_redo(&mut *rng.lock(), cx);
+                                }
                             });
                         }
                     }
@@ -6084,28 +6147,19 @@ mod tests {
                 Ok(())
             }
 
-            match simulate_guest_internal(
+            let result = simulate_guest_internal(
                 &mut self,
-                guest_id,
+                &guest_username,
                 project.clone(),
                 op_start_signal,
                 rng,
                 &mut cx,
             )
-            .await
-            {
-                Ok(()) => log::info!("guest {} done", guest_id),
-                Err(err) => {
-                    if host_disconnected.load(SeqCst) {
-                        log::error!("guest {} simulation error - {:?}", guest_id, err);
-                    } else {
-                        panic!("guest {} simulation error - {:?}", guest_id, err);
-                    }
-                }
-            }
+            .await;
+            log::info!("{}: done", guest_username);
 
             self.project = Some(project);
-            (self, cx)
+            (self, cx, result.err())
         }
     }