servers.rs

 1use super::*;
 2
 3impl Database {
 4    pub async fn create_server(&self, environment: &str) -> Result<ServerId> {
 5        self.transaction(|tx| async move {
 6            let server = server::ActiveModel {
 7                environment: ActiveValue::set(environment.into()),
 8                ..Default::default()
 9            }
10            .insert(&*tx)
11            .await?;
12            Ok(server.id)
13        })
14        .await
15    }
16
17    pub async fn stale_server_resource_ids(
18        &self,
19        environment: &str,
20        new_server_id: ServerId,
21    ) -> Result<(Vec<RoomId>, Vec<ChannelId>)> {
22        self.transaction(|tx| async move {
23            #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
24            enum QueryRoomIds {
25                RoomId,
26            }
27
28            #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
29            enum QueryChannelIds {
30                ChannelId,
31            }
32
33            let stale_server_epochs = self
34                .stale_server_ids(environment, new_server_id, &tx)
35                .await?;
36            let room_ids = room_participant::Entity::find()
37                .select_only()
38                .column(room_participant::Column::RoomId)
39                .distinct()
40                .filter(
41                    room_participant::Column::AnsweringConnectionServerId
42                        .is_in(stale_server_epochs.iter().copied()),
43                )
44                .into_values::<_, QueryRoomIds>()
45                .all(&*tx)
46                .await?;
47            let channel_ids = channel_buffer_collaborator::Entity::find()
48                .select_only()
49                .column(channel_buffer_collaborator::Column::ChannelId)
50                .distinct()
51                .filter(
52                    channel_buffer_collaborator::Column::ConnectionServerId
53                        .is_in(stale_server_epochs.iter().copied()),
54                )
55                .into_values::<_, QueryChannelIds>()
56                .all(&*tx)
57                .await?;
58
59            Ok((room_ids, channel_ids))
60        })
61        .await
62    }
63
64    pub async fn delete_stale_servers(
65        &self,
66        environment: &str,
67        new_server_id: ServerId,
68    ) -> Result<()> {
69        self.transaction(|tx| async move {
70            server::Entity::delete_many()
71                .filter(
72                    Condition::all()
73                        .add(server::Column::Environment.eq(environment))
74                        .add(server::Column::Id.ne(new_server_id)),
75                )
76                .exec(&*tx)
77                .await?;
78            Ok(())
79        })
80        .await
81    }
82
83    async fn stale_server_ids(
84        &self,
85        environment: &str,
86        new_server_id: ServerId,
87        tx: &DatabaseTransaction,
88    ) -> Result<Vec<ServerId>> {
89        let stale_servers = server::Entity::find()
90            .filter(
91                Condition::all()
92                    .add(server::Column::Environment.eq(environment))
93                    .add(server::Column::Id.ne(new_server_id)),
94            )
95            .all(&*tx)
96            .await?;
97        Ok(stale_servers.into_iter().map(|server| server.id).collect())
98    }
99}