1use super::*;
2
3impl Database {
4 /// Creates a new server in the given environment.
5 pub async fn create_server(&self, environment: &str) -> Result<ServerId> {
6 self.transaction(|tx| async move {
7 let server = server::ActiveModel {
8 environment: ActiveValue::set(environment.into()),
9 ..Default::default()
10 }
11 .insert(&*tx)
12 .await?;
13 Ok(server.id)
14 })
15 .await
16 }
17
18 /// Returns the IDs of resources associated with stale servers.
19 ///
20 /// A server is stale if it is in the specified `environment` and does not
21 /// match the provided `new_server_id`.
22 pub async fn stale_server_resource_ids(
23 &self,
24 environment: &str,
25 new_server_id: ServerId,
26 ) -> Result<(Vec<RoomId>, Vec<ChannelId>)> {
27 self.transaction(|tx| async move {
28 #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
29 enum QueryRoomIds {
30 RoomId,
31 }
32
33 #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
34 enum QueryChannelIds {
35 ChannelId,
36 }
37
38 let stale_server_epochs = self
39 .stale_server_ids(environment, new_server_id, &tx)
40 .await?;
41 let room_ids = room_participant::Entity::find()
42 .select_only()
43 .column(room_participant::Column::RoomId)
44 .distinct()
45 .filter(
46 room_participant::Column::AnsweringConnectionServerId
47 .is_in(stale_server_epochs.iter().copied()),
48 )
49 .into_values::<_, QueryRoomIds>()
50 .all(&*tx)
51 .await?;
52 let channel_ids = channel_buffer_collaborator::Entity::find()
53 .select_only()
54 .column(channel_buffer_collaborator::Column::ChannelId)
55 .distinct()
56 .filter(
57 channel_buffer_collaborator::Column::ConnectionServerId
58 .is_in(stale_server_epochs.iter().copied()),
59 )
60 .into_values::<_, QueryChannelIds>()
61 .all(&*tx)
62 .await?;
63
64 Ok((room_ids, channel_ids))
65 })
66 .await
67 }
68
69 /// Deletes any stale servers in the environment that don't match the `new_server_id`.
70 pub async fn delete_stale_servers(
71 &self,
72 environment: &str,
73 new_server_id: ServerId,
74 ) -> Result<()> {
75 self.transaction(|tx| async move {
76 server::Entity::delete_many()
77 .filter(
78 Condition::all()
79 .add(server::Column::Environment.eq(environment))
80 .add(server::Column::Id.ne(new_server_id)),
81 )
82 .exec(&*tx)
83 .await?;
84 Ok(())
85 })
86 .await
87 }
88
89 async fn stale_server_ids(
90 &self,
91 environment: &str,
92 new_server_id: ServerId,
93 tx: &DatabaseTransaction,
94 ) -> Result<Vec<ServerId>> {
95 let stale_servers = server::Entity::find()
96 .filter(
97 Condition::all()
98 .add(server::Column::Environment.eq(environment))
99 .add(server::Column::Id.ne(new_server_id)),
100 )
101 .all(tx)
102 .await?;
103 Ok(stale_servers.into_iter().map(|server| server.id).collect())
104 }
105}