servers.rs

  1use super::*;
  2
  3impl Database {
  4    /// Creates a new server in the given environment.
  5    pub async fn create_server(&self, environment: &str) -> Result<ServerId> {
  6        self.transaction(|tx| async move {
  7            let server = server::ActiveModel {
  8                environment: ActiveValue::set(environment.into()),
  9                ..Default::default()
 10            }
 11            .insert(&*tx)
 12            .await?;
 13            Ok(server.id)
 14        })
 15        .await
 16    }
 17
 18    /// Returns the IDs of resources associated with stale servers.
 19    ///
 20    /// A server is stale if it is in the specified `environment` and does not
 21    /// match the provided `new_server_id`.
 22    pub async fn stale_server_resource_ids(
 23        &self,
 24        environment: &str,
 25        new_server_id: ServerId,
 26    ) -> Result<(Vec<RoomId>, Vec<ChannelId>)> {
 27        self.transaction(|tx| async move {
 28            #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
 29            enum QueryRoomIds {
 30                RoomId,
 31            }
 32
 33            #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
 34            enum QueryChannelIds {
 35                ChannelId,
 36            }
 37
 38            let stale_server_epochs = self
 39                .stale_server_ids(environment, new_server_id, &tx)
 40                .await?;
 41            let room_ids = room_participant::Entity::find()
 42                .select_only()
 43                .column(room_participant::Column::RoomId)
 44                .distinct()
 45                .filter(
 46                    room_participant::Column::AnsweringConnectionServerId
 47                        .is_in(stale_server_epochs.iter().copied()),
 48                )
 49                .into_values::<_, QueryRoomIds>()
 50                .all(&*tx)
 51                .await?;
 52            let channel_ids = channel_buffer_collaborator::Entity::find()
 53                .select_only()
 54                .column(channel_buffer_collaborator::Column::ChannelId)
 55                .distinct()
 56                .filter(
 57                    channel_buffer_collaborator::Column::ConnectionServerId
 58                        .is_in(stale_server_epochs.iter().copied()),
 59                )
 60                .into_values::<_, QueryChannelIds>()
 61                .all(&*tx)
 62                .await?;
 63
 64            Ok((room_ids, channel_ids))
 65        })
 66        .await
 67    }
 68
 69    /// Deletes any stale servers in the environment that don't match the `new_server_id`.
 70    pub async fn delete_stale_servers(
 71        &self,
 72        environment: &str,
 73        new_server_id: ServerId,
 74    ) -> Result<()> {
 75        self.transaction(|tx| async move {
 76            server::Entity::delete_many()
 77                .filter(
 78                    Condition::all()
 79                        .add(server::Column::Environment.eq(environment))
 80                        .add(server::Column::Id.ne(new_server_id)),
 81                )
 82                .exec(&*tx)
 83                .await?;
 84            Ok(())
 85        })
 86        .await
 87    }
 88
 89    async fn stale_server_ids(
 90        &self,
 91        environment: &str,
 92        new_server_id: ServerId,
 93        tx: &DatabaseTransaction,
 94    ) -> Result<Vec<ServerId>> {
 95        let stale_servers = server::Entity::find()
 96            .filter(
 97                Condition::all()
 98                    .add(server::Column::Environment.eq(environment))
 99                    .add(server::Column::Id.ne(new_server_id)),
100            )
101            .all(tx)
102            .await?;
103        Ok(stale_servers.into_iter().map(|server| server.id).collect())
104    }
105}