servers.rs

 1use super::*;
 2
 3impl Database {
 4    pub async fn create_server(&self, environment: &str) -> Result<ServerId> {
 5        self.transaction(|tx| async move {
 6            let server = server::ActiveModel {
 7                environment: ActiveValue::set(environment.into()),
 8                ..Default::default()
 9            }
10            .insert(&*tx)
11            .await?;
12            Ok(server.id)
13        })
14        .await
15    }
16
17    pub async fn stale_server_resource_ids(
18        &self,
19        environment: &str,
20        new_server_id: ServerId,
21    ) -> Result<(Vec<RoomId>, Vec<ChannelId>)> {
22        self.transaction(|tx| async move {
23            #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
24            enum QueryRoomIds {
25                RoomId,
26            }
27
28            #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
29            enum QueryChannelIds {
30                ChannelId,
31            }
32
33            let stale_server_epochs = self
34                .stale_server_ids(environment, new_server_id, &tx)
35                .await?;
36            let room_ids = room_participant::Entity::find()
37                .select_only()
38                .column(room_participant::Column::RoomId)
39                .distinct()
40                .filter(
41                    room_participant::Column::AnsweringConnectionServerId
42                        .is_in(stale_server_epochs.iter().copied()),
43                )
44                .into_values::<_, QueryRoomIds>()
45                .all(&*tx)
46                .await?;
47            let channel_ids = channel_buffer_collaborator::Entity::find()
48                .select_only()
49                .column(channel_buffer_collaborator::Column::ChannelId)
50                .distinct()
51                .filter(
52                    channel_buffer_collaborator::Column::ConnectionServerId
53                        .is_in(stale_server_epochs.iter().copied()),
54                )
55                .into_values::<_, QueryChannelIds>()
56                .all(&*tx)
57                .await?;
58            Ok((room_ids, channel_ids))
59        })
60        .await
61    }
62
63    pub async fn delete_stale_servers(
64        &self,
65        environment: &str,
66        new_server_id: ServerId,
67    ) -> Result<()> {
68        self.transaction(|tx| async move {
69            server::Entity::delete_many()
70                .filter(
71                    Condition::all()
72                        .add(server::Column::Environment.eq(environment))
73                        .add(server::Column::Id.ne(new_server_id)),
74                )
75                .exec(&*tx)
76                .await?;
77            Ok(())
78        })
79        .await
80    }
81
82    async fn stale_server_ids(
83        &self,
84        environment: &str,
85        new_server_id: ServerId,
86        tx: &DatabaseTransaction,
87    ) -> Result<Vec<ServerId>> {
88        let stale_servers = server::Entity::find()
89            .filter(
90                Condition::all()
91                    .add(server::Column::Environment.eq(environment))
92                    .add(server::Column::Id.ne(new_server_id)),
93            )
94            .all(&*tx)
95            .await?;
96        Ok(stale_servers.into_iter().map(|server| server.id).collect())
97    }
98}