1use super::*;
2
3impl Database {
4 /// Creates a new server in the given environment.
5 pub async fn create_server(&self, environment: &str) -> Result<ServerId> {
6 self.transaction(|tx| async move {
7 let server = server::ActiveModel {
8 environment: ActiveValue::set(environment.into()),
9 ..Default::default()
10 }
11 .insert(&*tx)
12 .await?;
13 Ok(server.id)
14 })
15 .await
16 }
17
18 /// Returns the IDs of resources associated with stale servers.
19 ///
20 /// A server is stale if it is in the specified `environment` and does not
21 /// match the provided `new_server_id`.
22 pub async fn stale_server_resource_ids(
23 &self,
24 environment: &str,
25 new_server_id: ServerId,
26 ) -> Result<(Vec<RoomId>, Vec<ChannelId>)> {
27 self.transaction(|tx| async move {
28 #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
29 enum QueryRoomIds {
30 RoomId,
31 }
32
33 #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
34 enum QueryChannelIds {
35 ChannelId,
36 }
37
38 let stale_server_epochs = self
39 .stale_server_ids(environment, new_server_id, &tx)
40 .await?;
41 let room_ids = room_participant::Entity::find()
42 .select_only()
43 .column(room_participant::Column::RoomId)
44 .distinct()
45 .filter(
46 room_participant::Column::AnsweringConnectionServerId
47 .is_in(stale_server_epochs.iter().copied()),
48 )
49 .into_values::<_, QueryRoomIds>()
50 .all(&*tx)
51 .await?;
52 let channel_ids = channel_buffer_collaborator::Entity::find()
53 .select_only()
54 .column(channel_buffer_collaborator::Column::ChannelId)
55 .distinct()
56 .filter(
57 channel_buffer_collaborator::Column::ConnectionServerId
58 .is_in(stale_server_epochs.iter().copied()),
59 )
60 .into_values::<_, QueryChannelIds>()
61 .all(&*tx)
62 .await?;
63
64 Ok((room_ids, channel_ids))
65 })
66 .await
67 }
68
69 /// Delete all channel chat participants from previous servers
70 pub async fn delete_stale_channel_chat_participants(
71 &self,
72 environment: &str,
73 new_server_id: ServerId,
74 ) -> Result<()> {
75 self.transaction(|tx| async move {
76 let stale_server_epochs = self
77 .stale_server_ids(environment, new_server_id, &tx)
78 .await?;
79
80 channel_chat_participant::Entity::delete_many()
81 .filter(
82 channel_chat_participant::Column::ConnectionServerId
83 .is_in(stale_server_epochs.iter().copied()),
84 )
85 .exec(&*tx)
86 .await?;
87
88 Ok(())
89 })
90 .await
91 }
92
93 pub async fn clear_old_worktree_entries(&self, server_id: ServerId) -> Result<()> {
94 self.transaction(|tx| async move {
95 use sea_orm::Statement;
96 use sea_orm::sea_query::{Expr, Query};
97
98 loop {
99 let delete_query = Query::delete()
100 .from_table(worktree_entry::Entity)
101 .and_where(
102 Expr::tuple([
103 Expr::col((worktree_entry::Entity, worktree_entry::Column::ProjectId))
104 .into(),
105 Expr::col((worktree_entry::Entity, worktree_entry::Column::WorktreeId))
106 .into(),
107 Expr::col((worktree_entry::Entity, worktree_entry::Column::Id)).into(),
108 ])
109 .in_subquery(
110 Query::select()
111 .columns([
112 (worktree_entry::Entity, worktree_entry::Column::ProjectId),
113 (worktree_entry::Entity, worktree_entry::Column::WorktreeId),
114 (worktree_entry::Entity, worktree_entry::Column::Id),
115 ])
116 .from(worktree_entry::Entity)
117 .inner_join(
118 project::Entity,
119 Expr::col((project::Entity, project::Column::Id)).equals((
120 worktree_entry::Entity,
121 worktree_entry::Column::ProjectId,
122 )),
123 )
124 .and_where(project::Column::HostConnectionServerId.ne(server_id))
125 .limit(10000)
126 .to_owned(),
127 ),
128 )
129 .to_owned();
130
131 let statement = Statement::from_sql_and_values(
132 tx.get_database_backend(),
133 delete_query
134 .to_string(sea_orm::sea_query::PostgresQueryBuilder)
135 .as_str(),
136 vec![],
137 );
138
139 let result = tx.execute(statement).await?;
140 if result.rows_affected() == 0 {
141 break;
142 }
143 }
144
145 Ok(())
146 })
147 .await
148 }
149
150 /// Deletes any stale servers in the environment that don't match the `new_server_id`.
151 pub async fn delete_stale_servers(
152 &self,
153 environment: &str,
154 new_server_id: ServerId,
155 ) -> Result<()> {
156 self.transaction(|tx| async move {
157 server::Entity::delete_many()
158 .filter(
159 Condition::all()
160 .add(server::Column::Environment.eq(environment))
161 .add(server::Column::Id.ne(new_server_id)),
162 )
163 .exec(&*tx)
164 .await?;
165 Ok(())
166 })
167 .await
168 }
169
170 pub async fn stale_server_ids(
171 &self,
172 environment: &str,
173 new_server_id: ServerId,
174 tx: &DatabaseTransaction,
175 ) -> Result<Vec<ServerId>> {
176 let stale_servers = server::Entity::find()
177 .filter(
178 Condition::all()
179 .add(server::Column::Environment.eq(environment))
180 .add(server::Column::Id.ne(new_server_id)),
181 )
182 .all(tx)
183 .await?;
184 Ok(stale_servers.into_iter().map(|server| server.id).collect())
185 }
186}