diff --git a/crates/call/src/room.rs b/crates/call/src/room.rs index 828885e9bdc2a6faf2d86235d0ed122d9aba54b9..824ec49054c8f1721a834cc0de46e2426636dc7f 100644 --- a/crates/call/src/room.rs +++ b/crates/call/src/room.rs @@ -550,10 +550,12 @@ impl Room { { for participant in self.remote_participants.values() { assert!(self.participant_user_ids.contains(&participant.user.id)); + assert_ne!(participant.user.id, self.client.user_id().unwrap()); } for participant in &self.pending_participants { assert!(self.participant_user_ids.contains(&participant.id)); + assert_ne!(participant.id, self.client.user_id().unwrap()); } assert_eq!( diff --git a/crates/collab/migrations.sqlite/20221109000000_test_schema.sql b/crates/collab/migrations.sqlite/20221109000000_test_schema.sql index 4eba8d230207da09f4f83d251b41798e322c3771..9f03541f44a539a6064914f1264f62f5b7ec9149 100644 --- a/crates/collab/migrations.sqlite/20221109000000_test_schema.sql +++ b/crates/collab/migrations.sqlite/20221109000000_test_schema.sql @@ -1,5 +1,5 @@ CREATE TABLE "users" ( - "id" INTEGER PRIMARY KEY, + "id" INTEGER PRIMARY KEY AUTOINCREMENT, "github_login" VARCHAR, "admin" BOOLEAN, "email_address" VARCHAR(255) DEFAULT NULL, @@ -17,14 +17,14 @@ CREATE INDEX "index_users_on_email_address" ON "users" ("email_address"); CREATE INDEX "index_users_on_github_user_id" ON "users" ("github_user_id"); CREATE TABLE "access_tokens" ( - "id" INTEGER PRIMARY KEY, + "id" INTEGER PRIMARY KEY AUTOINCREMENT, "user_id" INTEGER REFERENCES users (id), "hash" VARCHAR(128) ); CREATE INDEX "index_access_tokens_user_id" ON "access_tokens" ("user_id"); CREATE TABLE "contacts" ( - "id" INTEGER PRIMARY KEY, + "id" INTEGER PRIMARY KEY AUTOINCREMENT, "user_id_a" INTEGER REFERENCES users (id) NOT NULL, "user_id_b" INTEGER REFERENCES users (id) NOT NULL, "a_to_b" BOOLEAN NOT NULL, @@ -35,12 +35,12 @@ CREATE UNIQUE INDEX "index_contacts_user_ids" ON "contacts" ("user_id_a", "user_ CREATE INDEX "index_contacts_user_id_b" ON "contacts" 
("user_id_b"); CREATE TABLE "rooms" ( - "id" INTEGER PRIMARY KEY, + "id" INTEGER PRIMARY KEY AUTOINCREMENT, "live_kit_room" VARCHAR NOT NULL ); CREATE TABLE "projects" ( - "id" INTEGER PRIMARY KEY, + "id" INTEGER PRIMARY KEY AUTOINCREMENT, "room_id" INTEGER REFERENCES rooms (id) NOT NULL, "host_user_id" INTEGER REFERENCES users (id) NOT NULL, "host_connection_id" INTEGER NOT NULL, @@ -100,7 +100,7 @@ CREATE TABLE "language_servers" ( CREATE INDEX "index_language_servers_on_project_id" ON "language_servers" ("project_id"); CREATE TABLE "project_collaborators" ( - "id" INTEGER PRIMARY KEY, + "id" INTEGER PRIMARY KEY AUTOINCREMENT, "project_id" INTEGER NOT NULL REFERENCES projects (id) ON DELETE CASCADE, "connection_id" INTEGER NOT NULL, "connection_epoch" TEXT NOT NULL, @@ -113,7 +113,7 @@ CREATE UNIQUE INDEX "index_project_collaborators_on_project_id_and_replica_id" O CREATE INDEX "index_project_collaborators_on_connection_epoch" ON "project_collaborators" ("connection_epoch"); CREATE TABLE "room_participants" ( - "id" INTEGER PRIMARY KEY, + "id" INTEGER PRIMARY KEY AUTOINCREMENT, "room_id" INTEGER NOT NULL REFERENCES rooms (id), "user_id" INTEGER NOT NULL REFERENCES users (id), "answering_connection_id" INTEGER, diff --git a/crates/collab/src/integration_tests.rs b/crates/collab/src/integration_tests.rs index a2639d7c5823a979819bc6257c9eb41f5ffe3025..aca5f77fe98bde707e042f7756ebebcbf3070b73 100644 --- a/crates/collab/src/integration_tests.rs +++ b/crates/collab/src/integration_tests.rs @@ -1,7 +1,7 @@ use crate::{ db::{self, NewUserParams, TestDb, UserId}, executor::Executor, - rpc::Server, + rpc::{Server, RECONNECT_TIMEOUT}, AppState, }; use ::rpc::Peer; @@ -416,7 +416,7 @@ async fn test_disconnecting_from_room( // User A automatically reconnects to the room upon disconnection. 
server.disconnect_client(client_a.peer_id().unwrap()); - deterministic.advance_clock(rpc::RECEIVE_TIMEOUT); + deterministic.advance_clock(RECEIVE_TIMEOUT); deterministic.run_until_parked(); assert_eq!( room_participants(&room_a, cx_a), @@ -436,7 +436,7 @@ async fn test_disconnecting_from_room( // When user A disconnects, both client A and B clear their room on the active call. server.forbid_connections(); server.disconnect_client(client_a.peer_id().unwrap()); - deterministic.advance_clock(rpc::RECEIVE_TIMEOUT + crate::rpc::RECONNECTION_TIMEOUT); + deterministic.advance_clock(RECEIVE_TIMEOUT + RECONNECT_TIMEOUT); active_call_a.read_with(cx_a, |call, _| assert!(call.room().is_none())); active_call_b.read_with(cx_b, |call, _| assert!(call.room().is_none())); assert_eq!( @@ -456,7 +456,7 @@ async fn test_disconnecting_from_room( // Allow user A to reconnect to the server. server.allow_connections(); - deterministic.advance_clock(rpc::RECEIVE_TIMEOUT); + deterministic.advance_clock(RECEIVE_TIMEOUT); // Call user B again from client A. active_call_a @@ -581,7 +581,7 @@ async fn test_calls_on_multiple_connections( // User B disconnects the client that is not on the call. Everything should be fine. client_b1.disconnect(&cx_b1.to_async()).unwrap(); - deterministic.advance_clock(rpc::RECEIVE_TIMEOUT); + deterministic.advance_clock(RECEIVE_TIMEOUT); client_b1 .authenticate_and_connect(false, &cx_b1.to_async()) .await @@ -642,13 +642,13 @@ async fn test_calls_on_multiple_connections( // User A disconnects, causing both connections to stop ringing. server.forbid_connections(); server.disconnect_client(client_a.peer_id().unwrap()); - deterministic.advance_clock(rpc::RECEIVE_TIMEOUT + crate::rpc::RECONNECTION_TIMEOUT); + deterministic.advance_clock(RECEIVE_TIMEOUT + RECONNECT_TIMEOUT); assert!(incoming_call_b1.next().await.unwrap().is_none()); assert!(incoming_call_b2.next().await.unwrap().is_none()); // User A reconnects automatically, then calls user B again. 
server.allow_connections(); - deterministic.advance_clock(rpc::RECEIVE_TIMEOUT); + deterministic.advance_clock(RECEIVE_TIMEOUT); active_call_a .update(cx_a, |call, cx| { call.invite(client_b1.user_id().unwrap(), None, cx) @@ -663,7 +663,7 @@ async fn test_calls_on_multiple_connections( server.forbid_connections(); server.disconnect_client(client_b1.peer_id().unwrap()); server.disconnect_client(client_b2.peer_id().unwrap()); - deterministic.advance_clock(rpc::RECEIVE_TIMEOUT + crate::rpc::RECONNECTION_TIMEOUT); + deterministic.advance_clock(RECEIVE_TIMEOUT + RECONNECT_TIMEOUT); active_call_a.read_with(cx_a, |call, _| assert!(call.room().is_none())); } @@ -953,8 +953,9 @@ async fn test_host_disconnect( assert!(cx_b.is_window_edited(workspace_b.window_id())); // Drop client A's connection. Collaborators should disappear and the project should not be shown as shared. + server.forbid_connections(); server.disconnect_client(client_a.peer_id().unwrap()); - deterministic.advance_clock(rpc::RECEIVE_TIMEOUT); + deterministic.advance_clock(RECEIVE_TIMEOUT + RECONNECT_TIMEOUT); project_a .condition(cx_a, |project, _| project.collaborators().is_empty()) .await; @@ -977,6 +978,11 @@ async fn test_host_disconnect( .unwrap(); assert!(can_close); + // Allow client A to reconnect to the server. + server.allow_connections(); + deterministic.advance_clock(RECEIVE_TIMEOUT); + + // Client B calls client A again after they reconnected. let active_call_b = cx_b.read(ActiveCall::global); active_call_b .update(cx_b, |call, cx| { @@ -997,7 +1003,7 @@ async fn test_host_disconnect( // Drop client A's connection again. We should still unshare it successfully. 
server.disconnect_client(client_a.peer_id().unwrap()); - deterministic.advance_clock(rpc::RECEIVE_TIMEOUT); + deterministic.advance_clock(RECEIVE_TIMEOUT); project_a.read_with(cx_a, |project, _| assert!(!project.is_shared())); } @@ -2323,7 +2329,7 @@ async fn test_leaving_project( // Simulate connection loss for client C and ensure client A observes client C leaving the project. client_c.wait_for_current_user(cx_c).await; server.disconnect_client(client_c.peer_id().unwrap()); - cx_a.foreground().advance_clock(rpc::RECEIVE_TIMEOUT); + cx_a.foreground().advance_clock(RECEIVE_TIMEOUT); deterministic.run_until_parked(); project_a.read_with(cx_a, |project, _| { assert_eq!(project.collaborators().len(), 0); @@ -4256,7 +4262,7 @@ async fn test_contacts( server.disconnect_client(client_c.peer_id().unwrap()); server.forbid_connections(); - deterministic.advance_clock(rpc::RECEIVE_TIMEOUT); + deterministic.advance_clock(RECEIVE_TIMEOUT + RECONNECT_TIMEOUT); assert_eq!( contacts(&client_a, cx_a), [ @@ -4560,7 +4566,7 @@ async fn test_contacts( server.forbid_connections(); server.disconnect_client(client_a.peer_id().unwrap()); - deterministic.advance_clock(rpc::RECEIVE_TIMEOUT); + deterministic.advance_clock(RECEIVE_TIMEOUT + RECONNECT_TIMEOUT); assert_eq!(contacts(&client_a, cx_a), []); assert_eq!( contacts(&client_b, cx_b), @@ -5656,7 +5662,6 @@ async fn test_random_collaboration( let mut clients = Vec::new(); let mut user_ids = Vec::new(); - let mut peer_ids = Vec::new(); let mut op_start_signals = Vec::new(); let mut next_entity_id = 100000; @@ -5683,7 +5688,6 @@ async fn test_random_collaboration( let op_start_signal = futures::channel::mpsc::unbounded(); let guest = server.create_client(&mut guest_cx, &guest_username).await; user_ids.push(guest.current_user_id(&guest_cx)); - peer_ids.push(guest.peer_id().unwrap()); op_start_signals.push(op_start_signal.0); clients.push(guest_cx.foreground().spawn(guest.simulate( guest_username.clone(), @@ -5695,16 +5699,26 @@ async fn 
test_random_collaboration( log::info!("Added connection for {}", guest_username); operations += 1; } - 20..=29 if clients.len() > 1 => { + 20..=24 if clients.len() > 1 => { let guest_ix = rng.lock().gen_range(1..clients.len()); - log::info!("Removing guest {}", user_ids[guest_ix]); + log::info!( + "Simulating full disconnection of guest {}", + user_ids[guest_ix] + ); let removed_guest_id = user_ids.remove(guest_ix); - let removed_peer_id = peer_ids.remove(guest_ix); + let user_connection_ids = server + .connection_pool + .lock() + .await + .user_connection_ids(removed_guest_id) + .collect::<Vec<_>>(); + assert_eq!(user_connection_ids.len(), 1); + let removed_peer_id = PeerId(user_connection_ids[0].0); let guest = clients.remove(guest_ix); op_start_signals.remove(guest_ix); server.forbid_connections(); server.disconnect_client(removed_peer_id); - deterministic.advance_clock(RECEIVE_TIMEOUT); + deterministic.advance_clock(RECEIVE_TIMEOUT + RECONNECT_TIMEOUT); deterministic.start_waiting(); log::info!("Waiting for guest {} to exit...", removed_guest_id); let (guest, mut guest_cx) = guest.await; @@ -5738,6 +5752,22 @@ async fn test_random_collaboration( operations += 1; } + 25..=29 if clients.len() > 1 => { + let guest_ix = rng.lock().gen_range(1..clients.len()); + let user_id = user_ids[guest_ix]; + log::info!("Simulating temporary disconnection of guest {}", user_id); + let user_connection_ids = server + .connection_pool + .lock() + .await + .user_connection_ids(user_id) + .collect::<Vec<_>>(); + assert_eq!(user_connection_ids.len(), 1); + let peer_id = PeerId(user_connection_ids[0].0); + server.disconnect_client(peer_id); + deterministic.advance_clock(RECEIVE_TIMEOUT + RECONNECT_TIMEOUT); + operations += 1; + } _ if !op_start_signals.is_empty() => { while operations < max_operations && rng.lock().gen_bool(0.7) { op_start_signals @@ -6163,6 +6193,7 @@ impl Deref for TestServer { impl Drop for TestServer { fn drop(&mut self) { self.peer.reset(); + self.server.teardown();
self.test_live_kit_server.teardown().unwrap(); } } @@ -6423,11 +6454,14 @@ impl TestClient { .clone() } }; - if let Err(error) = active_call - .update(cx, |call, cx| call.share_project(project.clone(), cx)) - .await - { - log::error!("{}: error sharing project, {:?}", username, error); + + if active_call.read_with(cx, |call, _| call.room().is_some()) { + if let Err(error) = active_call + .update(cx, |call, cx| call.share_project(project.clone(), cx)) + .await + { + log::error!("{}: error sharing project, {:?}", username, error); + } } let buffers = client.buffers.entry(project.clone()).or_default(); diff --git a/crates/collab/src/rpc.rs b/crates/collab/src/rpc.rs index 3f70043bfb30c8ee717eaba49a82e690e6624aa0..e1d318fd3ee4ed8564b14575b3ecd8b7424c2507 100644 --- a/crates/collab/src/rpc.rs +++ b/crates/collab/src/rpc.rs @@ -53,11 +53,11 @@ use std::{ }, time::Duration, }; -use tokio::sync::{Mutex, MutexGuard}; +use tokio::sync::{watch, Mutex, MutexGuard}; use tower::ServiceBuilder; use tracing::{info_span, instrument, Instrument}; -pub const RECONNECTION_TIMEOUT: Duration = rpc::RECEIVE_TIMEOUT; +pub const RECONNECT_TIMEOUT: Duration = rpc::RECEIVE_TIMEOUT; lazy_static! 
{ static ref METRIC_CONNECTIONS: IntGauge = @@ -143,6 +143,7 @@ pub struct Server { pub(crate) connection_pool: Arc>, app_state: Arc, handlers: HashMap, + teardown: watch::Sender<()>, } pub(crate) struct ConnectionPoolGuard<'a> { @@ -173,6 +174,7 @@ impl Server { app_state, connection_pool: Default::default(), handlers: Default::default(), + teardown: watch::channel(()).0, }; server @@ -235,6 +237,10 @@ impl Server { Arc::new(server) } + pub fn teardown(&self) { + let _ = self.teardown.send(()); + } + fn add_handler(&mut self, handler: F) -> &mut Self where F: 'static + Send + Sync + Fn(TypedEnvelope, Session) -> Fut, @@ -333,6 +339,7 @@ impl Server { let user_id = user.id; let login = user.github_login; let span = info_span!("handle connection", %user_id, %login, %address); + let teardown = self.teardown.subscribe(); async move { let (connection_id, handle_io, mut incoming_rx) = this .peer @@ -438,7 +445,7 @@ impl Server { drop(foreground_message_handlers); tracing::info!(%user_id, %login, %connection_id, %address, "signing out"); - if let Err(error) = sign_out(session, executor).await { + if let Err(error) = sign_out(session, teardown, executor).await { tracing::error!(%user_id, %login, %connection_id, %address, ?error, "error signing out"); } @@ -640,7 +647,11 @@ pub async fn handle_metrics(Extension(server): Extension>) -> Result } #[instrument(err, skip(executor))] -async fn sign_out(session: Session, executor: Executor) -> Result<()> { +async fn sign_out( + session: Session, + mut teardown: watch::Receiver<()>, + executor: Executor, +) -> Result<()> { session.peer.disconnect(session.connection_id); session .connection_pool() @@ -658,20 +669,24 @@ async fn sign_out(session: Session, executor: Executor) -> Result<()> { } } - executor.sleep(RECONNECTION_TIMEOUT).await; - leave_room_for_session(&session).await.trace_err(); + futures::select_biased! 
{ + _ = executor.sleep(RECONNECT_TIMEOUT).fuse() => { + leave_room_for_session(&session).await.trace_err(); - if !session - .connection_pool() - .await - .is_user_online(session.user_id) - { - let db = session.db().await; - if let Some(room) = db.decline_call(None, session.user_id).await.trace_err() { - room_updated(&room, &session); + if !session + .connection_pool() + .await + .is_user_online(session.user_id) + { + let db = session.db().await; + if let Some(room) = db.decline_call(None, session.user_id).await.trace_err() { + room_updated(&room, &session); + } + } + update_user_contacts(session.user_id, &session).await?; } + _ = teardown.changed().fuse() => {} } - update_user_contacts(session.user_id, &session).await?; Ok(()) }