Clear stale data on startup

Antonio Scandurra created

This is a stopgap measure until we introduce reconnection support.

Change summary

crates/collab/migrations.sqlite/20221109000000_test_schema.sql   | 12 
crates/collab/migrations/20221111092550_reconnection_support.sql | 10 
crates/collab/src/bin/seed.rs                                    |  2 
crates/collab/src/db.rs                                          | 33 ++
crates/collab/src/db/project.rs                                  |  1 
crates/collab/src/db/project_collaborator.rs                     |  1 
crates/collab/src/db/room_participant.rs                         |  2 
crates/collab/src/main.rs                                        |  2 
8 files changed, 59 insertions(+), 4 deletions(-)

Detailed changes

crates/collab/migrations.sqlite/20221109000000_test_schema.sql 🔗

@@ -43,8 +43,10 @@ CREATE TABLE "projects" (
     "id" INTEGER PRIMARY KEY,
     "room_id" INTEGER REFERENCES rooms (id) NOT NULL,
     "host_user_id" INTEGER REFERENCES users (id) NOT NULL,
-    "host_connection_id" INTEGER NOT NULL
+    "host_connection_id" INTEGER NOT NULL,
+    "host_connection_epoch" TEXT NOT NULL
 );
+CREATE INDEX "index_projects_on_host_connection_epoch" ON "projects" ("host_connection_epoch");
 
 CREATE TABLE "worktrees" (
     "project_id" INTEGER NOT NULL REFERENCES projects (id) ON DELETE CASCADE,
@@ -100,22 +102,28 @@ CREATE TABLE "project_collaborators" (
     "id" INTEGER PRIMARY KEY,
     "project_id" INTEGER NOT NULL REFERENCES projects (id) ON DELETE CASCADE,
     "connection_id" INTEGER NOT NULL,
+    "connection_epoch" TEXT NOT NULL,
     "user_id" INTEGER NOT NULL,
     "replica_id" INTEGER NOT NULL,
     "is_host" BOOLEAN NOT NULL
 );
 CREATE INDEX "index_project_collaborators_on_project_id" ON "project_collaborators" ("project_id");
 CREATE UNIQUE INDEX "index_project_collaborators_on_project_id_and_replica_id" ON "project_collaborators" ("project_id", "replica_id");
+CREATE INDEX "index_project_collaborators_on_connection_epoch" ON "project_collaborators" ("connection_epoch");
 
 CREATE TABLE "room_participants" (
     "id" INTEGER PRIMARY KEY,
     "room_id" INTEGER NOT NULL REFERENCES rooms (id),
     "user_id" INTEGER NOT NULL REFERENCES users (id),
     "answering_connection_id" INTEGER,
+    "answering_connection_epoch" TEXT,
     "location_kind" INTEGER,
     "location_project_id" INTEGER REFERENCES projects (id),
     "initial_project_id" INTEGER REFERENCES projects (id),
     "calling_user_id" INTEGER NOT NULL REFERENCES users (id),
-    "calling_connection_id" INTEGER NOT NULL
+    "calling_connection_id" INTEGER NOT NULL,
+    "calling_connection_epoch" TEXT NOT NULL
 );
 CREATE UNIQUE INDEX "index_room_participants_on_user_id" ON "room_participants" ("user_id");
+CREATE INDEX "index_room_participants_on_answering_connection_epoch" ON "room_participants" ("answering_connection_epoch");
+CREATE INDEX "index_room_participants_on_calling_connection_epoch" ON "room_participants" ("calling_connection_epoch");

crates/collab/migrations/20221111092550_reconnection_support.sql 🔗

@@ -6,7 +6,9 @@ CREATE TABLE IF NOT EXISTS "rooms" (
 ALTER TABLE "projects"
     ADD "room_id" INTEGER REFERENCES rooms (id),
     ADD "host_connection_id" INTEGER,
+    ADD "host_connection_epoch" UUID,
     DROP COLUMN "unregistered";
+CREATE INDEX "index_projects_on_host_connection_epoch" ON "projects" ("host_connection_epoch");
 
 CREATE TABLE "worktrees" (
     "project_id" INTEGER NOT NULL REFERENCES projects (id) ON DELETE CASCADE,
@@ -62,22 +64,28 @@ CREATE TABLE "project_collaborators" (
     "id" SERIAL PRIMARY KEY,
     "project_id" INTEGER NOT NULL REFERENCES projects (id) ON DELETE CASCADE,
     "connection_id" INTEGER NOT NULL,
+    "connection_epoch" UUID NOT NULL,
     "user_id" INTEGER NOT NULL,
     "replica_id" INTEGER NOT NULL,
     "is_host" BOOLEAN NOT NULL
 );
 CREATE INDEX "index_project_collaborators_on_project_id" ON "project_collaborators" ("project_id");
 CREATE UNIQUE INDEX "index_project_collaborators_on_project_id_and_replica_id" ON "project_collaborators" ("project_id", "replica_id");
+CREATE INDEX "index_project_collaborators_on_connection_epoch" ON "project_collaborators" ("connection_epoch");
 
 CREATE TABLE "room_participants" (
     "id" SERIAL PRIMARY KEY,
     "room_id" INTEGER NOT NULL REFERENCES rooms (id),
     "user_id" INTEGER NOT NULL REFERENCES users (id),
     "answering_connection_id" INTEGER,
+    "answering_connection_epoch" UUID,
     "location_kind" INTEGER,
     "location_project_id" INTEGER REFERENCES projects (id),
     "initial_project_id" INTEGER REFERENCES projects (id),
     "calling_user_id" INTEGER NOT NULL REFERENCES users (id),
-    "calling_connection_id" INTEGER NOT NULL
+    "calling_connection_id" INTEGER NOT NULL,
+    "calling_connection_epoch" UUID NOT NULL
 );
 CREATE UNIQUE INDEX "index_room_participants_on_user_id" ON "room_participants" ("user_id");
+CREATE INDEX "index_room_participants_on_answering_connection_epoch" ON "room_participants" ("answering_connection_epoch");
+CREATE INDEX "index_room_participants_on_calling_connection_epoch" ON "room_participants" ("calling_connection_epoch");

crates/collab/src/bin/seed.rs 🔗

@@ -1,4 +1,4 @@
-use collab::{db, Error, Result};
+use collab::db;
 use db::{ConnectOptions, Database, UserId};
 use serde::{de::DeserializeOwned, Deserialize};
 use std::fmt::Write;

crates/collab/src/db.rs 🔗

@@ -47,6 +47,7 @@ pub struct Database {
     background: Option<std::sync::Arc<gpui::executor::Background>>,
     #[cfg(test)]
     runtime: Option<tokio::runtime::Runtime>,
+    epoch: Uuid,
 }
 
 impl Database {
@@ -59,6 +60,7 @@ impl Database {
             background: None,
             #[cfg(test)]
             runtime: None,
+            epoch: Uuid::new_v4(),
         })
     }
 
@@ -103,6 +105,30 @@ impl Database {
         Ok(new_migrations)
     }
 
+    pub async fn clear_stale_data(&self) -> Result<()> {
+        self.transact(|tx| async {
+            project_collaborator::Entity::delete_many()
+                .filter(project_collaborator::Column::ConnectionEpoch.ne(self.epoch))
+                .exec(&tx)
+                .await?;
+            room_participant::Entity::delete_many()
+                .filter(
+                    room_participant::Column::AnsweringConnectionEpoch
+                        .ne(self.epoch)
+                        .or(room_participant::Column::CallingConnectionEpoch.ne(self.epoch)),
+                )
+                .exec(&tx)
+                .await?;
+            project::Entity::delete_many()
+                .filter(project::Column::HostConnectionEpoch.ne(self.epoch))
+                .exec(&tx)
+                .await?;
+            tx.commit().await?;
+            Ok(())
+        })
+        .await
+    }
+
     // users
 
     pub async fn create_user(
@@ -983,8 +1009,10 @@ impl Database {
                 room_id: ActiveValue::set(room_id),
                 user_id: ActiveValue::set(user_id),
                 answering_connection_id: ActiveValue::set(Some(connection_id.0 as i32)),
+                answering_connection_epoch: ActiveValue::set(Some(self.epoch)),
                 calling_user_id: ActiveValue::set(user_id),
                 calling_connection_id: ActiveValue::set(connection_id.0 as i32),
+                calling_connection_epoch: ActiveValue::set(self.epoch),
                 ..Default::default()
             }
             .insert(&tx)
@@ -1010,6 +1038,7 @@ impl Database {
                 user_id: ActiveValue::set(called_user_id),
                 calling_user_id: ActiveValue::set(calling_user_id),
                 calling_connection_id: ActiveValue::set(calling_connection_id.0 as i32),
+                calling_connection_epoch: ActiveValue::set(self.epoch),
                 initial_project_id: ActiveValue::set(initial_project_id),
                 ..Default::default()
             }
@@ -1127,6 +1156,7 @@ impl Database {
                 )
                 .set(room_participant::ActiveModel {
                     answering_connection_id: ActiveValue::set(Some(connection_id.0 as i32)),
+                    answering_connection_epoch: ActiveValue::set(Some(self.epoch)),
                     ..Default::default()
                 })
                 .exec(&tx)
@@ -1489,6 +1519,7 @@ impl Database {
                 room_id: ActiveValue::set(participant.room_id),
                 host_user_id: ActiveValue::set(participant.user_id),
                 host_connection_id: ActiveValue::set(connection_id.0 as i32),
+                host_connection_epoch: ActiveValue::set(self.epoch),
                 ..Default::default()
             }
             .insert(&tx)
@@ -1513,6 +1544,7 @@ impl Database {
             project_collaborator::ActiveModel {
                 project_id: ActiveValue::set(project.id),
                 connection_id: ActiveValue::set(connection_id.0 as i32),
+                connection_epoch: ActiveValue::set(self.epoch),
                 user_id: ActiveValue::set(participant.user_id),
                 replica_id: ActiveValue::set(ReplicaId(0)),
                 is_host: ActiveValue::set(true),
@@ -1832,6 +1864,7 @@ impl Database {
             let new_collaborator = project_collaborator::ActiveModel {
                 project_id: ActiveValue::set(project_id),
                 connection_id: ActiveValue::set(connection_id.0 as i32),
+                connection_epoch: ActiveValue::set(self.epoch),
                 user_id: ActiveValue::set(participant.user_id),
                 replica_id: ActiveValue::set(replica_id),
                 is_host: ActiveValue::set(false),

crates/collab/src/db/project.rs 🔗

@@ -9,6 +9,7 @@ pub struct Model {
     pub room_id: RoomId,
     pub host_user_id: UserId,
     pub host_connection_id: i32,
+    pub host_connection_epoch: Uuid,
 }
 
 #[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)]

crates/collab/src/db/project_collaborator.rs 🔗

@@ -8,6 +8,7 @@ pub struct Model {
     pub id: ProjectCollaboratorId,
     pub project_id: ProjectId,
     pub connection_id: i32,
+    pub connection_epoch: Uuid,
     pub user_id: UserId,
     pub replica_id: ReplicaId,
     pub is_host: bool,

crates/collab/src/db/room_participant.rs 🔗

@@ -9,11 +9,13 @@ pub struct Model {
     pub room_id: RoomId,
     pub user_id: UserId,
     pub answering_connection_id: Option<i32>,
+    pub answering_connection_epoch: Option<Uuid>,
     pub location_kind: Option<i32>,
     pub location_project_id: Option<ProjectId>,
     pub initial_project_id: Option<ProjectId>,
     pub calling_user_id: UserId,
     pub calling_connection_id: i32,
+    pub calling_connection_epoch: Uuid,
 }
 
 #[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)]

crates/collab/src/main.rs 🔗

@@ -52,6 +52,8 @@ async fn main() -> Result<()> {
             init_tracing(&config);
 
             let state = AppState::new(config).await?;
+            state.db.clear_stale_data().await?;
+
             let listener = TcpListener::bind(&format!("0.0.0.0:{}", state.config.http_port))
                 .expect("failed to bind TCP listener");