Introduce random reconnections in the randomized test

Antonio Scandurra created

Change summary

crates/call/src/room.rs                                        |  2 
crates/collab/migrations.sqlite/20221109000000_test_schema.sql | 14 
crates/collab/src/integration_tests.rs                         | 82 ++-
crates/collab/src/rpc.rs                                       | 45 +
4 files changed, 97 insertions(+), 46 deletions(-)

Detailed changes

crates/call/src/room.rs 🔗

@@ -550,10 +550,12 @@ impl Room {
         {
             for participant in self.remote_participants.values() {
                 assert!(self.participant_user_ids.contains(&participant.user.id));
+                assert_ne!(participant.user.id, self.client.user_id().unwrap());
             }
 
             for participant in &self.pending_participants {
                 assert!(self.participant_user_ids.contains(&participant.id));
+                assert_ne!(participant.id, self.client.user_id().unwrap());
             }
 
             assert_eq!(

crates/collab/migrations.sqlite/20221109000000_test_schema.sql 🔗

@@ -1,5 +1,5 @@
 CREATE TABLE "users" (
-    "id" INTEGER PRIMARY KEY,
+    "id" INTEGER PRIMARY KEY AUTOINCREMENT,
     "github_login" VARCHAR,
     "admin" BOOLEAN,
     "email_address" VARCHAR(255) DEFAULT NULL,
@@ -17,14 +17,14 @@ CREATE INDEX "index_users_on_email_address" ON "users" ("email_address");
 CREATE INDEX "index_users_on_github_user_id" ON "users" ("github_user_id");
 
 CREATE TABLE "access_tokens" (
-    "id" INTEGER PRIMARY KEY,
+    "id" INTEGER PRIMARY KEY AUTOINCREMENT,
     "user_id" INTEGER REFERENCES users (id),
     "hash" VARCHAR(128)
 );
 CREATE INDEX "index_access_tokens_user_id" ON "access_tokens" ("user_id");
 
 CREATE TABLE "contacts" (
-    "id" INTEGER PRIMARY KEY,
+    "id" INTEGER PRIMARY KEY AUTOINCREMENT,
     "user_id_a" INTEGER REFERENCES users (id) NOT NULL,
     "user_id_b" INTEGER REFERENCES users (id) NOT NULL,
     "a_to_b" BOOLEAN NOT NULL,
@@ -35,12 +35,12 @@ CREATE UNIQUE INDEX "index_contacts_user_ids" ON "contacts" ("user_id_a", "user_
 CREATE INDEX "index_contacts_user_id_b" ON "contacts" ("user_id_b");
 
 CREATE TABLE "rooms" (
-    "id" INTEGER PRIMARY KEY,
+    "id" INTEGER PRIMARY KEY AUTOINCREMENT,
     "live_kit_room" VARCHAR NOT NULL
 );
 
 CREATE TABLE "projects" (
-    "id" INTEGER PRIMARY KEY,
+    "id" INTEGER PRIMARY KEY AUTOINCREMENT,
     "room_id" INTEGER REFERENCES rooms (id) NOT NULL,
     "host_user_id" INTEGER REFERENCES users (id) NOT NULL,
     "host_connection_id" INTEGER NOT NULL,
@@ -100,7 +100,7 @@ CREATE TABLE "language_servers" (
 CREATE INDEX "index_language_servers_on_project_id" ON "language_servers" ("project_id");
 
 CREATE TABLE "project_collaborators" (
-    "id" INTEGER PRIMARY KEY,
+    "id" INTEGER PRIMARY KEY AUTOINCREMENT,
     "project_id" INTEGER NOT NULL REFERENCES projects (id) ON DELETE CASCADE,
     "connection_id" INTEGER NOT NULL,
     "connection_epoch" TEXT NOT NULL,
@@ -113,7 +113,7 @@ CREATE UNIQUE INDEX "index_project_collaborators_on_project_id_and_replica_id" O
 CREATE INDEX "index_project_collaborators_on_connection_epoch" ON "project_collaborators" ("connection_epoch");
 
 CREATE TABLE "room_participants" (
-    "id" INTEGER PRIMARY KEY,
+    "id" INTEGER PRIMARY KEY AUTOINCREMENT,
     "room_id" INTEGER NOT NULL REFERENCES rooms (id),
     "user_id" INTEGER NOT NULL REFERENCES users (id),
     "answering_connection_id" INTEGER,

crates/collab/src/integration_tests.rs 🔗

@@ -1,7 +1,7 @@
 use crate::{
     db::{self, NewUserParams, TestDb, UserId},
     executor::Executor,
-    rpc::Server,
+    rpc::{Server, RECONNECT_TIMEOUT},
     AppState,
 };
 use ::rpc::Peer;
@@ -416,7 +416,7 @@ async fn test_disconnecting_from_room(
 
     // User A automatically reconnects to the room upon disconnection.
     server.disconnect_client(client_a.peer_id().unwrap());
-    deterministic.advance_clock(rpc::RECEIVE_TIMEOUT);
+    deterministic.advance_clock(RECEIVE_TIMEOUT);
     deterministic.run_until_parked();
     assert_eq!(
         room_participants(&room_a, cx_a),
@@ -436,7 +436,7 @@ async fn test_disconnecting_from_room(
     // When user A disconnects, both client A and B clear their room on the active call.
     server.forbid_connections();
     server.disconnect_client(client_a.peer_id().unwrap());
-    deterministic.advance_clock(rpc::RECEIVE_TIMEOUT + crate::rpc::RECONNECTION_TIMEOUT);
+    deterministic.advance_clock(RECEIVE_TIMEOUT + RECONNECT_TIMEOUT);
     active_call_a.read_with(cx_a, |call, _| assert!(call.room().is_none()));
     active_call_b.read_with(cx_b, |call, _| assert!(call.room().is_none()));
     assert_eq!(
@@ -456,7 +456,7 @@ async fn test_disconnecting_from_room(
 
     // Allow user A to reconnect to the server.
     server.allow_connections();
-    deterministic.advance_clock(rpc::RECEIVE_TIMEOUT);
+    deterministic.advance_clock(RECEIVE_TIMEOUT);
 
     // Call user B again from client A.
     active_call_a
@@ -581,7 +581,7 @@ async fn test_calls_on_multiple_connections(
 
     // User B disconnects the client that is not on the call. Everything should be fine.
     client_b1.disconnect(&cx_b1.to_async()).unwrap();
-    deterministic.advance_clock(rpc::RECEIVE_TIMEOUT);
+    deterministic.advance_clock(RECEIVE_TIMEOUT);
     client_b1
         .authenticate_and_connect(false, &cx_b1.to_async())
         .await
@@ -642,13 +642,13 @@ async fn test_calls_on_multiple_connections(
     // User A disconnects, causing both connections to stop ringing.
     server.forbid_connections();
     server.disconnect_client(client_a.peer_id().unwrap());
-    deterministic.advance_clock(rpc::RECEIVE_TIMEOUT + crate::rpc::RECONNECTION_TIMEOUT);
+    deterministic.advance_clock(RECEIVE_TIMEOUT + RECONNECT_TIMEOUT);
     assert!(incoming_call_b1.next().await.unwrap().is_none());
     assert!(incoming_call_b2.next().await.unwrap().is_none());
 
     // User A reconnects automatically, then calls user B again.
     server.allow_connections();
-    deterministic.advance_clock(rpc::RECEIVE_TIMEOUT);
+    deterministic.advance_clock(RECEIVE_TIMEOUT);
     active_call_a
         .update(cx_a, |call, cx| {
             call.invite(client_b1.user_id().unwrap(), None, cx)
@@ -663,7 +663,7 @@ async fn test_calls_on_multiple_connections(
     server.forbid_connections();
     server.disconnect_client(client_b1.peer_id().unwrap());
     server.disconnect_client(client_b2.peer_id().unwrap());
-    deterministic.advance_clock(rpc::RECEIVE_TIMEOUT + crate::rpc::RECONNECTION_TIMEOUT);
+    deterministic.advance_clock(RECEIVE_TIMEOUT + RECONNECT_TIMEOUT);
     active_call_a.read_with(cx_a, |call, _| assert!(call.room().is_none()));
 }
 
@@ -953,8 +953,9 @@ async fn test_host_disconnect(
     assert!(cx_b.is_window_edited(workspace_b.window_id()));
 
     // Drop client A's connection. Collaborators should disappear and the project should not be shown as shared.
+    server.forbid_connections();
     server.disconnect_client(client_a.peer_id().unwrap());
-    deterministic.advance_clock(rpc::RECEIVE_TIMEOUT);
+    deterministic.advance_clock(RECEIVE_TIMEOUT + RECONNECT_TIMEOUT);
     project_a
         .condition(cx_a, |project, _| project.collaborators().is_empty())
         .await;
@@ -977,6 +978,11 @@ async fn test_host_disconnect(
         .unwrap();
     assert!(can_close);
 
+    // Allow client A to reconnect to the server.
+    server.allow_connections();
+    deterministic.advance_clock(RECEIVE_TIMEOUT);
+
+    // Client B calls client A again after they reconnected.
     let active_call_b = cx_b.read(ActiveCall::global);
     active_call_b
         .update(cx_b, |call, cx| {
@@ -997,7 +1003,7 @@ async fn test_host_disconnect(
 
     // Drop client A's connection again. We should still unshare it successfully.
     server.disconnect_client(client_a.peer_id().unwrap());
-    deterministic.advance_clock(rpc::RECEIVE_TIMEOUT);
+    deterministic.advance_clock(RECEIVE_TIMEOUT);
     project_a.read_with(cx_a, |project, _| assert!(!project.is_shared()));
 }
 
@@ -2323,7 +2329,7 @@ async fn test_leaving_project(
     // Simulate connection loss for client C and ensure client A observes client C leaving the project.
     client_c.wait_for_current_user(cx_c).await;
     server.disconnect_client(client_c.peer_id().unwrap());
-    cx_a.foreground().advance_clock(rpc::RECEIVE_TIMEOUT);
+    cx_a.foreground().advance_clock(RECEIVE_TIMEOUT);
     deterministic.run_until_parked();
     project_a.read_with(cx_a, |project, _| {
         assert_eq!(project.collaborators().len(), 0);
@@ -4256,7 +4262,7 @@ async fn test_contacts(
 
     server.disconnect_client(client_c.peer_id().unwrap());
     server.forbid_connections();
-    deterministic.advance_clock(rpc::RECEIVE_TIMEOUT);
+    deterministic.advance_clock(RECEIVE_TIMEOUT + RECONNECT_TIMEOUT);
     assert_eq!(
         contacts(&client_a, cx_a),
         [
@@ -4560,7 +4566,7 @@ async fn test_contacts(
 
     server.forbid_connections();
     server.disconnect_client(client_a.peer_id().unwrap());
-    deterministic.advance_clock(rpc::RECEIVE_TIMEOUT);
+    deterministic.advance_clock(RECEIVE_TIMEOUT + RECONNECT_TIMEOUT);
     assert_eq!(contacts(&client_a, cx_a), []);
     assert_eq!(
         contacts(&client_b, cx_b),
@@ -5656,7 +5662,6 @@ async fn test_random_collaboration(
 
     let mut clients = Vec::new();
     let mut user_ids = Vec::new();
-    let mut peer_ids = Vec::new();
     let mut op_start_signals = Vec::new();
     let mut next_entity_id = 100000;
 
@@ -5683,7 +5688,6 @@ async fn test_random_collaboration(
                 let op_start_signal = futures::channel::mpsc::unbounded();
                 let guest = server.create_client(&mut guest_cx, &guest_username).await;
                 user_ids.push(guest.current_user_id(&guest_cx));
-                peer_ids.push(guest.peer_id().unwrap());
                 op_start_signals.push(op_start_signal.0);
                 clients.push(guest_cx.foreground().spawn(guest.simulate(
                     guest_username.clone(),
@@ -5695,16 +5699,26 @@ async fn test_random_collaboration(
                 log::info!("Added connection for {}", guest_username);
                 operations += 1;
             }
-            20..=29 if clients.len() > 1 => {
+            20..=24 if clients.len() > 1 => {
                 let guest_ix = rng.lock().gen_range(1..clients.len());
-                log::info!("Removing guest {}", user_ids[guest_ix]);
+                log::info!(
+                    "Simulating full disconnection of guest {}",
+                    user_ids[guest_ix]
+                );
                 let removed_guest_id = user_ids.remove(guest_ix);
-                let removed_peer_id = peer_ids.remove(guest_ix);
+                let user_connection_ids = server
+                    .connection_pool
+                    .lock()
+                    .await
+                    .user_connection_ids(removed_guest_id)
+                    .collect::<Vec<_>>();
+                assert_eq!(user_connection_ids.len(), 1);
+                let removed_peer_id = PeerId(user_connection_ids[0].0);
                 let guest = clients.remove(guest_ix);
                 op_start_signals.remove(guest_ix);
                 server.forbid_connections();
                 server.disconnect_client(removed_peer_id);
-                deterministic.advance_clock(RECEIVE_TIMEOUT);
+                deterministic.advance_clock(RECEIVE_TIMEOUT + RECONNECT_TIMEOUT);
                 deterministic.start_waiting();
                 log::info!("Waiting for guest {} to exit...", removed_guest_id);
                 let (guest, mut guest_cx) = guest.await;
@@ -5738,6 +5752,22 @@ async fn test_random_collaboration(
 
                 operations += 1;
             }
+            25..=29 if clients.len() > 1 => {
+                let guest_ix = rng.lock().gen_range(1..clients.len());
+                let user_id = user_ids[guest_ix];
+                log::info!("Simulating temporary disconnection of guest {}", user_id);
+                let user_connection_ids = server
+                    .connection_pool
+                    .lock()
+                    .await
+                    .user_connection_ids(user_id)
+                    .collect::<Vec<_>>();
+                assert_eq!(user_connection_ids.len(), 1);
+                let peer_id = PeerId(user_connection_ids[0].0);
+                server.disconnect_client(peer_id);
+                deterministic.advance_clock(RECEIVE_TIMEOUT + RECONNECT_TIMEOUT);
+                operations += 1;
+            }
             _ if !op_start_signals.is_empty() => {
                 while operations < max_operations && rng.lock().gen_bool(0.7) {
                     op_start_signals
@@ -6163,6 +6193,7 @@ impl Deref for TestServer {
 impl Drop for TestServer {
     fn drop(&mut self) {
         self.peer.reset();
+        self.server.teardown();
         self.test_live_kit_server.teardown().unwrap();
     }
 }
@@ -6423,11 +6454,14 @@ impl TestClient {
                         .clone()
                 }
             };
-            if let Err(error) = active_call
-                .update(cx, |call, cx| call.share_project(project.clone(), cx))
-                .await
-            {
-                log::error!("{}: error sharing project, {:?}", username, error);
+
+            if active_call.read_with(cx, |call, _| call.room().is_some()) {
+                if let Err(error) = active_call
+                    .update(cx, |call, cx| call.share_project(project.clone(), cx))
+                    .await
+                {
+                    log::error!("{}: error sharing project, {:?}", username, error);
+                }
             }
 
             let buffers = client.buffers.entry(project.clone()).or_default();

crates/collab/src/rpc.rs 🔗

@@ -53,11 +53,11 @@ use std::{
     },
     time::Duration,
 };
-use tokio::sync::{Mutex, MutexGuard};
+use tokio::sync::{watch, Mutex, MutexGuard};
 use tower::ServiceBuilder;
 use tracing::{info_span, instrument, Instrument};
 
-pub const RECONNECTION_TIMEOUT: Duration = rpc::RECEIVE_TIMEOUT;
+pub const RECONNECT_TIMEOUT: Duration = rpc::RECEIVE_TIMEOUT;
 
 lazy_static! {
     static ref METRIC_CONNECTIONS: IntGauge =
@@ -143,6 +143,7 @@ pub struct Server {
     pub(crate) connection_pool: Arc<Mutex<ConnectionPool>>,
     app_state: Arc<AppState>,
     handlers: HashMap<TypeId, MessageHandler>,
+    teardown: watch::Sender<()>,
 }
 
 pub(crate) struct ConnectionPoolGuard<'a> {
@@ -173,6 +174,7 @@ impl Server {
             app_state,
             connection_pool: Default::default(),
             handlers: Default::default(),
+            teardown: watch::channel(()).0,
         };
 
         server
@@ -235,6 +237,10 @@ impl Server {
         Arc::new(server)
     }
 
+    pub fn teardown(&self) {
+        let _ = self.teardown.send(());
+    }
+
     fn add_handler<F, Fut, M>(&mut self, handler: F) -> &mut Self
     where
         F: 'static + Send + Sync + Fn(TypedEnvelope<M>, Session) -> Fut,
@@ -333,6 +339,7 @@ impl Server {
         let user_id = user.id;
         let login = user.github_login;
         let span = info_span!("handle connection", %user_id, %login, %address);
+        let teardown = self.teardown.subscribe();
         async move {
             let (connection_id, handle_io, mut incoming_rx) = this
                 .peer
@@ -438,7 +445,7 @@ impl Server {
 
             drop(foreground_message_handlers);
             tracing::info!(%user_id, %login, %connection_id, %address, "signing out");
-            if let Err(error) = sign_out(session, executor).await {
+            if let Err(error) = sign_out(session, teardown, executor).await {
                 tracing::error!(%user_id, %login, %connection_id, %address, ?error, "error signing out");
             }
 
@@ -640,7 +647,11 @@ pub async fn handle_metrics(Extension(server): Extension<Arc<Server>>) -> Result
 }
 
 #[instrument(err, skip(executor))]
-async fn sign_out(session: Session, executor: Executor) -> Result<()> {
+async fn sign_out(
+    session: Session,
+    mut teardown: watch::Receiver<()>,
+    executor: Executor,
+) -> Result<()> {
     session.peer.disconnect(session.connection_id);
     session
         .connection_pool()
@@ -658,20 +669,24 @@ async fn sign_out(session: Session, executor: Executor) -> Result<()> {
         }
     }
 
-    executor.sleep(RECONNECTION_TIMEOUT).await;
-    leave_room_for_session(&session).await.trace_err();
+    futures::select_biased! {
+        _ = executor.sleep(RECONNECT_TIMEOUT).fuse() => {
+            leave_room_for_session(&session).await.trace_err();
 
-    if !session
-        .connection_pool()
-        .await
-        .is_user_online(session.user_id)
-    {
-        let db = session.db().await;
-        if let Some(room) = db.decline_call(None, session.user_id).await.trace_err() {
-            room_updated(&room, &session);
+            if !session
+                .connection_pool()
+                .await
+                .is_user_online(session.user_id)
+            {
+                let db = session.db().await;
+                if let Some(room) = db.decline_call(None, session.user_id).await.trace_err() {
+                    room_updated(&room, &session);
+                }
+            }
+            update_user_contacts(session.user_id, &session).await?;
         }
+        _ = teardown.changed().fuse() => {}
     }
-    update_user_contacts(session.user_id, &session).await?;
 
     Ok(())
 }