@@ -125,11 +125,13 @@ impl Database {
pub async fn delete_stale_projects(
&self,
- new_epoch: ServerId,
environment: &str,
+ new_server_id: ServerId,
) -> Result<()> {
self.transaction(|tx| async move {
- let stale_server_epochs = self.stale_server_ids(environment, new_epoch, &tx).await?;
+ let stale_server_epochs = self
+ .stale_server_ids(environment, new_server_id, &tx)
+ .await?;
project_collaborator::Entity::delete_many()
.filter(
project_collaborator::Column::ConnectionServerId
@@ -151,8 +153,8 @@ impl Database {
pub async fn stale_room_ids(
&self,
- new_epoch: ServerId,
environment: &str,
+ new_server_id: ServerId,
) -> Result<Vec<RoomId>> {
self.transaction(|tx| async move {
#[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
@@ -160,7 +162,9 @@ impl Database {
RoomId,
}
- let stale_server_epochs = self.stale_server_ids(environment, new_epoch, &tx).await?;
+ let stale_server_epochs = self
+ .stale_server_ids(environment, new_server_id, &tx)
+ .await?;
Ok(room_participant::Entity::find()
.select_only()
.column(room_participant::Column::RoomId)
@@ -179,13 +183,13 @@ impl Database {
pub async fn refresh_room(
&self,
room_id: RoomId,
- new_epoch: ServerId,
+ new_server_id: ServerId,
) -> Result<RoomGuard<RefreshedRoom>> {
self.room_transaction(|tx| async move {
let stale_participant_filter = Condition::all()
.add(room_participant::Column::RoomId.eq(room_id))
.add(room_participant::Column::AnsweringConnectionId.is_not_null())
- .add(room_participant::Column::AnsweringConnectionServerId.ne(new_epoch));
+ .add(room_participant::Column::AnsweringConnectionServerId.ne(new_server_id));
let stale_participant_user_ids = room_participant::Entity::find()
.filter(stale_participant_filter.clone())
@@ -229,13 +233,17 @@ impl Database {
.await
}
- pub async fn delete_stale_servers(&self, new_epoch: ServerId, environment: &str) -> Result<()> {
+ pub async fn delete_stale_servers(
+ &self,
+ new_server_id: ServerId,
+ environment: &str,
+ ) -> Result<()> {
self.transaction(|tx| async move {
server::Entity::delete_many()
.filter(
Condition::all()
.add(server::Column::Environment.eq(environment))
- .add(server::Column::Id.ne(new_epoch)),
+ .add(server::Column::Id.ne(new_server_id)),
)
.exec(&*tx)
.await?;
@@ -247,14 +255,14 @@ impl Database {
async fn stale_server_ids(
&self,
environment: &str,
- new_epoch: ServerId,
+ new_server_id: ServerId,
tx: &DatabaseTransaction,
) -> Result<Vec<ServerId>> {
let stale_servers = server::Entity::find()
.filter(
Condition::all()
.add(server::Column::Environment.eq(environment))
- .add(server::Column::Id.ne(new_epoch)),
+ .add(server::Column::Id.ne(new_server_id)),
)
.all(&*tx)
.await?;
@@ -1155,13 +1163,13 @@ impl Database {
user_id: ActiveValue::set(user_id),
answering_connection_id: ActiveValue::set(Some(connection.id as i32)),
answering_connection_server_id: ActiveValue::set(Some(ServerId(
- connection.epoch as i32,
+ connection.owner_id as i32,
))),
answering_connection_lost: ActiveValue::set(false),
calling_user_id: ActiveValue::set(user_id),
calling_connection_id: ActiveValue::set(connection.id as i32),
calling_connection_server_id: ActiveValue::set(Some(ServerId(
- connection.epoch as i32,
+ connection.owner_id as i32,
))),
..Default::default()
}
@@ -1190,7 +1198,7 @@ impl Database {
calling_user_id: ActiveValue::set(calling_user_id),
calling_connection_id: ActiveValue::set(calling_connection.id as i32),
calling_connection_server_id: ActiveValue::set(Some(ServerId(
- calling_connection.epoch as i32,
+ calling_connection.owner_id as i32,
))),
initial_project_id: ActiveValue::set(initial_project_id),
..Default::default()
@@ -1274,7 +1282,7 @@ impl Database {
)
.add(
room_participant::Column::CallingConnectionServerId
- .eq(calling_connection.epoch as i32),
+ .eq(calling_connection.owner_id as i32),
)
.add(room_participant::Column::AnsweringConnectionId.is_null()),
)
@@ -1314,14 +1322,14 @@ impl Database {
.add(room_participant::Column::AnsweringConnectionLost.eq(true))
.add(
room_participant::Column::AnsweringConnectionServerId
- .ne(connection.epoch as i32),
+ .ne(connection.owner_id as i32),
),
),
)
.set(room_participant::ActiveModel {
answering_connection_id: ActiveValue::set(Some(connection.id as i32)),
answering_connection_server_id: ActiveValue::set(Some(ServerId(
- connection.epoch as i32,
+ connection.owner_id as i32,
))),
answering_connection_lost: ActiveValue::set(false),
..Default::default()
@@ -1349,7 +1357,7 @@ impl Database {
)
.add(
room_participant::Column::AnsweringConnectionServerId
- .eq(connection.epoch as i32),
+ .eq(connection.owner_id as i32),
),
)
.one(&*tx)
@@ -1372,7 +1380,7 @@ impl Database {
)
.add(
room_participant::Column::CallingConnectionServerId
- .eq(connection.epoch as i32),
+ .eq(connection.owner_id as i32),
)
.add(room_participant::Column::AnsweringConnectionId.is_null()),
)
@@ -1408,7 +1416,7 @@ impl Database {
)
.add(
project_collaborator::Column::ConnectionServerId
- .eq(connection.epoch as i32),
+ .eq(connection.owner_id as i32),
),
)
.into_values::<_, QueryProjectIds>()
@@ -1432,7 +1440,7 @@ impl Database {
});
let collaborator_connection_id = ConnectionId {
- epoch: collaborator.connection_server_id.0 as u32,
+ owner_id: collaborator.connection_server_id.0 as u32,
id: collaborator.connection_id as u32,
};
if collaborator_connection_id != connection {
@@ -1455,7 +1463,7 @@ impl Database {
)
.add(
project_collaborator::Column::ConnectionServerId
- .eq(connection.epoch as i32),
+ .eq(connection.owner_id as i32),
),
)
.exec(&*tx)
@@ -1468,7 +1476,8 @@ impl Database {
.add(project::Column::RoomId.eq(room_id))
.add(project::Column::HostConnectionId.eq(connection.id as i32))
.add(
- project::Column::HostConnectionServerId.eq(connection.epoch as i32),
+ project::Column::HostConnectionServerId
+ .eq(connection.owner_id as i32),
),
)
.exec(&*tx)
@@ -1536,7 +1545,7 @@ impl Database {
)
.add(
room_participant::Column::AnsweringConnectionServerId
- .eq(connection.epoch as i32),
+ .eq(connection.owner_id as i32),
),
)
.set(room_participant::ActiveModel {
@@ -1571,7 +1580,7 @@ impl Database {
)
.add(
room_participant::Column::AnsweringConnectionServerId
- .eq(connection.epoch as i32),
+ .eq(connection.owner_id as i32),
),
)
.one(&*tx)
@@ -1593,7 +1602,7 @@ impl Database {
.add(project_collaborator::Column::ConnectionId.eq(connection.id as i32))
.add(
project_collaborator::Column::ConnectionServerId
- .eq(connection.epoch as i32),
+ .eq(connection.owner_id as i32),
),
)
.all(&*tx)
@@ -1604,7 +1613,7 @@ impl Database {
.add(project_collaborator::Column::ConnectionId.eq(connection.id as i32))
.add(
project_collaborator::Column::ConnectionServerId
- .eq(connection.epoch as i32),
+ .eq(connection.owner_id as i32),
),
)
.exec(&*tx)
@@ -1621,7 +1630,7 @@ impl Database {
.into_iter()
.map(|collaborator| ConnectionId {
id: collaborator.connection_id as u32,
- epoch: collaborator.connection_server_id.0 as u32,
+ owner_id: collaborator.connection_server_id.0 as u32,
})
.collect();
@@ -1630,7 +1639,7 @@ impl Database {
host_user_id: project.host_user_id,
host_connection_id: ConnectionId {
id: project.host_connection_id as u32,
- epoch: project.host_connection_server_id.0 as u32,
+ owner_id: project.host_connection_server_id.0 as u32,
},
connection_ids,
});
@@ -1641,7 +1650,9 @@ impl Database {
.filter(
Condition::all()
.add(project::Column::HostConnectionId.eq(connection.id as i32))
- .add(project::Column::HostConnectionServerId.eq(connection.epoch as i32)),
+ .add(
+ project::Column::HostConnectionServerId.eq(connection.owner_id as i32),
+ ),
)
.exec(&*tx)
.await?;
@@ -1717,7 +1728,7 @@ impl Database {
};
let answering_connection = ConnectionId {
- epoch: answering_connection_server_id.0 as u32,
+ owner_id: answering_connection_server_id.0 as u32,
id: answering_connection_id as u32,
};
participants.insert(
@@ -1748,7 +1759,7 @@ impl Database {
while let Some(row) = db_projects.next().await {
let (db_project, db_worktree) = row?;
let host_connection = ConnectionId {
- epoch: db_project.host_connection_server_id.0 as u32,
+ owner_id: db_project.host_connection_server_id.0 as u32,
id: db_project.host_connection_id as u32,
};
if let Some(participant) = participants.get_mut(&host_connection) {
@@ -1818,7 +1829,7 @@ impl Database {
)
.add(
room_participant::Column::AnsweringConnectionServerId
- .eq(connection.epoch as i32),
+ .eq(connection.owner_id as i32),
),
)
.one(&*tx)
@@ -1832,7 +1843,7 @@ impl Database {
room_id: ActiveValue::set(participant.room_id),
host_user_id: ActiveValue::set(participant.user_id),
host_connection_id: ActiveValue::set(connection.id as i32),
- host_connection_server_id: ActiveValue::set(ServerId(connection.epoch as i32)),
+ host_connection_server_id: ActiveValue::set(ServerId(connection.owner_id as i32)),
..Default::default()
}
.insert(&*tx)
@@ -1857,7 +1868,7 @@ impl Database {
project_collaborator::ActiveModel {
project_id: ActiveValue::set(project.id),
connection_id: ActiveValue::set(connection.id as i32),
- connection_server_id: ActiveValue::set(ServerId(connection.epoch as i32)),
+ connection_server_id: ActiveValue::set(ServerId(connection.owner_id as i32)),
user_id: ActiveValue::set(participant.user_id),
replica_id: ActiveValue::set(ReplicaId(0)),
is_host: ActiveValue::set(true),
@@ -1885,7 +1896,7 @@ impl Database {
.await?
.ok_or_else(|| anyhow!("project not found"))?;
let host_connection = ConnectionId {
- epoch: project.host_connection_server_id.0 as u32,
+ owner_id: project.host_connection_server_id.0 as u32,
id: project.host_connection_id as u32,
};
if host_connection == connection {
@@ -1913,7 +1924,9 @@ impl Database {
.filter(
Condition::all()
.add(project::Column::HostConnectionId.eq(connection.id as i32))
- .add(project::Column::HostConnectionServerId.eq(connection.epoch as i32)),
+ .add(
+ project::Column::HostConnectionServerId.eq(connection.owner_id as i32),
+ ),
)
.one(&*tx)
.await?
@@ -1971,7 +1984,9 @@ impl Database {
.filter(
Condition::all()
.add(project::Column::HostConnectionId.eq(connection.id as i32))
- .add(project::Column::HostConnectionServerId.eq(connection.epoch as i32)),
+ .add(
+ project::Column::HostConnectionServerId.eq(connection.owner_id as i32),
+ ),
)
.one(&*tx)
.await?
@@ -2068,7 +2083,7 @@ impl Database {
.await?
.ok_or_else(|| anyhow!("no such project"))?;
let host_connection = ConnectionId {
- epoch: project.host_connection_server_id.0 as u32,
+ owner_id: project.host_connection_server_id.0 as u32,
id: project.host_connection_id as u32,
};
if host_connection != connection {
@@ -2125,7 +2140,7 @@ impl Database {
.await?
.ok_or_else(|| anyhow!("no such project"))?;
let host_connection = ConnectionId {
- epoch: project.host_connection_server_id.0 as u32,
+ owner_id: project.host_connection_server_id.0 as u32,
id: project.host_connection_id as u32,
};
if host_connection != connection {
@@ -2171,7 +2186,7 @@ impl Database {
)
.add(
room_participant::Column::AnsweringConnectionServerId
- .eq(connection.epoch as i32),
+ .eq(connection.owner_id as i32),
),
)
.one(&*tx)
@@ -2201,7 +2216,7 @@ impl Database {
let new_collaborator = project_collaborator::ActiveModel {
project_id: ActiveValue::set(project_id),
connection_id: ActiveValue::set(connection.id as i32),
- connection_server_id: ActiveValue::set(ServerId(connection.epoch as i32)),
+ connection_server_id: ActiveValue::set(ServerId(connection.owner_id as i32)),
user_id: ActiveValue::set(participant.user_id),
replica_id: ActiveValue::set(replica_id),
is_host: ActiveValue::set(false),
@@ -2313,7 +2328,7 @@ impl Database {
.add(project_collaborator::Column::ConnectionId.eq(connection.id as i32))
.add(
project_collaborator::Column::ConnectionServerId
- .eq(connection.epoch as i32),
+ .eq(connection.owner_id as i32),
),
)
.exec(&*tx)
@@ -2333,7 +2348,7 @@ impl Database {
let connection_ids = collaborators
.into_iter()
.map(|collaborator| ConnectionId {
- epoch: collaborator.connection_server_id.0 as u32,
+ owner_id: collaborator.connection_server_id.0 as u32,
id: collaborator.connection_id as u32,
})
.collect();
@@ -2342,7 +2357,7 @@ impl Database {
id: project_id,
host_user_id: project.host_user_id,
host_connection_id: ConnectionId {
- epoch: project.host_connection_server_id.0 as u32,
+ owner_id: project.host_connection_server_id.0 as u32,
id: project.host_connection_id as u32,
},
connection_ids,
@@ -2369,7 +2384,7 @@ impl Database {
if collaborators.iter().any(|collaborator| {
let collaborator_connection = ConnectionId {
- epoch: collaborator.connection_server_id.0 as u32,
+ owner_id: collaborator.connection_server_id.0 as u32,
id: collaborator.connection_id as u32,
};
collaborator_connection == connection
@@ -2401,7 +2416,7 @@ impl Database {
while let Some(participant) = participants.next().await {
let participant = participant?;
connection_ids.insert(ConnectionId {
- epoch: participant.connection_server_id.0 as u32,
+ owner_id: participant.connection_server_id.0 as u32,
id: participant.connection_id as u32,
});
}
@@ -2433,7 +2448,7 @@ impl Database {
while let Some(participant) = participants.next().await {
let participant = participant?;
guest_connection_ids.push(ConnectionId {
- epoch: participant.connection_server_id.0 as u32,
+ owner_id: participant.connection_server_id.0 as u32,
id: participant.connection_id as u32,
});
}
@@ -410,7 +410,7 @@ test_both_dbs!(
test_project_count_sqlite,
db,
{
- let epoch = db.create_server("test").await.unwrap().0 as u32;
+ let owner_id = db.create_server("test").await.unwrap().0 as u32;
let user1 = db
.create_user(
@@ -438,7 +438,7 @@ test_both_dbs!(
.unwrap();
let room_id = RoomId::from_proto(
- db.create_room(user1.user_id, ConnectionId { epoch, id: 0 }, "")
+ db.create_room(user1.user_id, ConnectionId { owner_id, id: 0 }, "")
.await
.unwrap()
.id,
@@ -446,34 +446,36 @@ test_both_dbs!(
db.call(
room_id,
user1.user_id,
- ConnectionId { epoch, id: 0 },
+ ConnectionId { owner_id, id: 0 },
user2.user_id,
None,
)
.await
.unwrap();
- db.join_room(room_id, user2.user_id, ConnectionId { epoch, id: 1 })
+ db.join_room(room_id, user2.user_id, ConnectionId { owner_id, id: 1 })
.await
.unwrap();
assert_eq!(db.project_count_excluding_admins().await.unwrap(), 0);
- db.share_project(room_id, ConnectionId { epoch, id: 1 }, &[])
+ db.share_project(room_id, ConnectionId { owner_id, id: 1 }, &[])
.await
.unwrap();
assert_eq!(db.project_count_excluding_admins().await.unwrap(), 1);
- db.share_project(room_id, ConnectionId { epoch, id: 1 }, &[])
+ db.share_project(room_id, ConnectionId { owner_id, id: 1 }, &[])
.await
.unwrap();
assert_eq!(db.project_count_excluding_admins().await.unwrap(), 2);
// Projects shared by admins aren't counted.
- db.share_project(room_id, ConnectionId { epoch, id: 0 }, &[])
+ db.share_project(room_id, ConnectionId { owner_id, id: 0 }, &[])
.await
.unwrap();
assert_eq!(db.project_count_excluding_admins().await.unwrap(), 2);
- db.leave_room(ConnectionId { epoch, id: 1 }).await.unwrap();
+ db.leave_room(ConnectionId { owner_id, id: 1 })
+ .await
+ .unwrap();
assert_eq!(db.project_count_excluding_admins().await.unwrap(), 0);
}
);
@@ -138,7 +138,7 @@ impl Deref for DbHandle {
}
pub struct Server {
- epoch: parking_lot::Mutex<ServerId>,
+ id: parking_lot::Mutex<ServerId>,
peer: Arc<Peer>,
pub(crate) connection_pool: Arc<parking_lot::Mutex<ConnectionPool>>,
app_state: Arc<AppState>,
@@ -169,10 +169,10 @@ where
}
impl Server {
- pub fn new(epoch: ServerId, app_state: Arc<AppState>, executor: Executor) -> Arc<Self> {
+ pub fn new(id: ServerId, app_state: Arc<AppState>, executor: Executor) -> Arc<Self> {
let mut server = Self {
- epoch: parking_lot::Mutex::new(epoch),
- peer: Peer::new(epoch.0 as u32),
+ id: parking_lot::Mutex::new(id),
+ peer: Peer::new(id.0 as u32),
app_state,
executor,
connection_pool: Default::default(),
@@ -241,7 +241,7 @@ impl Server {
}
pub async fn start(&self) -> Result<()> {
- let epoch = *self.epoch.lock();
+ let server_id = *self.id.lock();
let app_state = self.app_state.clone();
let peer = self.peer.clone();
let timeout = self.executor.sleep(CLEANUP_TIMEOUT);
@@ -254,7 +254,7 @@ impl Server {
tracing::info!("begin deleting stale projects");
app_state
.db
- .delete_stale_projects(epoch, &app_state.config.zed_environment)
+ .delete_stale_projects(&app_state.config.zed_environment, server_id)
.await?;
tracing::info!("finish deleting stale projects");
@@ -266,7 +266,7 @@ impl Server {
tracing::info!("cleanup timeout expired, retrieving stale rooms");
if let Some(room_ids) = app_state
.db
- .stale_room_ids(epoch, &app_state.config.zed_environment)
+ .stale_room_ids(&app_state.config.zed_environment, server_id)
.await
.trace_err()
{
@@ -278,7 +278,7 @@ impl Server {
let mut delete_live_kit_room = false;
if let Ok(mut refreshed_room) =
- app_state.db.refresh_room(room_id, epoch).await
+ app_state.db.refresh_room(room_id, server_id).await
{
tracing::info!(
room_id = room_id.0,
@@ -354,7 +354,7 @@ impl Server {
app_state
.db
- .delete_stale_servers(epoch, &app_state.config.zed_environment)
+ .delete_stale_servers(server_id, &app_state.config.zed_environment)
.await
.trace_err();
}
@@ -370,10 +370,15 @@ impl Server {
}
#[cfg(test)]
- pub fn reset(&self, epoch: ServerId) {
+ pub fn reset(&self, id: ServerId) {
self.teardown();
- *self.epoch.lock() = epoch;
- self.peer.reset(epoch.0 as u32);
+ *self.id.lock() = id;
+ self.peer.reset(id.0 as u32);
+ }
+
+ #[cfg(test)]
+ pub fn id(&self) -> ServerId {
+ *self.id.lock()
}
fn add_handler<F, Fut, M>(&mut self, handler: F) -> &mut Self
@@ -1156,7 +1161,7 @@ async fn join_project(
.iter()
.map(|collaborator| {
let peer_id = proto::PeerId {
- epoch: collaborator.connection_server_id.0 as u32,
+ owner_id: collaborator.connection_server_id.0 as u32,
id: collaborator.connection_id as u32,
};
proto::Collaborator {
@@ -1412,7 +1417,7 @@ where
.find(|collaborator| collaborator.is_host)
.ok_or_else(|| anyhow!("host not found"))?;
ConnectionId {
- epoch: host.connection_server_id.0 as u32,
+ owner_id: host.connection_server_id.0 as u32,
id: host.connection_id as u32,
}
};
@@ -1443,7 +1448,7 @@ async fn save_buffer(
.find(|collaborator| collaborator.is_host)
.ok_or_else(|| anyhow!("host not found"))?;
ConnectionId {
- epoch: host.connection_server_id.0 as u32,
+ owner_id: host.connection_server_id.0 as u32,
id: host.connection_id as u32,
}
};
@@ -1459,13 +1464,13 @@ async fn save_buffer(
.await?;
collaborators.retain(|collaborator| {
let collaborator_connection = ConnectionId {
- epoch: collaborator.connection_server_id.0 as u32,
+ owner_id: collaborator.connection_server_id.0 as u32,
id: collaborator.connection_id as u32,
};
collaborator_connection != session.connection_id
});
let project_connection_ids = collaborators.iter().map(|collaborator| ConnectionId {
- epoch: collaborator.connection_server_id.0 as u32,
+ owner_id: collaborator.connection_server_id.0 as u32,
id: collaborator.connection_id as u32,
});
broadcast(host_connection_id, project_connection_ids, |conn_id| {