servers.rs

  1use super::*;
  2
  3impl Database {
  4    /// Creates a new server in the given environment.
  5    pub async fn create_server(&self, environment: &str) -> Result<ServerId> {
  6        self.transaction(|tx| async move {
  7            let server = server::ActiveModel {
  8                environment: ActiveValue::set(environment.into()),
  9                ..Default::default()
 10            }
 11            .insert(&*tx)
 12            .await?;
 13            Ok(server.id)
 14        })
 15        .await
 16    }
 17
 18    /// Returns the IDs of resources associated with stale servers.
 19    ///
 20    /// A server is stale if it is in the specified `environment` and does not
 21    /// match the provided `new_server_id`.
 22    pub async fn stale_server_resource_ids(
 23        &self,
 24        environment: &str,
 25        new_server_id: ServerId,
 26    ) -> Result<(Vec<RoomId>, Vec<ChannelId>)> {
 27        self.transaction(|tx| async move {
 28            #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
 29            enum QueryRoomIds {
 30                RoomId,
 31            }
 32
 33            #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
 34            enum QueryChannelIds {
 35                ChannelId,
 36            }
 37
 38            let stale_server_epochs = self
 39                .stale_server_ids(environment, new_server_id, &tx)
 40                .await?;
 41            let room_ids = room_participant::Entity::find()
 42                .select_only()
 43                .column(room_participant::Column::RoomId)
 44                .distinct()
 45                .filter(
 46                    room_participant::Column::AnsweringConnectionServerId
 47                        .is_in(stale_server_epochs.iter().copied()),
 48                )
 49                .into_values::<_, QueryRoomIds>()
 50                .all(&*tx)
 51                .await?;
 52            let channel_ids = channel_buffer_collaborator::Entity::find()
 53                .select_only()
 54                .column(channel_buffer_collaborator::Column::ChannelId)
 55                .distinct()
 56                .filter(
 57                    channel_buffer_collaborator::Column::ConnectionServerId
 58                        .is_in(stale_server_epochs.iter().copied()),
 59                )
 60                .into_values::<_, QueryChannelIds>()
 61                .all(&*tx)
 62                .await?;
 63
 64            Ok((room_ids, channel_ids))
 65        })
 66        .await
 67    }
 68
 69    /// Delete all channel chat participants from previous servers
 70    pub async fn delete_stale_channel_chat_participants(
 71        &self,
 72        environment: &str,
 73        new_server_id: ServerId,
 74    ) -> Result<()> {
 75        self.transaction(|tx| async move {
 76            let stale_server_epochs = self
 77                .stale_server_ids(environment, new_server_id, &tx)
 78                .await?;
 79
 80            channel_chat_participant::Entity::delete_many()
 81                .filter(
 82                    channel_chat_participant::Column::ConnectionServerId
 83                        .is_in(stale_server_epochs.iter().copied()),
 84                )
 85                .exec(&*tx)
 86                .await?;
 87
 88            Ok(())
 89        })
 90        .await
 91    }
 92
 93    pub async fn clear_old_worktree_entries(&self, server_id: ServerId) -> Result<()> {
 94        self.transaction(|tx| async move {
 95            use sea_orm::Statement;
 96            use sea_orm::sea_query::{Expr, Query};
 97
 98            loop {
 99                let delete_query = Query::delete()
100                    .from_table(worktree_entry::Entity)
101                    .and_where(
102                        Expr::tuple([
103                            Expr::col((worktree_entry::Entity, worktree_entry::Column::ProjectId))
104                                .into(),
105                            Expr::col((worktree_entry::Entity, worktree_entry::Column::WorktreeId))
106                                .into(),
107                            Expr::col((worktree_entry::Entity, worktree_entry::Column::Id)).into(),
108                        ])
109                        .in_subquery(
110                            Query::select()
111                                .columns([
112                                    (worktree_entry::Entity, worktree_entry::Column::ProjectId),
113                                    (worktree_entry::Entity, worktree_entry::Column::WorktreeId),
114                                    (worktree_entry::Entity, worktree_entry::Column::Id),
115                                ])
116                                .from(worktree_entry::Entity)
117                                .inner_join(
118                                    project::Entity,
119                                    Expr::col((project::Entity, project::Column::Id)).equals((
120                                        worktree_entry::Entity,
121                                        worktree_entry::Column::ProjectId,
122                                    )),
123                                )
124                                .and_where(project::Column::HostConnectionServerId.ne(server_id))
125                                .limit(10000)
126                                .to_owned(),
127                        ),
128                    )
129                    .to_owned();
130
131                let statement = Statement::from_sql_and_values(
132                    tx.get_database_backend(),
133                    delete_query
134                        .to_string(sea_orm::sea_query::PostgresQueryBuilder)
135                        .as_str(),
136                    vec![],
137                );
138
139                let result = tx.execute(statement).await?;
140                if result.rows_affected() == 0 {
141                    break;
142                }
143            }
144
145            loop {
146                let delete_query = Query::delete()
147                    .from_table(project_repository_statuses::Entity)
148                    .and_where(
149                        Expr::tuple([Expr::col((
150                            project_repository_statuses::Entity,
151                            project_repository_statuses::Column::ProjectId,
152                        ))
153                        .into()])
154                        .in_subquery(
155                            Query::select()
156                                .columns([(
157                                    project_repository_statuses::Entity,
158                                    project_repository_statuses::Column::ProjectId,
159                                )])
160                                .from(project_repository_statuses::Entity)
161                                .inner_join(
162                                    project::Entity,
163                                    Expr::col((project::Entity, project::Column::Id)).equals((
164                                        project_repository_statuses::Entity,
165                                        project_repository_statuses::Column::ProjectId,
166                                    )),
167                                )
168                                .and_where(project::Column::HostConnectionServerId.ne(server_id))
169                                .limit(10000)
170                                .to_owned(),
171                        ),
172                    )
173                    .to_owned();
174
175                let statement = Statement::from_sql_and_values(
176                    tx.get_database_backend(),
177                    delete_query
178                        .to_string(sea_orm::sea_query::PostgresQueryBuilder)
179                        .as_str(),
180                    vec![],
181                );
182
183                let result = tx.execute(statement).await?;
184                if result.rows_affected() == 0 {
185                    break;
186                }
187            }
188
189            Ok(())
190        })
191        .await
192    }
193
194    /// Deletes any stale servers in the environment that don't match the `new_server_id`.
195    pub async fn delete_stale_servers(
196        &self,
197        environment: &str,
198        new_server_id: ServerId,
199    ) -> Result<()> {
200        self.transaction(|tx| async move {
201            server::Entity::delete_many()
202                .filter(
203                    Condition::all()
204                        .add(server::Column::Environment.eq(environment))
205                        .add(server::Column::Id.ne(new_server_id)),
206                )
207                .exec(&*tx)
208                .await?;
209            Ok(())
210        })
211        .await
212    }
213
214    pub async fn stale_server_ids(
215        &self,
216        environment: &str,
217        new_server_id: ServerId,
218        tx: &DatabaseTransaction,
219    ) -> Result<Vec<ServerId>> {
220        let stale_servers = server::Entity::find()
221            .filter(
222                Condition::all()
223                    .add(server::Column::Environment.eq(environment))
224                    .add(server::Column::Id.ne(new_server_id)),
225            )
226            .all(tx)
227            .await?;
228        Ok(stale_servers.into_iter().map(|server| server.id).collect())
229    }
230}