Improve collab cleanup (#32000)

Conrad Irwin created

Co-authored-by: Max <max@zed.dev>
Co-authored-by: Marshall <marshall@zed.dev>
Co-authored-by: Mikayla <mikayla@zed.dev>

Release Notes:

- N/A

Change summary

crates/collab/src/db/queries/servers.rs | 83 ++++++++++++++++++++++++++
crates/collab/src/rpc.rs                | 25 ++++++++
2 files changed, 107 insertions(+), 1 deletion(-)

Detailed changes

crates/collab/src/db/queries/servers.rs 🔗

@@ -66,6 +66,87 @@ impl Database {
         .await
     }
 
+    /// Delete all channel chat participants from previous servers
+    pub async fn delete_stale_channel_chat_participants(
+        &self,
+        environment: &str,
+        new_server_id: ServerId,
+    ) -> Result<()> {
+        self.transaction(|tx| async move {
+            let stale_server_epochs = self
+                .stale_server_ids(environment, new_server_id, &tx)
+                .await?;
+
+            channel_chat_participant::Entity::delete_many()
+                .filter(
+                    channel_chat_participant::Column::ConnectionServerId
+                        .is_in(stale_server_epochs.iter().copied()),
+                )
+                .exec(&*tx)
+                .await?;
+
+            Ok(())
+        })
+        .await
+    }
+
+    pub async fn clear_old_worktree_entries(&self, server_id: ServerId) -> Result<()> {
+        self.transaction(|tx| async move {
+            use sea_orm::Statement;
+            use sea_orm::sea_query::{Expr, Query};
+
+            loop {
+                let delete_query = Query::delete()
+                    .from_table(worktree_entry::Entity)
+                    .and_where(
+                        Expr::tuple([
+                            Expr::col((worktree_entry::Entity, worktree_entry::Column::ProjectId))
+                                .into(),
+                            Expr::col((worktree_entry::Entity, worktree_entry::Column::WorktreeId))
+                                .into(),
+                            Expr::col((worktree_entry::Entity, worktree_entry::Column::Id)).into(),
+                        ])
+                        .in_subquery(
+                            Query::select()
+                                .columns([
+                                    (worktree_entry::Entity, worktree_entry::Column::ProjectId),
+                                    (worktree_entry::Entity, worktree_entry::Column::WorktreeId),
+                                    (worktree_entry::Entity, worktree_entry::Column::Id),
+                                ])
+                                .from(worktree_entry::Entity)
+                                .inner_join(
+                                    project::Entity,
+                                    Expr::col((project::Entity, project::Column::Id)).equals((
+                                        worktree_entry::Entity,
+                                        worktree_entry::Column::ProjectId,
+                                    )),
+                                )
+                                .and_where(project::Column::HostConnectionServerId.ne(server_id))
+                                .limit(10000)
+                                .to_owned(),
+                        ),
+                    )
+                    .to_owned();
+
+                let statement = Statement::from_sql_and_values(
+                    tx.get_database_backend(),
+                    delete_query
+                        .to_string(sea_orm::sea_query::PostgresQueryBuilder)
+                        .as_str(),
+                    vec![],
+                );
+
+                let result = tx.execute(statement).await?;
+                if result.rows_affected() == 0 {
+                    break;
+                }
+            }
+
+            Ok(())
+        })
+        .await
+    }
+
     /// Deletes any stale servers in the environment that don't match the `new_server_id`.
     pub async fn delete_stale_servers(
         &self,
@@ -86,7 +167,7 @@ impl Database {
         .await
     }
 
-    async fn stale_server_ids(
+    pub async fn stale_server_ids(
         &self,
         environment: &str,
         new_server_id: ServerId,

crates/collab/src/rpc.rs 🔗

@@ -433,6 +433,16 @@ impl Server {
                 tracing::info!("waiting for cleanup timeout");
                 timeout.await;
                 tracing::info!("cleanup timeout expired, retrieving stale rooms");
+
+                app_state
+                    .db
+                    .delete_stale_channel_chat_participants(
+                        &app_state.config.zed_environment,
+                        server_id,
+                    )
+                    .await
+                    .trace_err();
+
                 if let Some((room_ids, channel_ids)) = app_state
                     .db
                     .stale_server_resource_ids(&app_state.config.zed_environment, server_id)
@@ -554,6 +564,21 @@ impl Server {
                     }
                 }
 
+                app_state
+                    .db
+                    .delete_stale_channel_chat_participants(
+                        &app_state.config.zed_environment,
+                        server_id,
+                    )
+                    .await
+                    .trace_err();
+
+                app_state
+                    .db
+                    .clear_old_worktree_entries(server_id)
+                    .await
+                    .trace_err();
+
                 app_state
                     .db
                     .delete_stale_servers(&app_state.config.zed_environment, server_id)