1use super::*;
2
3impl Database {
4 /// Creates a new server in the given environment.
5 pub async fn create_server(&self, environment: &str) -> Result<ServerId> {
6 self.transaction(|tx| async move {
7 let server = server::ActiveModel {
8 environment: ActiveValue::set(environment.into()),
9 ..Default::default()
10 }
11 .insert(&*tx)
12 .await?;
13 Ok(server.id)
14 })
15 .await
16 }
17
18 /// Returns the IDs of resources associated with stale servers.
19 ///
20 /// A server is stale if it is in the specified `environment` and does not
21 /// match the provided `new_server_id`.
22 pub async fn stale_server_resource_ids(
23 &self,
24 environment: &str,
25 new_server_id: ServerId,
26 ) -> Result<(Vec<RoomId>, Vec<ChannelId>)> {
27 self.transaction(|tx| async move {
28 #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
29 enum QueryRoomIds {
30 RoomId,
31 }
32
33 #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
34 enum QueryChannelIds {
35 ChannelId,
36 }
37
38 let stale_server_epochs = self
39 .stale_server_ids(environment, new_server_id, &tx)
40 .await?;
41 let room_ids = room_participant::Entity::find()
42 .select_only()
43 .column(room_participant::Column::RoomId)
44 .distinct()
45 .filter(
46 room_participant::Column::AnsweringConnectionServerId
47 .is_in(stale_server_epochs.iter().copied()),
48 )
49 .into_values::<_, QueryRoomIds>()
50 .all(&*tx)
51 .await?;
52 let channel_ids = channel_buffer_collaborator::Entity::find()
53 .select_only()
54 .column(channel_buffer_collaborator::Column::ChannelId)
55 .distinct()
56 .filter(
57 channel_buffer_collaborator::Column::ConnectionServerId
58 .is_in(stale_server_epochs.iter().copied()),
59 )
60 .into_values::<_, QueryChannelIds>()
61 .all(&*tx)
62 .await?;
63
64 Ok((room_ids, channel_ids))
65 })
66 .await
67 }
68
69 /// Delete all channel chat participants from previous servers
70 pub async fn delete_stale_channel_chat_participants(
71 &self,
72 environment: &str,
73 new_server_id: ServerId,
74 ) -> Result<()> {
75 self.transaction(|tx| async move {
76 let stale_server_epochs = self
77 .stale_server_ids(environment, new_server_id, &tx)
78 .await?;
79
80 channel_chat_participant::Entity::delete_many()
81 .filter(
82 channel_chat_participant::Column::ConnectionServerId
83 .is_in(stale_server_epochs.iter().copied()),
84 )
85 .exec(&*tx)
86 .await?;
87
88 Ok(())
89 })
90 .await
91 }
92
93 pub async fn clear_old_worktree_entries(&self, server_id: ServerId) -> Result<()> {
94 self.transaction(|tx| async move {
95 use sea_orm::Statement;
96 use sea_orm::sea_query::{Expr, Query};
97
98 loop {
99 let delete_query = Query::delete()
100 .from_table(worktree_entry::Entity)
101 .and_where(
102 Expr::tuple([
103 Expr::col((worktree_entry::Entity, worktree_entry::Column::ProjectId))
104 .into(),
105 Expr::col((worktree_entry::Entity, worktree_entry::Column::WorktreeId))
106 .into(),
107 Expr::col((worktree_entry::Entity, worktree_entry::Column::Id)).into(),
108 ])
109 .in_subquery(
110 Query::select()
111 .columns([
112 (worktree_entry::Entity, worktree_entry::Column::ProjectId),
113 (worktree_entry::Entity, worktree_entry::Column::WorktreeId),
114 (worktree_entry::Entity, worktree_entry::Column::Id),
115 ])
116 .from(worktree_entry::Entity)
117 .inner_join(
118 project::Entity,
119 Expr::col((project::Entity, project::Column::Id)).equals((
120 worktree_entry::Entity,
121 worktree_entry::Column::ProjectId,
122 )),
123 )
124 .and_where(project::Column::HostConnectionServerId.ne(server_id))
125 .limit(10000)
126 .to_owned(),
127 ),
128 )
129 .to_owned();
130
131 let statement = Statement::from_sql_and_values(
132 tx.get_database_backend(),
133 delete_query
134 .to_string(sea_orm::sea_query::PostgresQueryBuilder)
135 .as_str(),
136 vec![],
137 );
138
139 let result = tx.execute(statement).await?;
140 if result.rows_affected() == 0 {
141 break;
142 }
143 }
144
145 loop {
146 let delete_query = Query::delete()
147 .from_table(project_repository_statuses::Entity)
148 .and_where(
149 Expr::tuple([Expr::col((
150 project_repository_statuses::Entity,
151 project_repository_statuses::Column::ProjectId,
152 ))
153 .into()])
154 .in_subquery(
155 Query::select()
156 .columns([(
157 project_repository_statuses::Entity,
158 project_repository_statuses::Column::ProjectId,
159 )])
160 .from(project_repository_statuses::Entity)
161 .inner_join(
162 project::Entity,
163 Expr::col((project::Entity, project::Column::Id)).equals((
164 project_repository_statuses::Entity,
165 project_repository_statuses::Column::ProjectId,
166 )),
167 )
168 .and_where(project::Column::HostConnectionServerId.ne(server_id))
169 .limit(10000)
170 .to_owned(),
171 ),
172 )
173 .to_owned();
174
175 let statement = Statement::from_sql_and_values(
176 tx.get_database_backend(),
177 delete_query
178 .to_string(sea_orm::sea_query::PostgresQueryBuilder)
179 .as_str(),
180 vec![],
181 );
182
183 let result = tx.execute(statement).await?;
184 if result.rows_affected() == 0 {
185 break;
186 }
187 }
188
189 Ok(())
190 })
191 .await
192 }
193
194 /// Deletes any stale servers in the environment that don't match the `new_server_id`.
195 pub async fn delete_stale_servers(
196 &self,
197 environment: &str,
198 new_server_id: ServerId,
199 ) -> Result<()> {
200 self.transaction(|tx| async move {
201 server::Entity::delete_many()
202 .filter(
203 Condition::all()
204 .add(server::Column::Environment.eq(environment))
205 .add(server::Column::Id.ne(new_server_id)),
206 )
207 .exec(&*tx)
208 .await?;
209 Ok(())
210 })
211 .await
212 }
213
214 pub async fn stale_server_ids(
215 &self,
216 environment: &str,
217 new_server_id: ServerId,
218 tx: &DatabaseTransaction,
219 ) -> Result<Vec<ServerId>> {
220 let stale_servers = server::Entity::find()
221 .filter(
222 Condition::all()
223 .add(server::Column::Environment.eq(environment))
224 .add(server::Column::Id.ne(new_server_id)),
225 )
226 .all(tx)
227 .await?;
228 Ok(stale_servers.into_iter().map(|server| server.id).collect())
229 }
230}