From d74fb97158e5e886c1aab3c2b8aa45df35641661 Mon Sep 17 00:00:00 2001 From: Antonio Scandurra Date: Tue, 6 Dec 2022 16:45:09 +0100 Subject: [PATCH 1/7] Remove `Executor` trait from `collab` and use an enum instead This will let us save off the executor and avoid using generics. --- crates/collab/src/executor.rs | 36 ++++++++++++++++++++++ crates/collab/src/integration_tests.rs | 23 ++++----------- crates/collab/src/lib.rs | 1 + crates/collab/src/rpc.rs | 41 ++++---------------------- 4 files changed, 48 insertions(+), 53 deletions(-) create mode 100644 crates/collab/src/executor.rs diff --git a/crates/collab/src/executor.rs b/crates/collab/src/executor.rs new file mode 100644 index 0000000000000000000000000000000000000000..d2253f8ccb5268fd6dbf369dafb8e632cabf2e42 --- /dev/null +++ b/crates/collab/src/executor.rs @@ -0,0 +1,36 @@ +use std::{future::Future, time::Duration}; + +#[derive(Clone)] +pub enum Executor { + Production, + #[cfg(test)] + Deterministic(std::sync::Arc), +} + +impl Executor { + pub fn spawn_detached(&self, future: F) + where + F: 'static + Send + Future, + { + match self { + Executor::Production => { + tokio::spawn(future); + } + #[cfg(test)] + Executor::Deterministic(background) => { + background.spawn(future).detach(); + } + } + } + + pub fn sleep(&self, duration: Duration) -> impl Future { + let this = self.clone(); + async move { + match this { + Executor::Production => tokio::time::sleep(duration).await, + #[cfg(test)] + Executor::Deterministic(background) => background.timer(duration).await, + } + } + } +} diff --git a/crates/collab/src/integration_tests.rs b/crates/collab/src/integration_tests.rs index a77ae4925d0d0f4fc5af80225385eb59ef2c9af5..96fed5887b04eb4056a30946709b9103e4c6b9aa 100644 --- a/crates/collab/src/integration_tests.rs +++ b/crates/collab/src/integration_tests.rs @@ -1,9 +1,9 @@ use crate::{ db::{self, NewUserParams, TestDb, UserId}, - rpc::{Executor, Server}, + executor::Executor, + rpc::Server, AppState, }; - use ::rpc::Peer; use anyhow::anyhow; use call::{room, ActiveCall, ParticipantLocation, Room}; @@ -17,7 +17,7 @@ use editor::{ ToggleCodeActions, Undo, }; use fs::{FakeFs, Fs as _, HomeDir, LineEnding}; -use futures::{channel::oneshot, Future, StreamExt as _}; +use futures::{channel::oneshot, StreamExt as _}; use gpui::{ executor::{self, Deterministic}, geometry::vector::vec2f, @@ -45,7 +45,6 @@ use std::{ atomic::{AtomicBool, AtomicUsize, Ordering::SeqCst}, Arc, }, - time::Duration, }; use theme::ThemeRegistry; use unindent::Unindent as _; @@ -417,7 +416,7 @@ async fn test_leaving_room_on_disconnection( // When user A disconnects, both client A and B clear their room on the active call. server.disconnect_client(client_a.peer_id().unwrap()); - cx_a.foreground().advance_clock(rpc::RECEIVE_TIMEOUT); + deterministic.advance_clock(rpc::RECEIVE_TIMEOUT); active_call_a.read_with(cx_a, |call, _| assert!(call.room().is_none())); active_call_b.read_with(cx_b, |call, _| assert!(call.room().is_none())); assert_eq!( @@ -6000,7 +5999,7 @@ impl TestServer { client_name, user, Some(connection_id_tx), - cx.background(), + Executor::Deterministic(cx.background()), )) .detach(); let connection_id = connection_id_rx.await.unwrap(); @@ -6829,18 +6828,6 @@ impl Drop for TestClient { } } -impl Executor for Arc { - type Sleep = gpui::executor::Timer; - - fn spawn_detached>(&self, future: F) { - self.spawn(future).detach(); - } - - fn sleep(&self, duration: Duration) -> Self::Sleep { - self.as_ref().timer(duration) - } -} - #[derive(Debug, Eq, PartialEq)] struct RoomParticipants { remote: Vec, diff --git a/crates/collab/src/lib.rs b/crates/collab/src/lib.rs index 24a9fc6117ce81ea493b742c2c6f7cbd6e8ca5d4..b9d43cd2eef9542ca65c02ed4681c465a7731175 100644 --- a/crates/collab/src/lib.rs +++ b/crates/collab/src/lib.rs @@ -2,6 +2,7 @@ pub mod api; pub mod auth; pub mod db; pub mod env; +mod executor; #[cfg(test)] mod integration_tests; pub mod rpc; diff --git a/crates/collab/src/rpc.rs b/crates/collab/src/rpc.rs index 736f5eb31bcd1366e712a31da937fed25edd5b1b..c1f9eb039b7779aa1d275b9cc8f16287a0804473 100644 --- a/crates/collab/src/rpc.rs +++ b/crates/collab/src/rpc.rs @@ -3,6 +3,7 @@ mod connection_pool; use crate::{ auth, db::{self, Database, ProjectId, RoomId, User, UserId}, + executor::Executor, AppState, Result, }; use anyhow::anyhow; @@ -50,12 +51,8 @@ use std::{ atomic::{AtomicBool, Ordering::SeqCst}, Arc, }, - time::Duration, -}; -use tokio::{ - sync::{Mutex, MutexGuard}, - time::Sleep, }; +use tokio::sync::{Mutex, MutexGuard}; use tower::ServiceBuilder; use tracing::{info_span, instrument, Instrument}; @@ -145,15 +142,6 @@ pub struct Server { handlers: HashMap, } -pub trait Executor: Send + Clone { - type Sleep: Send + Future; - fn spawn_detached>(&self, future: F); - fn sleep(&self, duration: Duration) -> Self::Sleep; -} - -#[derive(Clone)] -pub struct RealExecutor; - pub(crate) struct ConnectionPoolGuard<'a> { guard: MutexGuard<'a, ConnectionPool>, _not_send: PhantomData>, @@ -330,13 +318,13 @@ impl Server { }) } - pub fn handle_connection( + pub fn handle_connection( self: &Arc, connection: Connection, address: String, user: User, mut send_connection_id: Option>, - executor: E, + executor: Executor, ) -> impl Future> { let this = self.clone(); let user_id = user.id; @@ -347,12 +335,7 @@ impl Server { .peer .add_connection(connection, { let executor = executor.clone(); - move |duration| { - let timer = executor.sleep(duration); - async move { - timer.await; - } - } + move |duration| executor.sleep(duration) }); tracing::info!(%user_id, %login, %connection_id, %address, "connection opened"); @@ -543,18 +526,6 @@ impl<'a> Drop for ConnectionPoolGuard<'a> { } } -impl Executor for RealExecutor { - type Sleep = Sleep; - - fn spawn_detached>(&self, future: F) { - tokio::task::spawn(future); - } - - fn sleep(&self, duration: Duration) -> Self::Sleep { - tokio::time::sleep(duration) - } -} - fn broadcast( sender_id: ConnectionId, receiver_ids: impl IntoIterator, @@ -636,7 +607,7 @@ pub async fn handle_websocket_request( let connection = Connection::new(Box::pin(socket)); async move { server - .handle_connection(connection, socket_address, user, None, RealExecutor) + .handle_connection(connection, socket_address, user, None, Executor::Production) .await .log_err(); } From aca3f025906a3236682534ebbbf0c50b9a26cefa Mon Sep 17 00:00:00 2001 From: Antonio Scandurra Date: Thu, 8 Dec 2022 12:14:12 +0100 Subject: [PATCH 2/7] Re-join room when client temporarily loses connection --- crates/call/src/participant.rs | 10 +- crates/call/src/room.rs | 114 ++++++++++++--- .../20221109000000_test_schema.sql | 1 + ...d_connection_lost_to_room_participants.sql | 2 + crates/collab/src/db.rs | 64 ++++++++- crates/collab/src/db/room_participant.rs | 1 + crates/collab/src/integration_tests.rs | 30 +++- crates/collab/src/rpc.rs | 135 +++++++++--------- 8 files changed, 266 insertions(+), 91 deletions(-) create mode 100644 crates/collab/migrations/20221207165001_add_connection_lost_to_room_participants.sql diff --git a/crates/call/src/participant.rs b/crates/call/src/participant.rs index dfa456f7345d06a154e64e0d013abe8205405d0c..d5c6d85154bd796c6624a42e938b4501eaa3f26c 100644 --- a/crates/call/src/participant.rs +++ b/crates/call/src/participant.rs @@ -4,7 +4,7 @@ use collections::HashMap; use gpui::WeakModelHandle; pub use live_kit_client::Frame; use project::Project; -use std::sync::Arc; +use std::{fmt, sync::Arc}; #[derive(Copy, Clone, Debug, Eq, PartialEq)] pub enum ParticipantLocation { @@ -36,7 +36,7 @@ pub struct LocalParticipant { pub active_project: Option>, } -#[derive(Clone)] +#[derive(Clone, Debug)] pub struct RemoteParticipant { pub user: Arc, pub projects: Vec, @@ -49,6 +49,12 @@ pub struct RemoteVideoTrack { pub(crate) live_kit_track: Arc, } +impl fmt::Debug for RemoteVideoTrack { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("RemoteVideoTrack").finish() + } +} + impl RemoteVideoTrack { pub fn frames(&self) -> async_broadcast::Receiver { self.live_kit_track.frames() diff --git a/crates/call/src/room.rs b/crates/call/src/room.rs index f8a55a3a931a9d349cb4c1a38db753d9e92846cd..828885e9bdc2a6faf2d86235d0ed122d9aba54b9 100644 --- a/crates/call/src/room.rs +++ b/crates/call/src/room.rs @@ -5,14 +5,18 @@ use crate::{ use anyhow::{anyhow, Result}; use client::{proto, Client, PeerId, TypedEnvelope, User, UserStore}; use collections::{BTreeMap, HashSet}; -use futures::StreamExt; -use gpui::{AsyncAppContext, Entity, ModelContext, ModelHandle, MutableAppContext, Task}; +use futures::{FutureExt, StreamExt}; +use gpui::{ + AsyncAppContext, Entity, ModelContext, ModelHandle, MutableAppContext, Task, WeakModelHandle, +}; use live_kit_client::{LocalTrackPublication, LocalVideoTrack, RemoteVideoTrackUpdate}; use postage::stream::Stream; use project::Project; -use std::{mem, sync::Arc}; +use std::{mem, sync::Arc, time::Duration}; use util::{post_inc, ResultExt}; +pub const RECONNECTION_TIMEOUT: Duration = client::RECEIVE_TIMEOUT; + #[derive(Clone, Debug, PartialEq, Eq)] pub enum Event { ParticipantLocationChanged { @@ -46,6 +50,7 @@ pub struct Room { user_store: ModelHandle, subscriptions: Vec, pending_room_update: Option>, + _maintain_connection: Task>, } impl Entity for Room { @@ -66,21 +71,6 @@ impl Room { user_store: ModelHandle, cx: &mut ModelContext, ) -> Self { - let mut client_status = client.status(); - cx.spawn_weak(|this, mut cx| async move { - let is_connected = client_status - .next() - .await - .map_or(false, |s| s.is_connected()); - // Even if we're initially connected, any future change of the status means we momentarily disconnected. - if !is_connected || client_status.next().await.is_some() { - if let Some(this) = this.upgrade(&cx) { - let _ = this.update(&mut cx, |this, cx| this.leave(cx)); - } - } - }) - .detach(); - let live_kit_room = if let Some(connection_info) = live_kit_connection_info { let room = live_kit_client::Room::new(); let mut status = room.status(); @@ -131,6 +121,9 @@ impl Room { None }; + let _maintain_connection = + cx.spawn_weak(|this, cx| Self::maintain_connection(this, client.clone(), cx)); + Self { id, live_kit: live_kit_room, @@ -145,6 +138,7 @@ impl Room { pending_room_update: None, client, user_store, + _maintain_connection, } } @@ -245,6 +239,83 @@ impl Room { Ok(()) } + async fn maintain_connection( + this: WeakModelHandle, + client: Arc, + mut cx: AsyncAppContext, + ) -> Result<()> { + let mut client_status = client.status(); + loop { + let is_connected = client_status + .next() + .await + .map_or(false, |s| s.is_connected()); + // Even if we're initially connected, any future change of the status means we momentarily disconnected. + if !is_connected || client_status.next().await.is_some() { + let room_id = this + .upgrade(&cx) + .ok_or_else(|| anyhow!("room was dropped"))? + .update(&mut cx, |this, cx| { + this.status = RoomStatus::Rejoining; + cx.notify(); + this.id + }); + + // Wait for client to re-establish a connection to the server. + let mut reconnection_timeout = cx.background().timer(RECONNECTION_TIMEOUT).fuse(); + let client_reconnection = async { + loop { + if let Some(status) = client_status.next().await { + if status.is_connected() { + return true; + } + } else { + return false; + } + } + } + .fuse(); + futures::pin_mut!(client_reconnection); + + futures::select_biased! { + reconnected = client_reconnection => { + if reconnected { + // Client managed to reconnect to the server. Now attempt to join the room. + let rejoin_room = async { + let response = client.request(proto::JoinRoom { id: room_id }).await?; + let room_proto = response.room.ok_or_else(|| anyhow!("invalid room"))?; + this.upgrade(&cx) + .ok_or_else(|| anyhow!("room was dropped"))? + .update(&mut cx, |this, cx| { + this.status = RoomStatus::Online; + this.apply_room_update(room_proto, cx) + })?; + anyhow::Ok(()) + }; + + // If we successfully joined the room, go back around the loop + // waiting for future connection status changes. + if rejoin_room.await.log_err().is_some() { + continue; + } + } + } + _ = reconnection_timeout => {} + } + + // The client failed to re-establish a connection to the server + // or an error occurred while trying to re-join the room. Either way + // we leave the room and return an error. + if let Some(this) = this.upgrade(&cx) { + let _ = this.update(&mut cx, |this, cx| this.leave(cx)); + } + return Err(anyhow!( + "can't reconnect to room: client failed to re-establish connection" + )); + } + } + } + pub fn id(&self) -> u64 { self.id } @@ -325,9 +396,11 @@ impl Room { } if let Some(participants) = remote_participants.log_err() { + let mut participant_peer_ids = HashSet::default(); for (participant, user) in room.participants.into_iter().zip(participants) { let peer_id = PeerId(participant.peer_id); this.participant_user_ids.insert(participant.user_id); + participant_peer_ids.insert(peer_id); let old_projects = this .remote_participants @@ -394,8 +467,8 @@ impl Room { } } - this.remote_participants.retain(|_, participant| { - if this.participant_user_ids.contains(&participant.user.id) { + this.remote_participants.retain(|peer_id, participant| { + if participant_peer_ids.contains(peer_id) { true } else { for project in &participant.projects { @@ -751,6 +824,7 @@ impl Default for ScreenTrack { #[derive(Copy, Clone, PartialEq, Eq)] pub enum RoomStatus { Online, + Rejoining, Offline, } diff --git a/crates/collab/migrations.sqlite/20221109000000_test_schema.sql b/crates/collab/migrations.sqlite/20221109000000_test_schema.sql index 68caf4fad7f12a842e5dac0e9fde5df980864c1d..4eba8d230207da09f4f83d251b41798e322c3771 100644 --- a/crates/collab/migrations.sqlite/20221109000000_test_schema.sql +++ b/crates/collab/migrations.sqlite/20221109000000_test_schema.sql @@ -118,6 +118,7 @@ CREATE TABLE "room_participants" ( "user_id" INTEGER NOT NULL REFERENCES users (id), "answering_connection_id" INTEGER, "answering_connection_epoch" TEXT, + "connection_lost" BOOLEAN NOT NULL, "location_kind" INTEGER, "location_project_id" INTEGER, "initial_project_id" INTEGER, diff --git a/crates/collab/migrations/20221207165001_add_connection_lost_to_room_participants.sql b/crates/collab/migrations/20221207165001_add_connection_lost_to_room_participants.sql new file mode 100644 index 0000000000000000000000000000000000000000..d49eda41b897add012909e3434b4e7bffa1b4315 --- /dev/null +++ b/crates/collab/migrations/20221207165001_add_connection_lost_to_room_participants.sql @@ -0,0 +1,2 @@ +ALTER TABLE "room_participants" + ADD "connection_lost" BOOLEAN NOT NULL DEFAULT FALSE; diff --git a/crates/collab/src/db.rs b/crates/collab/src/db.rs index aae4d9296404ef6c2c167e163c3d02178e37d923..063d82f9325bce677505b5b66d629a9a49b62793 100644 --- a/crates/collab/src/db.rs +++ b/crates/collab/src/db.rs @@ -1034,6 +1034,7 @@ impl Database { user_id: ActiveValue::set(user_id), answering_connection_id: ActiveValue::set(Some(connection_id.0 as i32)), answering_connection_epoch: ActiveValue::set(Some(self.epoch)), + connection_lost: ActiveValue::set(false), calling_user_id: ActiveValue::set(user_id), calling_connection_id: ActiveValue::set(connection_id.0 as i32), calling_connection_epoch: ActiveValue::set(self.epoch), @@ -1060,6 +1061,7 @@ impl Database { room_participant::ActiveModel { room_id: ActiveValue::set(room_id), user_id: ActiveValue::set(called_user_id), + connection_lost: ActiveValue::set(false), calling_user_id: ActiveValue::set(calling_user_id), calling_connection_id: ActiveValue::set(calling_connection_id.0 as i32), calling_connection_epoch: ActiveValue::set(self.epoch), @@ -1175,11 +1177,16 @@ impl Database { room_participant::Column::RoomId .eq(room_id) .and(room_participant::Column::UserId.eq(user_id)) - .and(room_participant::Column::AnsweringConnectionId.is_null()), + .and( + room_participant::Column::AnsweringConnectionId + .is_null() + .or(room_participant::Column::ConnectionLost.eq(true)), + ), ) .set(room_participant::ActiveModel { answering_connection_id: ActiveValue::set(Some(connection_id.0 as i32)), answering_connection_epoch: ActiveValue::set(Some(self.epoch)), + connection_lost: ActiveValue::set(false), ..Default::default() }) .exec(&*tx) @@ -1367,6 +1374,61 @@ impl Database { .await } + pub async fn connection_lost( + &self, + connection_id: ConnectionId, + ) -> Result>> { + self.room_transaction(|tx| async move { + let participant = room_participant::Entity::find() + .filter(room_participant::Column::AnsweringConnectionId.eq(connection_id.0 as i32)) + .one(&*tx) + .await? + .ok_or_else(|| anyhow!("not a participant in any room"))?; + let room_id = participant.room_id; + + room_participant::Entity::update(room_participant::ActiveModel { + connection_lost: ActiveValue::set(true), + ..participant.into_active_model() + }) + .exec(&*tx) + .await?; + + let collaborator_on_projects = project_collaborator::Entity::find() + .find_also_related(project::Entity) + .filter(project_collaborator::Column::ConnectionId.eq(connection_id.0 as i32)) + .all(&*tx) + .await?; + project_collaborator::Entity::delete_many() + .filter(project_collaborator::Column::ConnectionId.eq(connection_id.0)) + .exec(&*tx) + .await?; + + let mut left_projects = Vec::new(); + for (_, project) in collaborator_on_projects { + if let Some(project) = project { + let collaborators = project + .find_related(project_collaborator::Entity) + .all(&*tx) + .await?; + let connection_ids = collaborators + .into_iter() + .map(|collaborator| ConnectionId(collaborator.connection_id as u32)) + .collect(); + + left_projects.push(LeftProject { + id: project.id, + host_user_id: project.host_user_id, + host_connection_id: ConnectionId(project.host_connection_id as u32), + connection_ids, + }); + } + } + + Ok((room_id, left_projects)) + }) + .await + } + fn build_incoming_call( room: &proto::Room, called_user_id: UserId, diff --git a/crates/collab/src/db/room_participant.rs b/crates/collab/src/db/room_participant.rs index 783f45aa93e1952be3f5dd2f5efd0d51da6665cd..3ab3fbbddad57c0af5e43afed65680172ab8429b 100644 --- a/crates/collab/src/db/room_participant.rs +++ b/crates/collab/src/db/room_participant.rs @@ -10,6 +10,7 @@ pub struct Model { pub user_id: UserId, pub answering_connection_id: Option, pub answering_connection_epoch: Option, + pub connection_lost: bool, pub location_kind: Option, pub location_project_id: Option, pub initial_project_id: Option, diff --git a/crates/collab/src/integration_tests.rs b/crates/collab/src/integration_tests.rs index 96fed5887b04eb4056a30946709b9103e4c6b9aa..f31022afc4d6b1956f6a1d7ebeca9eeeaf416aec 100644 --- a/crates/collab/src/integration_tests.rs +++ b/crates/collab/src/integration_tests.rs @@ -365,7 +365,7 @@ async fn test_room_uniqueness( } #[gpui::test(iterations = 10)] -async fn test_leaving_room_on_disconnection( +async fn test_disconnecting_from_room( deterministic: Arc, cx_a: &mut TestAppContext, cx_b: &mut TestAppContext, @@ -414,9 +414,30 @@ async fn test_leaving_room_on_disconnection( } ); - // When user A disconnects, both client A and B clear their room on the active call. + // User A automatically reconnects to the room upon disconnection. server.disconnect_client(client_a.peer_id().unwrap()); deterministic.advance_clock(rpc::RECEIVE_TIMEOUT); + deterministic.run_until_parked(); + assert_eq!( + room_participants(&room_a, cx_a), + RoomParticipants { + remote: vec!["user_b".to_string()], + pending: Default::default() + } + ); + assert_eq!( + room_participants(&room_b, cx_b), + RoomParticipants { + remote: vec!["user_a".to_string()], + pending: Default::default() + } + ); + + // When user A disconnects, both client A and B clear their room on the active call. + server.forbid_connections(); + server.disconnect_client(client_a.peer_id().unwrap()); + deterministic.advance_clock(rpc::RECEIVE_TIMEOUT + crate::rpc::RECONNECTION_TIMEOUT); + deterministic.run_until_parked(); active_call_a.read_with(cx_a, |call, _| assert!(call.room().is_none())); active_call_b.read_with(cx_b, |call, _| assert!(call.room().is_none())); assert_eq!( @@ -434,6 +455,11 @@ async fn test_leaving_room_on_disconnection( } ); + // Allow user A to reconnect to the server. + server.allow_connections(); + deterministic.advance_clock(rpc::RECEIVE_TIMEOUT); + deterministic.run_until_parked(); + // Call user B again from client A. active_call_a .update(cx_a, |call, cx| { diff --git a/crates/collab/src/rpc.rs b/crates/collab/src/rpc.rs index c1f9eb039b7779aa1d275b9cc8f16287a0804473..3f70043bfb30c8ee717eaba49a82e690e6624aa0 100644 --- a/crates/collab/src/rpc.rs +++ b/crates/collab/src/rpc.rs @@ -51,11 +51,14 @@ use std::{ atomic::{AtomicBool, Ordering::SeqCst}, Arc, }, + time::Duration, }; use tokio::sync::{Mutex, MutexGuard}; use tower::ServiceBuilder; use tracing::{info_span, instrument, Instrument}; +pub const RECONNECTION_TIMEOUT: Duration = rpc::RECEIVE_TIMEOUT; + lazy_static! { static ref METRIC_CONNECTIONS: IntGauge = register_int_gauge!("connections", "number of connections").unwrap(); @@ -435,7 +438,7 @@ impl Server { drop(foreground_message_handlers); tracing::info!(%user_id, %login, %connection_id, %address, "signing out"); - if let Err(error) = sign_out(session).await { + if let Err(error) = sign_out(session, executor).await { tracing::error!(%user_id, %login, %connection_id, %address, ?error, "error signing out"); } @@ -636,29 +639,38 @@ pub async fn handle_metrics(Extension(server): Extension>) -> Result Ok(encoded_metrics) } -#[instrument(err)] -async fn sign_out(session: Session) -> Result<()> { +#[instrument(err, skip(executor))] +async fn sign_out(session: Session, executor: Executor) -> Result<()> { session.peer.disconnect(session.connection_id); - let decline_calls = { - let mut pool = session.connection_pool().await; - pool.remove_connection(session.connection_id)?; - let mut connections = pool.user_connection_ids(session.user_id); - connections.next().is_none() - }; + session + .connection_pool() + .await + .remove_connection(session.connection_id)?; + + if let Ok(mut left_projects) = session + .db() + .await + .connection_lost(session.connection_id) + .await + { + for left_project in mem::take(&mut *left_projects) { + project_left(&left_project, &session); + } + } + executor.sleep(RECONNECTION_TIMEOUT).await; leave_room_for_session(&session).await.trace_err(); - if decline_calls { - if let Some(room) = session - .db() - .await - .decline_call(None, session.user_id) - .await - .trace_err() - { + + if !session + .connection_pool() + .await + .is_user_online(session.user_id) + { + let db = session.db().await; + if let Some(room) = db.decline_call(None, session.user_id).await.trace_err() { room_updated(&room, &session); } } - update_user_contacts(session.user_id, &session).await?; Ok(()) @@ -1089,20 +1101,7 @@ async fn leave_project(request: proto::LeaveProject, session: Session) -> Result host_connection_id = %project.host_connection_id, "leave project" ); - - broadcast( - sender_id, - project.connection_ids.iter().copied(), - |conn_id| { - session.peer.send( - conn_id, - proto::RemoveProjectCollaborator { - project_id: project_id.to_proto(), - peer_id: sender_id.0, - }, - ) - }, - ); + project_left(&project, &session); Ok(()) } @@ -1833,40 +1832,7 @@ async fn leave_room_for_session(session: &Session) -> Result<()> { contacts_to_update.insert(session.user_id); for project in left_room.left_projects.values() { - for connection_id in &project.connection_ids { - if project.host_user_id == session.user_id { - session - .peer - .send( - *connection_id, - proto::UnshareProject { - project_id: project.id.to_proto(), - }, - ) - .trace_err(); - } else { - session - .peer - .send( - *connection_id, - proto::RemoveProjectCollaborator { - project_id: project.id.to_proto(), - peer_id: session.connection_id.0, - }, - ) - .trace_err(); - } - } - - session - .peer - .send( - session.connection_id, - proto::UnshareProject { - project_id: project.id.to_proto(), - }, - ) - .trace_err(); + project_left(project, session); } room_updated(&left_room.room, &session); @@ -1906,6 +1872,43 @@ async fn leave_room_for_session(session: &Session) -> Result<()> { Ok(()) } +fn project_left(project: &db::LeftProject, session: &Session) { + for connection_id in &project.connection_ids { + if project.host_user_id == session.user_id { + session + .peer + .send( + *connection_id, + proto::UnshareProject { + project_id: project.id.to_proto(), + }, + ) + .trace_err(); + } else { + session + .peer + .send( + *connection_id, + proto::RemoveProjectCollaborator { + project_id: project.id.to_proto(), + peer_id: session.connection_id.0, + }, + ) + .trace_err(); + } + } + + session + .peer + .send( + session.connection_id, + proto::UnshareProject { + project_id: project.id.to_proto(), + }, + ) + .trace_err(); +} + pub trait ResultExt { type Ok; From 8fa26bfe18789fb02889b8a63470a2ae88656381 Mon Sep 17 00:00:00 2001 From: Antonio Scandurra Date: Fri, 9 Dec 2022 08:58:18 +0100 Subject: [PATCH 3/7] Fix `test_calls_on_multiple_connections` after adding room reconnection --- crates/collab/src/integration_tests.rs | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/crates/collab/src/integration_tests.rs b/crates/collab/src/integration_tests.rs index f31022afc4d6b1956f6a1d7ebeca9eeeaf416aec..a2639d7c5823a979819bc6257c9eb41f5ffe3025 100644 --- a/crates/collab/src/integration_tests.rs +++ b/crates/collab/src/integration_tests.rs @@ -437,7 +437,6 @@ async fn test_disconnecting_from_room( server.forbid_connections(); server.disconnect_client(client_a.peer_id().unwrap()); deterministic.advance_clock(rpc::RECEIVE_TIMEOUT + crate::rpc::RECONNECTION_TIMEOUT); - deterministic.run_until_parked(); active_call_a.read_with(cx_a, |call, _| assert!(call.room().is_none())); active_call_b.read_with(cx_b, |call, _| assert!(call.room().is_none())); assert_eq!( @@ -458,7 +457,6 @@ async fn test_disconnecting_from_room( // Allow user A to reconnect to the server. server.allow_connections(); deterministic.advance_clock(rpc::RECEIVE_TIMEOUT); - deterministic.run_until_parked(); // Call user B again from client A. active_call_a @@ -642,12 +640,15 @@ async fn test_calls_on_multiple_connections( assert!(incoming_call_b2.next().await.unwrap().is_some()); // User A disconnects, causing both connections to stop ringing. + server.forbid_connections(); server.disconnect_client(client_a.peer_id().unwrap()); - deterministic.advance_clock(rpc::RECEIVE_TIMEOUT); + deterministic.advance_clock(rpc::RECEIVE_TIMEOUT + crate::rpc::RECONNECTION_TIMEOUT); assert!(incoming_call_b1.next().await.unwrap().is_none()); assert!(incoming_call_b2.next().await.unwrap().is_none()); // User A reconnects automatically, then calls user B again. + server.allow_connections(); + deterministic.advance_clock(rpc::RECEIVE_TIMEOUT); active_call_a .update(cx_a, |call, cx| { call.invite(client_b1.user_id().unwrap(), None, cx) @@ -662,7 +663,7 @@ async fn test_calls_on_multiple_connections( server.forbid_connections(); server.disconnect_client(client_b1.peer_id().unwrap()); server.disconnect_client(client_b2.peer_id().unwrap()); - deterministic.advance_clock(rpc::RECEIVE_TIMEOUT); + deterministic.advance_clock(rpc::RECEIVE_TIMEOUT + crate::rpc::RECONNECTION_TIMEOUT); active_call_a.read_with(cx_a, |call, _| assert!(call.room().is_none())); } From 895c36548514c8d30631fc566587a8d48981fb50 Mon Sep 17 00:00:00 2001 From: Antonio Scandurra Date: Fri, 9 Dec 2022 11:20:22 +0100 Subject: [PATCH 4/7] Introduce random reconnections in the randomized test --- crates/call/src/room.rs | 2 + .../20221109000000_test_schema.sql | 14 ++-- crates/collab/src/integration_tests.rs | 82 +++++++++++++------ crates/collab/src/rpc.rs | 45 ++++++---- 4 files changed, 97 insertions(+), 46 deletions(-) diff --git a/crates/call/src/room.rs b/crates/call/src/room.rs index 828885e9bdc2a6faf2d86235d0ed122d9aba54b9..824ec49054c8f1721a834cc0de46e2426636dc7f 100644 --- a/crates/call/src/room.rs +++ b/crates/call/src/room.rs @@ -550,10 +550,12 @@ impl Room { { for participant in self.remote_participants.values() { assert!(self.participant_user_ids.contains(&participant.user.id)); + assert_ne!(participant.user.id, self.client.user_id().unwrap()); } for participant in &self.pending_participants { assert!(self.participant_user_ids.contains(&participant.id)); + assert_ne!(participant.id, self.client.user_id().unwrap()); } assert_eq!( diff --git a/crates/collab/migrations.sqlite/20221109000000_test_schema.sql b/crates/collab/migrations.sqlite/20221109000000_test_schema.sql index 4eba8d230207da09f4f83d251b41798e322c3771..9f03541f44a539a6064914f1264f62f5b7ec9149 100644 --- a/crates/collab/migrations.sqlite/20221109000000_test_schema.sql +++ b/crates/collab/migrations.sqlite/20221109000000_test_schema.sql @@ -1,5 +1,5 @@ CREATE TABLE "users" ( - "id" INTEGER PRIMARY KEY, + "id" INTEGER PRIMARY KEY AUTOINCREMENT, "github_login" VARCHAR, "admin" BOOLEAN, "email_address" VARCHAR(255) DEFAULT NULL, @@ -17,14 +17,14 @@ CREATE INDEX "index_users_on_email_address" ON "users" ("email_address"); CREATE INDEX "index_users_on_github_user_id" ON "users" ("github_user_id"); CREATE TABLE "access_tokens" ( - "id" INTEGER PRIMARY KEY, + "id" INTEGER PRIMARY KEY AUTOINCREMENT, "user_id" INTEGER REFERENCES users (id), "hash" VARCHAR(128) ); CREATE INDEX "index_access_tokens_user_id" ON "access_tokens" ("user_id"); CREATE TABLE "contacts" ( - "id" INTEGER PRIMARY KEY, + "id" INTEGER PRIMARY KEY AUTOINCREMENT, "user_id_a" INTEGER REFERENCES users (id) NOT NULL, "user_id_b" INTEGER REFERENCES users (id) NOT NULL, "a_to_b" BOOLEAN NOT NULL, @@ -35,12 +35,12 @@ CREATE UNIQUE INDEX "index_contacts_user_ids" ON "contacts" ("user_id_a", "user_ CREATE INDEX "index_contacts_user_id_b" ON "contacts" ("user_id_b"); CREATE TABLE "rooms" ( - "id" INTEGER PRIMARY KEY, + "id" INTEGER PRIMARY KEY AUTOINCREMENT, "live_kit_room" VARCHAR NOT NULL ); CREATE TABLE "projects" ( - "id" INTEGER PRIMARY KEY, + "id" INTEGER PRIMARY KEY AUTOINCREMENT, "room_id" INTEGER REFERENCES rooms (id) NOT NULL, "host_user_id" INTEGER REFERENCES users (id) NOT NULL, "host_connection_id" INTEGER NOT NULL, @@ -100,7 +100,7 @@ CREATE TABLE "language_servers" ( CREATE INDEX "index_language_servers_on_project_id" ON "language_servers" ("project_id"); CREATE TABLE "project_collaborators" ( - "id" INTEGER PRIMARY KEY, + "id" INTEGER PRIMARY KEY AUTOINCREMENT, "project_id" INTEGER NOT NULL REFERENCES projects (id) ON DELETE CASCADE, "connection_id" INTEGER NOT NULL, "connection_epoch" TEXT NOT NULL, @@ -113,7 +113,7 @@ CREATE UNIQUE INDEX "index_project_collaborators_on_project_id_and_replica_id" O CREATE INDEX "index_project_collaborators_on_connection_epoch" ON "project_collaborators" ("connection_epoch"); CREATE TABLE "room_participants" ( - "id" INTEGER PRIMARY KEY, + "id" INTEGER PRIMARY KEY AUTOINCREMENT, "room_id" INTEGER NOT NULL REFERENCES rooms (id), "user_id" INTEGER NOT NULL REFERENCES users (id), "answering_connection_id" INTEGER, diff --git a/crates/collab/src/integration_tests.rs b/crates/collab/src/integration_tests.rs index a2639d7c5823a979819bc6257c9eb41f5ffe3025..aca5f77fe98bde707e042f7756ebebcbf3070b73 100644 --- a/crates/collab/src/integration_tests.rs +++ b/crates/collab/src/integration_tests.rs @@ -1,7 +1,7 @@ use crate::{ db::{self, NewUserParams, TestDb, UserId}, executor::Executor, - rpc::Server, + rpc::{Server, RECONNECT_TIMEOUT}, AppState, }; use ::rpc::Peer; @@ -416,7 +416,7 @@ async fn test_disconnecting_from_room( // User A automatically reconnects to the room upon disconnection. server.disconnect_client(client_a.peer_id().unwrap()); - deterministic.advance_clock(rpc::RECEIVE_TIMEOUT); + deterministic.advance_clock(RECEIVE_TIMEOUT); deterministic.run_until_parked(); assert_eq!( room_participants(&room_a, cx_a), @@ -436,7 +436,7 @@ async fn test_disconnecting_from_room( // When user A disconnects, both client A and B clear their room on the active call. server.forbid_connections(); server.disconnect_client(client_a.peer_id().unwrap()); - deterministic.advance_clock(rpc::RECEIVE_TIMEOUT + crate::rpc::RECONNECTION_TIMEOUT); + deterministic.advance_clock(RECEIVE_TIMEOUT + RECONNECT_TIMEOUT); active_call_a.read_with(cx_a, |call, _| assert!(call.room().is_none())); active_call_b.read_with(cx_b, |call, _| assert!(call.room().is_none())); assert_eq!( @@ -456,7 +456,7 @@ async fn test_disconnecting_from_room( // Allow user A to reconnect to the server. server.allow_connections(); - deterministic.advance_clock(rpc::RECEIVE_TIMEOUT); + deterministic.advance_clock(RECEIVE_TIMEOUT); // Call user B again from client A. active_call_a @@ -581,7 +581,7 @@ async fn test_calls_on_multiple_connections( // User B disconnects the client that is not on the call. Everything should be fine. client_b1.disconnect(&cx_b1.to_async()).unwrap(); - deterministic.advance_clock(rpc::RECEIVE_TIMEOUT); + deterministic.advance_clock(RECEIVE_TIMEOUT); client_b1 .authenticate_and_connect(false, &cx_b1.to_async()) .await @@ -642,13 +642,13 @@ async fn test_calls_on_multiple_connections( // User A disconnects, causing both connections to stop ringing. server.forbid_connections(); server.disconnect_client(client_a.peer_id().unwrap()); - deterministic.advance_clock(rpc::RECEIVE_TIMEOUT + crate::rpc::RECONNECTION_TIMEOUT); + deterministic.advance_clock(RECEIVE_TIMEOUT + RECONNECT_TIMEOUT); assert!(incoming_call_b1.next().await.unwrap().is_none()); assert!(incoming_call_b2.next().await.unwrap().is_none()); // User A reconnects automatically, then calls user B again. server.allow_connections(); - deterministic.advance_clock(rpc::RECEIVE_TIMEOUT); + deterministic.advance_clock(RECEIVE_TIMEOUT); active_call_a .update(cx_a, |call, cx| { call.invite(client_b1.user_id().unwrap(), None, cx) @@ -663,7 +663,7 @@ async fn test_calls_on_multiple_connections( server.forbid_connections(); server.disconnect_client(client_b1.peer_id().unwrap()); server.disconnect_client(client_b2.peer_id().unwrap()); - deterministic.advance_clock(rpc::RECEIVE_TIMEOUT + crate::rpc::RECONNECTION_TIMEOUT); + deterministic.advance_clock(RECEIVE_TIMEOUT + RECONNECT_TIMEOUT); active_call_a.read_with(cx_a, |call, _| assert!(call.room().is_none())); } @@ -953,8 +953,9 @@ async fn test_host_disconnect( assert!(cx_b.is_window_edited(workspace_b.window_id())); // Drop client A's connection. Collaborators should disappear and the project should not be shown as shared. + server.forbid_connections(); server.disconnect_client(client_a.peer_id().unwrap()); - deterministic.advance_clock(rpc::RECEIVE_TIMEOUT); + deterministic.advance_clock(RECEIVE_TIMEOUT + RECONNECT_TIMEOUT); project_a .condition(cx_a, |project, _| project.collaborators().is_empty()) .await; @@ -977,6 +978,11 @@ async fn test_host_disconnect( .unwrap(); assert!(can_close); + // Allow client A to reconnect to the server. + server.allow_connections(); + deterministic.advance_clock(RECEIVE_TIMEOUT); + + // Client B calls client A again after they reconnected. let active_call_b = cx_b.read(ActiveCall::global); active_call_b .update(cx_b, |call, cx| { @@ -997,7 +1003,7 @@ async fn test_host_disconnect( // Drop client A's connection again. We should still unshare it successfully. server.disconnect_client(client_a.peer_id().unwrap()); - deterministic.advance_clock(rpc::RECEIVE_TIMEOUT); + deterministic.advance_clock(RECEIVE_TIMEOUT); project_a.read_with(cx_a, |project, _| assert!(!project.is_shared())); } @@ -2323,7 +2329,7 @@ async fn test_leaving_project( // Simulate connection loss for client C and ensure client A observes client C leaving the project. client_c.wait_for_current_user(cx_c).await; server.disconnect_client(client_c.peer_id().unwrap()); - cx_a.foreground().advance_clock(rpc::RECEIVE_TIMEOUT); + cx_a.foreground().advance_clock(RECEIVE_TIMEOUT); deterministic.run_until_parked(); project_a.read_with(cx_a, |project, _| { assert_eq!(project.collaborators().len(), 0); @@ -4256,7 +4262,7 @@ async fn test_contacts( server.disconnect_client(client_c.peer_id().unwrap()); server.forbid_connections(); - deterministic.advance_clock(rpc::RECEIVE_TIMEOUT); + deterministic.advance_clock(RECEIVE_TIMEOUT + RECONNECT_TIMEOUT); assert_eq!( contacts(&client_a, cx_a), [ @@ -4560,7 +4566,7 @@ async fn test_contacts( server.forbid_connections(); server.disconnect_client(client_a.peer_id().unwrap()); - deterministic.advance_clock(rpc::RECEIVE_TIMEOUT); + deterministic.advance_clock(RECEIVE_TIMEOUT + RECONNECT_TIMEOUT); assert_eq!(contacts(&client_a, cx_a), []); assert_eq!( contacts(&client_b, cx_b), @@ -5656,7 +5662,6 @@ async fn test_random_collaboration( let mut clients = Vec::new(); let mut user_ids = Vec::new(); - let mut peer_ids = Vec::new(); let mut op_start_signals = Vec::new(); let mut next_entity_id = 100000; @@ -5683,7 +5688,6 @@ async fn test_random_collaboration( let op_start_signal = futures::channel::mpsc::unbounded(); let guest = server.create_client(&mut guest_cx, &guest_username).await; user_ids.push(guest.current_user_id(&guest_cx)); - peer_ids.push(guest.peer_id().unwrap()); op_start_signals.push(op_start_signal.0); clients.push(guest_cx.foreground().spawn(guest.simulate( guest_username.clone(), @@ -5695,16 +5699,26 @@ async fn test_random_collaboration( log::info!("Added connection for {}", guest_username); operations += 1; } - 20..=29 if clients.len() > 1 => { + 20..=24 if clients.len() > 1 => { let guest_ix = rng.lock().gen_range(1..clients.len()); - log::info!("Removing guest {}", user_ids[guest_ix]); + log::info!( + "Simulating full disconnection of guest {}", + user_ids[guest_ix] + ); let removed_guest_id = user_ids.remove(guest_ix); - let removed_peer_id = peer_ids.remove(guest_ix); + let user_connection_ids = server + .connection_pool + .lock() + .await + .user_connection_ids(removed_guest_id) + .collect::>(); + assert_eq!(user_connection_ids.len(), 1); + let removed_peer_id = PeerId(user_connection_ids[0].0); let guest = clients.remove(guest_ix); op_start_signals.remove(guest_ix); server.forbid_connections(); server.disconnect_client(removed_peer_id); - deterministic.advance_clock(RECEIVE_TIMEOUT); + deterministic.advance_clock(RECEIVE_TIMEOUT + RECONNECT_TIMEOUT); deterministic.start_waiting(); log::info!("Waiting for guest {} to exit...", removed_guest_id); let (guest, mut guest_cx) = guest.await; @@ -5738,6 +5752,22 @@ async fn test_random_collaboration( operations += 1; } + 25..=29 if clients.len() > 1 => { + let guest_ix = rng.lock().gen_range(1..clients.len()); + let user_id = user_ids[guest_ix]; + log::info!("Simulating temporary disconnection of guest {}", user_id); + let user_connection_ids = server + .connection_pool + .lock() + .await + .user_connection_ids(user_id) + .collect::>(); + assert_eq!(user_connection_ids.len(), 1); + let peer_id = PeerId(user_connection_ids[0].0); + server.disconnect_client(peer_id); + deterministic.advance_clock(RECEIVE_TIMEOUT + RECONNECT_TIMEOUT); + operations += 1; + } _ if !op_start_signals.is_empty() => { while operations < max_operations && rng.lock().gen_bool(0.7) { op_start_signals @@ -6163,6 +6193,7 @@ impl Deref for TestServer { impl Drop for TestServer { fn drop(&mut self) { self.peer.reset(); + self.server.teardown(); self.test_live_kit_server.teardown().unwrap(); } } @@ -6423,11 +6454,14 @@ impl TestClient { .clone() } }; - if let Err(error) = active_call - .update(cx, |call, cx| call.share_project(project.clone(), cx)) - .await - { - log::error!("{}: error sharing project, {:?}", username, error); + + if active_call.read_with(cx, |call, _| call.room().is_some()) { + if let Err(error) = active_call + .update(cx, |call, cx| call.share_project(project.clone(), cx)) + .await + { + log::error!("{}: error sharing project, {:?}", username, error); + } } let buffers = client.buffers.entry(project.clone()).or_default(); diff --git a/crates/collab/src/rpc.rs b/crates/collab/src/rpc.rs index 3f70043bfb30c8ee717eaba49a82e690e6624aa0..e1d318fd3ee4ed8564b14575b3ecd8b7424c2507 100644 --- a/crates/collab/src/rpc.rs +++ b/crates/collab/src/rpc.rs @@ -53,11 +53,11 @@ use std::{ }, time::Duration, }; -use tokio::sync::{Mutex, MutexGuard}; +use tokio::sync::{watch, Mutex, MutexGuard}; use tower::ServiceBuilder; use tracing::{info_span, instrument, Instrument}; -pub const RECONNECTION_TIMEOUT: Duration = rpc::RECEIVE_TIMEOUT; +pub const RECONNECT_TIMEOUT: Duration = rpc::RECEIVE_TIMEOUT; lazy_static! { static ref METRIC_CONNECTIONS: IntGauge = @@ -143,6 +143,7 @@ pub struct Server { pub(crate) connection_pool: Arc>, app_state: Arc, handlers: HashMap, + teardown: watch::Sender<()>, } pub(crate) struct ConnectionPoolGuard<'a> { @@ -173,6 +174,7 @@ impl Server { app_state, connection_pool: Default::default(), handlers: Default::default(), + teardown: watch::channel(()).0, }; server @@ -235,6 +237,10 @@ impl Server { Arc::new(server) } + pub fn teardown(&self) { + let _ = self.teardown.send(()); + } + fn add_handler(&mut self, handler: F) -> &mut Self where F: 'static + Send + Sync + Fn(TypedEnvelope, Session) -> Fut, @@ -333,6 +339,7 @@ impl Server { let user_id = user.id; let login = user.github_login; let span = info_span!("handle connection", %user_id, %login, %address); + let teardown = self.teardown.subscribe(); async move { let (connection_id, handle_io, mut incoming_rx) = this .peer @@ -438,7 +445,7 @@ impl Server { drop(foreground_message_handlers); tracing::info!(%user_id, %login, %connection_id, %address, "signing out"); - if let Err(error) = sign_out(session, executor).await { + if let Err(error) = sign_out(session, teardown, executor).await { tracing::error!(%user_id, %login, %connection_id, %address, ?error, "error signing out"); } @@ -640,7 +647,11 @@ pub async fn handle_metrics(Extension(server): Extension>) -> Result } #[instrument(err, skip(executor))] -async fn sign_out(session: Session, executor: Executor) -> Result<()> { +async fn sign_out( + session: Session, + mut teardown: watch::Receiver<()>, + executor: Executor, +) -> Result<()> { session.peer.disconnect(session.connection_id); session .connection_pool() @@ -658,20 +669,24 @@ async fn sign_out(session: Session, executor: Executor) -> Result<()> { } } - executor.sleep(RECONNECTION_TIMEOUT).await; - leave_room_for_session(&session).await.trace_err(); + futures::select_biased! { + _ = executor.sleep(RECONNECT_TIMEOUT).fuse() => { + leave_room_for_session(&session).await.trace_err(); - if !session - .connection_pool() - .await - .is_user_online(session.user_id) - { - let db = session.db().await; - if let Some(room) = db.decline_call(None, session.user_id).await.trace_err() { - room_updated(&room, &session); + if !session + .connection_pool() + .await + .is_user_online(session.user_id) + { + let db = session.db().await; + if let Some(room) = db.decline_call(None, session.user_id).await.trace_err() { + room_updated(&room, &session); + } + } + update_user_contacts(session.user_id, &session).await?; } + _ = teardown.changed().fuse() => {} } - update_user_contacts(session.user_id, &session).await?; Ok(()) } From 26b565342725fee00868aba748bb5c75602eb706 Mon Sep 17 00:00:00 2001 From: Antonio Scandurra Date: Fri, 9 Dec 2022 12:06:12 +0100 Subject: [PATCH 5/7] Delete hosted projects from database when connection is lost --- crates/collab/src/db.rs | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/crates/collab/src/db.rs b/crates/collab/src/db.rs index 063d82f9325bce677505b5b66d629a9a49b62793..5ab2b1b82438d691988cdd4a0fe9f6123f53bca1 100644 --- a/crates/collab/src/db.rs +++ b/crates/collab/src/db.rs @@ -1424,6 +1424,11 @@ impl Database { } } + project::Entity::delete_many() + .filter(project::Column::HostConnectionId.eq(connection_id.0 as i32)) + .exec(&*tx) + .await?; + Ok((room_id, left_projects)) }) .await From 456396ca6e04de73e2d5d6e4748cc8f3b1754f22 Mon Sep 17 00:00:00 2001 From: Antonio Scandurra Date: Fri, 9 Dec 2022 14:08:40 +0100 Subject: [PATCH 6/7] Rename `connection_lost` to `answering_connection_lost` --- .../migrations.sqlite/20221109000000_test_schema.sql | 2 +- ...165001_add_connection_lost_to_room_participants.sql | 2 +- crates/collab/src/db.rs | 10 +++++----- crates/collab/src/db/room_participant.rs | 2 +- crates/collab/src/rpc.rs | 3 ++- 5 files changed, 10 insertions(+), 9 deletions(-) diff --git a/crates/collab/migrations.sqlite/20221109000000_test_schema.sql b/crates/collab/migrations.sqlite/20221109000000_test_schema.sql index 9f03541f44a539a6064914f1264f62f5b7ec9149..d1bb7b8f655014ad4b0ca776e665c0cd09ebc2aa 100644 --- a/crates/collab/migrations.sqlite/20221109000000_test_schema.sql +++ b/crates/collab/migrations.sqlite/20221109000000_test_schema.sql @@ -118,7 +118,7 @@ CREATE TABLE "room_participants" ( "user_id" INTEGER NOT NULL REFERENCES users (id), "answering_connection_id" INTEGER, "answering_connection_epoch" TEXT, - "connection_lost" BOOLEAN NOT NULL, + "answering_connection_lost" BOOLEAN NOT NULL, "location_kind" INTEGER, "location_project_id" INTEGER, "initial_project_id" INTEGER, diff --git a/crates/collab/migrations/20221207165001_add_connection_lost_to_room_participants.sql b/crates/collab/migrations/20221207165001_add_connection_lost_to_room_participants.sql index d49eda41b897add012909e3434b4e7bffa1b4315..2f4f38407c06f5567d8ce547d8ea8de3b76a0ac3 100644 --- a/crates/collab/migrations/20221207165001_add_connection_lost_to_room_participants.sql +++ b/crates/collab/migrations/20221207165001_add_connection_lost_to_room_participants.sql @@ -1,2 +1,2 @@ ALTER TABLE "room_participants" - ADD "connection_lost" BOOLEAN NOT NULL DEFAULT FALSE; + ADD "answering_connection_lost" BOOLEAN NOT NULL DEFAULT FALSE; diff --git a/crates/collab/src/db.rs b/crates/collab/src/db.rs index 5ab2b1b82438d691988cdd4a0fe9f6123f53bca1..c21ef7026c87794bf9950d8843b91d34e49d9956 100644 --- a/crates/collab/src/db.rs +++ b/crates/collab/src/db.rs @@ -1034,7 +1034,7 @@ impl Database { user_id: ActiveValue::set(user_id), answering_connection_id: ActiveValue::set(Some(connection_id.0 as i32)), answering_connection_epoch: ActiveValue::set(Some(self.epoch)), - connection_lost: ActiveValue::set(false), + answering_connection_lost: ActiveValue::set(false), calling_user_id: ActiveValue::set(user_id), calling_connection_id: ActiveValue::set(connection_id.0 as i32), calling_connection_epoch: ActiveValue::set(self.epoch), @@ -1061,7 +1061,7 @@ impl Database { room_participant::ActiveModel { room_id: ActiveValue::set(room_id), user_id: ActiveValue::set(called_user_id), - connection_lost: ActiveValue::set(false), + answering_connection_lost: ActiveValue::set(false), calling_user_id: ActiveValue::set(calling_user_id), calling_connection_id: ActiveValue::set(calling_connection_id.0 as i32), calling_connection_epoch: ActiveValue::set(self.epoch), @@ -1180,13 +1180,13 @@ impl Database { .and( room_participant::Column::AnsweringConnectionId .is_null() - .or(room_participant::Column::ConnectionLost.eq(true)), + .or(room_participant::Column::AnsweringConnectionLost.eq(true)), ), ) .set(room_participant::ActiveModel { answering_connection_id: ActiveValue::set(Some(connection_id.0 as i32)), answering_connection_epoch: ActiveValue::set(Some(self.epoch)), - connection_lost: ActiveValue::set(false), + answering_connection_lost: ActiveValue::set(false), ..Default::default() }) .exec(&*tx) @@ -1387,7 +1387,7 @@ impl Database { let room_id = participant.room_id; room_participant::Entity::update(room_participant::ActiveModel { - connection_lost: ActiveValue::set(true), + answering_connection_lost: ActiveValue::set(true), ..participant.into_active_model() }) .exec(&*tx) diff --git a/crates/collab/src/db/room_participant.rs b/crates/collab/src/db/room_participant.rs index 3ab3fbbddad57c0af5e43afed65680172ab8429b..c80c10c1bae25cc1b79a499a4ed5b310f2209527 100644 --- a/crates/collab/src/db/room_participant.rs +++ b/crates/collab/src/db/room_participant.rs @@ -10,7 +10,7 @@ pub struct Model { pub user_id: UserId, pub answering_connection_id: Option, pub answering_connection_epoch: Option, - pub connection_lost: bool, + pub answering_connection_lost: bool, pub location_kind: Option, pub location_project_id: Option, pub initial_project_id: Option, diff --git a/crates/collab/src/rpc.rs b/crates/collab/src/rpc.rs index e1d318fd3ee4ed8564b14575b3ecd8b7424c2507..a799837ad4fe68f9002f232e7acf69dcf15637bc 100644 --- a/crates/collab/src/rpc.rs +++ b/crates/collab/src/rpc.rs @@ -658,11 +658,12 @@ async fn sign_out( .await .remove_connection(session.connection_id)?; - if let Ok(mut left_projects) = session + if let Some(mut left_projects) = session .db() .await .connection_lost(session.connection_id) .await + .trace_err() { for left_project in mem::take(&mut *left_projects) { project_left(&left_project, &session); From 3cd77bfcc4946c7f760d721992235fddfc5ecab7 Mon Sep 17 00:00:00 2001 From: Antonio Scandurra Date: Mon, 12 Dec 2022 11:16:27 +0100 Subject: [PATCH 7/7] Always cast connection ids to i32 Postgres doesn't support unsigned types. This also adds indices to support querying `project_collaborators` and `room_participants` by connection id. --- .../20221109000000_test_schema.sql | 4 +++ ...d_connection_lost_to_room_participants.sql | 5 ++++ crates/collab/src/db.rs | 28 +++++++++---------- 3 files changed, 22 insertions(+), 15 deletions(-) diff --git a/crates/collab/migrations.sqlite/20221109000000_test_schema.sql b/crates/collab/migrations.sqlite/20221109000000_test_schema.sql index d1bb7b8f655014ad4b0ca776e665c0cd09ebc2aa..0d4bcac5ddfba2c8836294e382a84782f4f5a55e 100644 --- a/crates/collab/migrations.sqlite/20221109000000_test_schema.sql +++ b/crates/collab/migrations.sqlite/20221109000000_test_schema.sql @@ -111,6 +111,8 @@ CREATE TABLE "project_collaborators" ( CREATE INDEX "index_project_collaborators_on_project_id" ON "project_collaborators" ("project_id"); CREATE UNIQUE INDEX "index_project_collaborators_on_project_id_and_replica_id" ON "project_collaborators" ("project_id", "replica_id"); CREATE INDEX "index_project_collaborators_on_connection_epoch" ON "project_collaborators" ("connection_epoch"); +CREATE INDEX "index_project_collaborators_on_connection_id" ON "project_collaborators" ("connection_id"); +CREATE UNIQUE INDEX "index_project_collaborators_on_project_id_connection_id_and_epoch" ON "project_collaborators" ("project_id", "connection_id", "connection_epoch"); CREATE TABLE "room_participants" ( "id" INTEGER PRIMARY KEY AUTOINCREMENT, @@ -129,3 +131,5 @@ CREATE TABLE "room_participants" ( CREATE UNIQUE INDEX "index_room_participants_on_user_id" ON "room_participants" ("user_id"); CREATE INDEX "index_room_participants_on_answering_connection_epoch" ON "room_participants" ("answering_connection_epoch"); CREATE INDEX "index_room_participants_on_calling_connection_epoch" ON "room_participants" ("calling_connection_epoch"); +CREATE INDEX "index_room_participants_on_answering_connection_id" ON "room_participants" ("answering_connection_id"); +CREATE UNIQUE INDEX "index_room_participants_on_answering_connection_id_and_answering_connection_epoch" ON "room_participants" ("answering_connection_id", "answering_connection_epoch"); diff --git a/crates/collab/migrations/20221207165001_add_connection_lost_to_room_participants.sql b/crates/collab/migrations/20221207165001_add_connection_lost_to_room_participants.sql index 2f4f38407c06f5567d8ce547d8ea8de3b76a0ac3..ed0cf972bc97f517fb878806b0929e8122b2b8a2 100644 --- a/crates/collab/migrations/20221207165001_add_connection_lost_to_room_participants.sql +++ b/crates/collab/migrations/20221207165001_add_connection_lost_to_room_participants.sql @@ -1,2 +1,7 @@ ALTER TABLE "room_participants" ADD "answering_connection_lost" BOOLEAN NOT NULL DEFAULT FALSE; + +CREATE INDEX "index_project_collaborators_on_connection_id" ON "project_collaborators" ("connection_id"); +CREATE UNIQUE INDEX "index_project_collaborators_on_project_id_connection_id_and_epoch" ON "project_collaborators" ("project_id", "connection_id", "connection_epoch"); +CREATE INDEX "index_room_participants_on_answering_connection_id" ON "room_participants" ("answering_connection_id"); +CREATE UNIQUE INDEX "index_room_participants_on_answering_connection_id_and_answering_connection_epoch" ON "room_participants" ("answering_connection_id", "answering_connection_epoch"); diff --git a/crates/collab/src/db.rs b/crates/collab/src/db.rs index c21ef7026c87794bf9950d8843b91d34e49d9956..4a920841e8138e027770d3146ad92dc2c8ea4b53 100644 --- a/crates/collab/src/db.rs +++ b/crates/collab/src/db.rs @@ -1204,7 +1204,7 @@ impl Database { pub async fn leave_room(&self, connection_id: ConnectionId) -> Result> { self.room_transaction(|tx| async move { let leaving_participant = room_participant::Entity::find() - .filter(room_participant::Column::AnsweringConnectionId.eq(connection_id.0)) + .filter(room_participant::Column::AnsweringConnectionId.eq(connection_id.0 as i32)) .one(&*tx) .await?; @@ -1247,7 +1247,7 @@ impl Database { project_collaborator::Column::ProjectId, QueryProjectIds::ProjectId, ) - .filter(project_collaborator::Column::ConnectionId.eq(connection_id.0)) + .filter(project_collaborator::Column::ConnectionId.eq(connection_id.0 as i32)) .into_values::<_, QueryProjectIds>() .all(&*tx) .await?; @@ -1284,7 +1284,7 @@ impl Database { // Leave projects. project_collaborator::Entity::delete_many() - .filter(project_collaborator::Column::ConnectionId.eq(connection_id.0)) + .filter(project_collaborator::Column::ConnectionId.eq(connection_id.0 as i32)) .exec(&*tx) .await?; @@ -1293,7 +1293,7 @@ impl Database { .filter( project::Column::RoomId .eq(room_id) - .and(project::Column::HostConnectionId.eq(connection_id.0)), + .and(project::Column::HostConnectionId.eq(connection_id.0 as i32)), ) .exec(&*tx) .await?; @@ -1351,11 +1351,9 @@ impl Database { } let result = room_participant::Entity::update_many() - .filter( - room_participant::Column::RoomId - .eq(room_id) - .and(room_participant::Column::AnsweringConnectionId.eq(connection_id.0)), - ) + .filter(room_participant::Column::RoomId.eq(room_id).and( + room_participant::Column::AnsweringConnectionId.eq(connection_id.0 as i32), + )) .set(room_participant::ActiveModel { location_kind: ActiveValue::set(Some(location_kind)), location_project_id: ActiveValue::set(location_project_id), @@ -1399,7 +1397,7 @@ impl Database { .all(&*tx) .await?; project_collaborator::Entity::delete_many() - .filter(project_collaborator::Column::ConnectionId.eq(connection_id.0)) + .filter(project_collaborator::Column::ConnectionId.eq(connection_id.0 as i32)) .exec(&*tx) .await?; @@ -1581,7 +1579,7 @@ impl Database { ) -> Result> { self.room_transaction(|tx| async move { let participant = room_participant::Entity::find() - .filter(room_participant::Column::AnsweringConnectionId.eq(connection_id.0)) + .filter(room_participant::Column::AnsweringConnectionId.eq(connection_id.0 as i32)) .one(&*tx) .await? .ok_or_else(|| anyhow!("could not find participant"))?; @@ -1667,7 +1665,7 @@ impl Database { ) -> Result)>> { self.room_transaction(|tx| async move { let project = project::Entity::find_by_id(project_id) - .filter(project::Column::HostConnectionId.eq(connection_id.0)) + .filter(project::Column::HostConnectionId.eq(connection_id.0 as i32)) .one(&*tx) .await? .ok_or_else(|| anyhow!("no such project"))?; @@ -1721,7 +1719,7 @@ impl Database { // Ensure the update comes from the host. let project = project::Entity::find_by_id(project_id) - .filter(project::Column::HostConnectionId.eq(connection_id.0)) + .filter(project::Column::HostConnectionId.eq(connection_id.0 as i32)) .one(&*tx) .await? .ok_or_else(|| anyhow!("no such project"))?; @@ -1904,7 +1902,7 @@ impl Database { ) -> Result> { self.room_transaction(|tx| async move { let participant = room_participant::Entity::find() - .filter(room_participant::Column::AnsweringConnectionId.eq(connection_id.0)) + .filter(room_participant::Column::AnsweringConnectionId.eq(connection_id.0 as i32)) .one(&*tx) .await? .ok_or_else(|| anyhow!("must join a room first"))?; @@ -2041,7 +2039,7 @@ impl Database { .filter( project_collaborator::Column::ProjectId .eq(project_id) - .and(project_collaborator::Column::ConnectionId.eq(connection_id.0)), + .and(project_collaborator::Column::ConnectionId.eq(connection_id.0 as i32)), ) .exec(&*tx) .await?;