Merge pull request #2103 from zed-industries/connection-staleness

Antonio Scandurra created

Fix connection staleness issues

Change summary

Cargo.toml                                              |  2 -
Dockerfile                                              |  1 
crates/collab/src/db.rs                                 |  8 +---
crates/collab/src/tests/randomized_integration_tests.rs | 10 ++---
crates/rpc/src/proto.rs                                 | 20 ++++++++--
5 files changed, 22 insertions(+), 19 deletions(-)

Detailed changes

Cargo.toml 🔗

@@ -84,5 +84,3 @@ split-debuginfo = "unpacked"
 
 [profile.release]
 debug = true
-
-

Dockerfile 🔗

@@ -5,6 +5,7 @@ WORKDIR app
 COPY . .
 
 # Compile collab server
+ARG CARGO_PROFILE_RELEASE_PANIC=abort
 RUN --mount=type=cache,target=./script/node_modules \
     --mount=type=cache,target=/usr/local/cargo/registry \
     --mount=type=cache,target=./target \

crates/collab/src/db.rs 🔗

@@ -1586,12 +1586,8 @@ impl Database {
                     .filter(
                         Condition::all()
                             .add(
-                                room_participant::Column::CallingConnectionId
-                                    .eq(connection.id as i32),
-                            )
-                            .add(
-                                room_participant::Column::CallingConnectionServerId
-                                    .eq(connection.owner_id as i32),
+                                room_participant::Column::CallingUserId
+                                    .eq(leaving_participant.user_id),
                             )
                             .add(room_participant::Column::AnsweringConnectionId.is_null()),
                     )

crates/collab/src/tests/randomized_integration_tests.rs 🔗

@@ -166,12 +166,10 @@ async fn test_random_collaboration(
                     let contacts = server.app_state.db.get_contacts(*user_id).await.unwrap();
                     let pool = server.connection_pool.lock();
                     for contact in contacts {
-                        if let db::Contact::Accepted { user_id, .. } = contact {
-                            if pool.is_user_online(user_id) {
-                                assert_ne!(
-                                    user_id, removed_user_id,
-                                    "removed client is still a contact of another peer"
-                                );
+                        if let db::Contact::Accepted { user_id, busy, .. } = contact {
+                            if user_id == removed_user_id {
+                                assert!(!pool.is_user_online(user_id));
+                                assert!(!busy);
                             }
                         }
                     }

crates/rpc/src/proto.rs 🔗

@@ -9,7 +9,7 @@ use std::fmt;
 use std::{
     cmp,
     fmt::Debug,
-    io, iter, mem,
+    io, iter,
     time::{Duration, SystemTime, UNIX_EPOCH},
 };
 
@@ -489,16 +489,26 @@ pub fn split_worktree_update(
             return None;
         }
 
-        let chunk_size = cmp::min(message.updated_entries.len(), max_chunk_size);
-        let updated_entries = message.updated_entries.drain(..chunk_size).collect();
-        done = message.updated_entries.is_empty();
+        let updated_entries_chunk_size = cmp::min(message.updated_entries.len(), max_chunk_size);
+        let updated_entries = message
+            .updated_entries
+            .drain(..updated_entries_chunk_size)
+            .collect();
+
+        let removed_entries_chunk_size = cmp::min(message.removed_entries.len(), max_chunk_size);
+        let removed_entries = message
+            .removed_entries
+            .drain(..removed_entries_chunk_size)
+            .collect();
+
+        done = message.updated_entries.is_empty() && message.removed_entries.is_empty();
         Some(UpdateWorktree {
             project_id: message.project_id,
             worktree_id: message.worktree_id,
             root_name: message.root_name.clone(),
             abs_path: message.abs_path.clone(),
             updated_entries,
-            removed_entries: mem::take(&mut message.removed_entries),
+            removed_entries,
             scan_id: message.scan_id,
             is_last_update: done && message.is_last_update,
         })