Fix project reconnection test to ensure rooms actually reconnects

Antonio Scandurra created

Change summary

crates/call/src/room.rs                | 15 ++++-
crates/collab/src/db.rs                | 67 +-----------------------
crates/collab/src/integration_tests.rs | 76 ++++++++++++++++++++++++---
crates/collab/src/rpc.rs               |  9 --
crates/project/src/project.rs          | 25 +++-----
5 files changed, 94 insertions(+), 98 deletions(-)

Detailed changes

crates/call/src/room.rs 🔗

@@ -355,6 +355,7 @@ impl Room {
             if let Some(handle) = project.upgrade(cx) {
                 let project = handle.read(cx);
                 if let Some(project_id) = project.remote_id() {
+                    projects.insert(project_id, handle.clone());
                     rejoined_projects.push(proto::RejoinProject {
                         project_id,
                         worktrees: project
@@ -387,10 +388,18 @@ impl Room {
                 this.status = RoomStatus::Online;
                 this.apply_room_update(room_proto, cx)?;
 
-                for shared_project in response.reshared_projects {
-                    if let Some(project) = projects.get(&shared_project.id) {
+                for reshared_project in response.reshared_projects {
+                    if let Some(project) = projects.get(&reshared_project.id) {
+                        project.update(cx, |project, cx| {
+                            project.reshared(reshared_project, cx).log_err();
+                        });
+                    }
+                }
+
+                for rejoined_project in response.rejoined_projects {
+                    if let Some(project) = projects.get(&rejoined_project.id) {
                         project.update(cx, |project, cx| {
-                            project.reshared(shared_project, cx).log_err();
+                            project.rejoined(rejoined_project, cx).log_err();
                         });
                     }
                 }

crates/collab/src/db.rs 🔗

@@ -1347,6 +1347,7 @@ impl Database {
         user_id: UserId,
         connection_id: ConnectionId,
     ) -> Result<RejoinedRoom> {
+        println!("==============");
         todo!()
     }
 
@@ -1573,11 +1574,8 @@ impl Database {
         .await
     }
 
-    pub async fn connection_lost(
-        &self,
-        connection: ConnectionId,
-    ) -> Result<RoomGuard<Vec<LeftProject>>> {
-        self.room_transaction(|tx| async move {
+    pub async fn connection_lost(&self, connection: ConnectionId) -> Result<()> {
+        self.transaction(|tx| async move {
             let participant = room_participant::Entity::find()
                 .filter(
                     Condition::all()
@@ -1593,7 +1591,6 @@ impl Database {
                 .one(&*tx)
                 .await?
                 .ok_or_else(|| anyhow!("not a participant in any room"))?;
-            let room_id = participant.room_id;
 
             room_participant::Entity::update(room_participant::ActiveModel {
                 answering_connection_lost: ActiveValue::set(true),
@@ -1602,63 +1599,7 @@ impl Database {
             .exec(&*tx)
             .await?;
 
-            let guest_collaborators_and_projects = project_collaborator::Entity::find()
-                .find_also_related(project::Entity)
-                .filter(
-                    Condition::all()
-                        .add(project_collaborator::Column::IsHost.eq(false))
-                        .add(project_collaborator::Column::ConnectionId.eq(connection.id as i32))
-                        .add(
-                            project_collaborator::Column::ConnectionServerId
-                                .eq(connection.owner_id as i32),
-                        ),
-                )
-                .all(&*tx)
-                .await?;
-
-            project_collaborator::Entity::delete_many()
-                .filter(
-                    project_collaborator::Column::Id
-                        .is_in(guest_collaborators_and_projects.iter().map(|e| e.0.id)),
-                )
-                .exec(&*tx)
-                .await?;
-
-            let mut left_projects = Vec::new();
-            for (_, project) in guest_collaborators_and_projects {
-                let Some(project) = project else { continue };
-                let collaborators = project
-                    .find_related(project_collaborator::Entity)
-                    .all(&*tx)
-                    .await?;
-                let connection_ids = collaborators
-                    .into_iter()
-                    .map(|collaborator| ConnectionId {
-                        id: collaborator.connection_id as u32,
-                        owner_id: collaborator.connection_server_id.0 as u32,
-                    })
-                    .collect();
-
-                left_projects.push(LeftProject {
-                    id: project.id,
-                    host_user_id: project.host_user_id,
-                    host_connection_id: project.host_connection()?,
-                    connection_ids,
-                });
-            }
-
-            project::Entity::delete_many()
-                .filter(
-                    Condition::all()
-                        .add(project::Column::HostConnectionId.eq(connection.id as i32))
-                        .add(
-                            project::Column::HostConnectionServerId.eq(connection.owner_id as i32),
-                        ),
-                )
-                .exec(&*tx)
-                .await?;
-
-            Ok((room_id, left_projects))
+            Ok(())
         })
         .await
     }

crates/collab/src/integration_tests.rs 🔗

@@ -1351,19 +1351,27 @@ async fn test_host_reconnect(
         .unwrap();
 
     let project_b = client_b.build_remote_project(project_id, cx_b).await;
-    assert!(worktree_a.read_with(cx_a, |tree, _| tree.as_local().unwrap().is_shared()));
+    deterministic.run_until_parked();
+
+    let worktree_id = worktree_a.read_with(cx_a, |worktree, _| {
+        assert!(worktree.as_local().unwrap().is_shared());
+        worktree.id()
+    });
 
     // Drop client A's connection.
     server.forbid_connections();
     server.disconnect_client(client_a.peer_id().unwrap());
     deterministic.advance_clock(RECEIVE_TIMEOUT);
     project_a.read_with(cx_a, |project, _| {
-        assert!(project.collaborators().is_empty())
+        assert!(project.is_shared());
+        assert_eq!(project.collaborators().len(), 1);
+    });
+    project_b.read_with(cx_b, |project, _| {
+        assert!(!project.is_read_only());
+        assert_eq!(project.collaborators().len(), 1);
     });
-    project_a.read_with(cx_a, |project, _| assert!(!project.is_shared()));
-    project_b.read_with(cx_b, |project, _| assert!(project.is_read_only()));
     worktree_a.read_with(cx_a, |tree, _| {
-        assert!(!tree.as_local().unwrap().is_shared())
+        assert!(tree.as_local().unwrap().is_shared())
     });
 
     // While disconnected, add and remove files from the client A's project.
@@ -1393,9 +1401,60 @@ async fn test_host_reconnect(
 
     // Client A reconnects. Their project is re-shared, and client B re-joins it.
     server.allow_connections();
-    deterministic.advance_clock(RECEIVE_TIMEOUT);
-    project_a.read_with(cx_a, |project, _| assert!(project.is_shared()));
-    project_b.read_with(cx_b, |project, _| assert!(!project.is_read_only()));
+    client_a
+        .authenticate_and_connect(false, &cx_a.to_async())
+        .await
+        .unwrap();
+    deterministic.run_until_parked();
+    project_a.read_with(cx_a, |project, cx| {
+        assert!(project.is_shared());
+        assert_eq!(
+            worktree_a
+                .read(cx)
+                .snapshot()
+                .paths()
+                .map(|p| p.to_str().unwrap())
+                .collect::<Vec<_>>(),
+            vec![
+                "a.txt",
+                "b.txt",
+                "subdir1",
+                "subdir1/c.txt",
+                "subdir1/d.txt",
+                "subdir1/e.txt",
+                "subdir2",
+                "subdir2/f.txt",
+                "subdir2/g.txt",
+                "subdir2/h.txt",
+                "subdir2/i.txt"
+            ]
+        );
+    });
+    project_b.read_with(cx_b, |project, cx| {
+        assert!(!project.is_read_only());
+        let worktree_b = project.worktree_for_id(worktree_id, cx).unwrap();
+        assert_eq!(
+            worktree_b
+                .read(cx)
+                .snapshot()
+                .paths()
+                .map(|p| p.to_str().unwrap())
+                .collect::<Vec<_>>(),
+            vec![
+                "a.txt",
+                "b.txt",
+                "subdir1",
+                "subdir1/c.txt",
+                "subdir1/d.txt",
+                "subdir1/e.txt",
+                "subdir2",
+                "subdir2/f.txt",
+                "subdir2/g.txt",
+                "subdir2/h.txt",
+                "subdir2/i.txt"
+            ]
+        );
+    });
 }
 
 #[gpui::test(iterations = 10)]
@@ -6169,7 +6228,6 @@ async fn test_random_collaboration(
     let mut user_ids = Vec::new();
     let mut op_start_signals = Vec::new();
     let mut next_entity_id = 100000;
-    let mut can_disconnect = rng.lock().gen_bool(0.2);
 
     let mut operations = 0;
     while operations < max_operations {

crates/collab/src/rpc.rs 🔗

@@ -799,17 +799,12 @@ async fn sign_out(
         .await
         .remove_connection(session.connection_id)?;
 
-    if let Some(mut left_projects) = session
+    session
         .db()
         .await
         .connection_lost(session.connection_id)
         .await
-        .trace_err()
-    {
-        for left_project in mem::take(&mut *left_projects) {
-            project_left(&left_project, &session);
-        }
-    }
+        .trace_err();
 
     futures::select_biased! {
         _ = executor.sleep(RECONNECT_TIMEOUT).fuse() => {

crates/project/src/project.rs 🔗

@@ -156,7 +156,6 @@ enum ProjectClientState {
         sharing_has_stopped: bool,
         remote_id: u64,
         replica_id: ReplicaId,
-        _detect_unshare: Task<Option<()>>,
     },
 }
 
@@ -495,21 +494,6 @@ impl Project {
                     sharing_has_stopped: false,
                     remote_id,
                     replica_id,
-                    _detect_unshare: cx.spawn_weak(move |this, mut cx| {
-                        async move {
-                            let mut status = client.status();
-                            let is_connected =
-                                status.next().await.map_or(false, |s| s.is_connected());
-                            // Even if we're initially connected, any future change of the status means we momentarily disconnected.
-                            if !is_connected || status.next().await.is_some() {
-                                if let Some(this) = this.upgrade(&cx) {
-                                    this.update(&mut cx, |this, cx| this.disconnected_from_host(cx))
-                                }
-                            }
-                            Ok(())
-                        }
-                        .log_err()
-                    }),
                 }),
                 language_servers: Default::default(),
                 language_server_ids: Default::default(),
@@ -1100,6 +1084,15 @@ impl Project {
         Ok(())
     }
 
+    pub fn rejoined(
+        &mut self,
+        message: proto::RejoinedProject,
+        cx: &mut ModelContext<Self>,
+    ) -> Result<()> {
+        self.set_collaborators_from_proto(message.collaborators, cx)?;
+        Ok(())
+    }
+
     pub fn worktree_metadata_protos(&self, cx: &AppContext) -> Vec<proto::WorktreeMetadata> {
         self.worktrees(cx)
             .map(|worktree| {