servers.rs

  1use super::*;
  2
  3impl Database {
  4    /// Creates a new server in the given environment.
  5    pub async fn create_server(&self, environment: &str) -> Result<ServerId> {
  6        self.transaction(|tx| async move {
  7            let server = server::ActiveModel {
  8                environment: ActiveValue::set(environment.into()),
  9                ..Default::default()
 10            }
 11            .insert(&*tx)
 12            .await?;
 13            Ok(server.id)
 14        })
 15        .await
 16    }
 17
 18    /// Returns the IDs of resources associated with stale servers.
 19    ///
 20    /// A server is stale if it is in the specified `environment` and does not
 21    /// match the provided `new_server_id`.
 22    pub async fn stale_server_resource_ids(
 23        &self,
 24        environment: &str,
 25        new_server_id: ServerId,
 26    ) -> Result<(Vec<RoomId>, Vec<ChannelId>)> {
 27        self.transaction(|tx| async move {
 28            #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
 29            enum QueryRoomIds {
 30                RoomId,
 31            }
 32
 33            #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
 34            enum QueryChannelIds {
 35                ChannelId,
 36            }
 37
 38            let stale_server_epochs = self
 39                .stale_server_ids(environment, new_server_id, &tx)
 40                .await?;
 41            let room_ids = room_participant::Entity::find()
 42                .select_only()
 43                .column(room_participant::Column::RoomId)
 44                .distinct()
 45                .filter(
 46                    room_participant::Column::AnsweringConnectionServerId
 47                        .is_in(stale_server_epochs.iter().copied()),
 48                )
 49                .into_values::<_, QueryRoomIds>()
 50                .all(&*tx)
 51                .await?;
 52            let channel_ids = channel_buffer_collaborator::Entity::find()
 53                .select_only()
 54                .column(channel_buffer_collaborator::Column::ChannelId)
 55                .distinct()
 56                .filter(
 57                    channel_buffer_collaborator::Column::ConnectionServerId
 58                        .is_in(stale_server_epochs.iter().copied()),
 59                )
 60                .into_values::<_, QueryChannelIds>()
 61                .all(&*tx)
 62                .await?;
 63
 64            Ok((room_ids, channel_ids))
 65        })
 66        .await
 67    }
 68
 69    /// Delete all channel chat participants from previous servers
 70    pub async fn delete_stale_channel_chat_participants(
 71        &self,
 72        environment: &str,
 73        new_server_id: ServerId,
 74    ) -> Result<()> {
 75        self.transaction(|tx| async move {
 76            let stale_server_epochs = self
 77                .stale_server_ids(environment, new_server_id, &tx)
 78                .await?;
 79
 80            channel_chat_participant::Entity::delete_many()
 81                .filter(
 82                    channel_chat_participant::Column::ConnectionServerId
 83                        .is_in(stale_server_epochs.iter().copied()),
 84                )
 85                .exec(&*tx)
 86                .await?;
 87
 88            Ok(())
 89        })
 90        .await
 91    }
 92
 93    pub async fn clear_old_worktree_entries(&self, server_id: ServerId) -> Result<()> {
 94        self.transaction(|tx| async move {
 95            use sea_orm::Statement;
 96            use sea_orm::sea_query::{Expr, Query};
 97
 98            loop {
 99                let delete_query = Query::delete()
100                    .from_table(worktree_entry::Entity)
101                    .and_where(
102                        Expr::tuple([
103                            Expr::col((worktree_entry::Entity, worktree_entry::Column::ProjectId))
104                                .into(),
105                            Expr::col((worktree_entry::Entity, worktree_entry::Column::WorktreeId))
106                                .into(),
107                            Expr::col((worktree_entry::Entity, worktree_entry::Column::Id)).into(),
108                        ])
109                        .in_subquery(
110                            Query::select()
111                                .columns([
112                                    (worktree_entry::Entity, worktree_entry::Column::ProjectId),
113                                    (worktree_entry::Entity, worktree_entry::Column::WorktreeId),
114                                    (worktree_entry::Entity, worktree_entry::Column::Id),
115                                ])
116                                .from(worktree_entry::Entity)
117                                .inner_join(
118                                    project::Entity,
119                                    Expr::col((project::Entity, project::Column::Id)).equals((
120                                        worktree_entry::Entity,
121                                        worktree_entry::Column::ProjectId,
122                                    )),
123                                )
124                                .and_where(project::Column::HostConnectionServerId.ne(server_id))
125                                .limit(10000)
126                                .to_owned(),
127                        ),
128                    )
129                    .to_owned();
130
131                let statement = Statement::from_sql_and_values(
132                    tx.get_database_backend(),
133                    delete_query
134                        .to_string(sea_orm::sea_query::PostgresQueryBuilder)
135                        .as_str(),
136                    vec![],
137                );
138
139                let result = tx.execute(statement).await?;
140                if result.rows_affected() == 0 {
141                    break;
142                }
143            }
144
145            Ok(())
146        })
147        .await
148    }
149
150    /// Deletes any stale servers in the environment that don't match the `new_server_id`.
151    pub async fn delete_stale_servers(
152        &self,
153        environment: &str,
154        new_server_id: ServerId,
155    ) -> Result<()> {
156        self.transaction(|tx| async move {
157            server::Entity::delete_many()
158                .filter(
159                    Condition::all()
160                        .add(server::Column::Environment.eq(environment))
161                        .add(server::Column::Id.ne(new_server_id)),
162                )
163                .exec(&*tx)
164                .await?;
165            Ok(())
166        })
167        .await
168    }
169
170    pub async fn stale_server_ids(
171        &self,
172        environment: &str,
173        new_server_id: ServerId,
174        tx: &DatabaseTransaction,
175    ) -> Result<Vec<ServerId>> {
176        let stale_servers = server::Entity::find()
177            .filter(
178                Condition::all()
179                    .add(server::Column::Environment.eq(environment))
180                    .add(server::Column::Id.ne(new_server_id)),
181            )
182            .all(tx)
183            .await?;
184        Ok(stale_servers.into_iter().map(|server| server.id).collect())
185    }
186}