Start work on refreshing channel buffer collaborators on server restart

Max Brunsfeld created

Change summary

crates/collab/src/db.rs                                 |  5 +
crates/collab/src/db/queries/buffers.rs                 | 28 ++++++++-
crates/collab/src/db/queries/servers.rs                 | 31 ++++++++--
crates/collab/src/rpc.rs                                | 20 ++++++
crates/collab/src/tests/randomized_integration_tests.rs |  4 
5 files changed, 73 insertions(+), 15 deletions(-)

Detailed changes

crates/collab/src/db.rs 🔗

@@ -503,6 +503,11 @@ pub struct RefreshedRoom {
     pub canceled_calls_to_user_ids: Vec<UserId>,
 }
 
+pub struct RefreshedChannelBuffer {
+    pub connection_ids: Vec<ConnectionId>,
+    pub removed_collaborators: Vec<proto::RemoveChannelBufferCollaborator>,
+}
+
 pub struct Project {
     pub collaborators: Vec<ProjectCollaborator>,
     pub worktrees: BTreeMap<u64, Worktree>,

crates/collab/src/db/queries/buffers.rs 🔗

@@ -123,10 +123,11 @@ impl Database {
 
                 // Find the collaborator record for this user's previous lost
                 // connection. Update it with the new connection id.
-                let Some(self_collaborator) = collaborators
-                    .iter_mut()
-                    .find(|c| c.user_id == user_id && c.connection_lost)
-                else {
+                let server_id = ServerId(connection_id.owner_id as i32);
+                let Some(self_collaborator) = collaborators.iter_mut().find(|c| {
+                    c.user_id == user_id
+                        && (c.connection_lost || c.connection_server_id != server_id)
+                }) else {
                     continue;
                 };
                 let old_connection_id = self_collaborator.connection();
@@ -195,6 +196,25 @@ impl Database {
         .await
     }
 
+    pub async fn refresh_channel_buffer(
+        &self,
+        channel_id: ChannelId,
+        server_id: ServerId,
+    ) -> Result<RefreshedChannelBuffer> {
+        self.transaction(|tx| async move {
+            let mut connection_ids = Vec::new();
+            let mut removed_collaborators = Vec::new();
+
+            // TODO
+
+            Ok(RefreshedChannelBuffer {
+                connection_ids,
+                removed_collaborators,
+            })
+        })
+        .await
+    }
+
     pub async fn leave_channel_buffer(
         &self,
         channel_id: ChannelId,

crates/collab/src/db/queries/servers.rs 🔗

@@ -14,31 +14,48 @@ impl Database {
         .await
     }
 
-    pub async fn stale_room_ids(
+    pub async fn stale_server_resource_ids(
         &self,
         environment: &str,
         new_server_id: ServerId,
-    ) -> Result<Vec<RoomId>> {
+    ) -> Result<(Vec<RoomId>, Vec<ChannelId>)> {
         self.transaction(|tx| async move {
             #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
-            enum QueryAs {
+            enum QueryRoomIds {
                 RoomId,
             }
 
+            #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
+            enum QueryChannelIds {
+                ChannelId,
+            }
+
             let stale_server_epochs = self
                 .stale_server_ids(environment, new_server_id, &tx)
                 .await?;
-            Ok(room_participant::Entity::find()
+            let room_ids = room_participant::Entity::find()
                 .select_only()
                 .column(room_participant::Column::RoomId)
                 .distinct()
                 .filter(
                     room_participant::Column::AnsweringConnectionServerId
-                        .is_in(stale_server_epochs),
+                        .is_in(stale_server_epochs.iter().copied()),
+                )
+                .into_values::<_, QueryRoomIds>()
+                .all(&*tx)
+                .await?;
+            let channel_ids = channel_buffer_collaborator::Entity::find()
+                .select_only()
+                .column(channel_buffer_collaborator::Column::ChannelId)
+                .distinct()
+                .filter(
+                    channel_buffer_collaborator::Column::ConnectionServerId
+                        .is_in(stale_server_epochs.iter().copied()),
                 )
-                .into_values::<_, QueryAs>()
+                .into_values::<_, QueryChannelIds>()
                 .all(&*tx)
-                .await?)
+                .await?;
+            Ok((room_ids, channel_ids))
         })
         .await
     }

crates/collab/src/rpc.rs 🔗

@@ -278,13 +278,29 @@ impl Server {
                 tracing::info!("waiting for cleanup timeout");
                 timeout.await;
                 tracing::info!("cleanup timeout expired, retrieving stale rooms");
-                if let Some(room_ids) = app_state
+                if let Some((room_ids, channel_ids)) = app_state
                     .db
-                    .stale_room_ids(&app_state.config.zed_environment, server_id)
+                    .stale_server_resource_ids(&app_state.config.zed_environment, server_id)
                     .await
                     .trace_err()
                 {
                     tracing::info!(stale_room_count = room_ids.len(), "retrieved stale rooms");
+
+                    for channel_id in channel_ids {
+                        if let Some(refreshed_channel_buffer) = app_state
+                            .db
+                            .refresh_channel_buffer(channel_id, server_id)
+                            .await
+                            .trace_err()
+                        {
+                            for connection_id in refreshed_channel_buffer.connection_ids {
+                                for message in &refreshed_channel_buffer.removed_collaborators {
+                                    peer.send(connection_id, message.clone()).trace_err();
+                                }
+                            }
+                        }
+                    }
+
                     for room_id in room_ids {
                         let mut contacts_to_update = HashSet::default();
                         let mut canceled_calls_to_user_ids = Vec::new();

crates/collab/src/tests/randomized_integration_tests.rs 🔗

@@ -307,10 +307,10 @@ async fn apply_server_operation(
             server.start().await.unwrap();
             deterministic.advance_clock(CLEANUP_TIMEOUT);
             let environment = &server.app_state.config.zed_environment;
-            let stale_room_ids = server
+            let (stale_room_ids, _) = server
                 .app_state
                 .db
-                .stale_room_ids(environment, server.id())
+                .stale_server_resource_ids(environment, server.id())
                 .await
                 .unwrap();
             assert_eq!(stale_room_ids, vec![]);