Sketch out project reconnection routine on the server

Antonio Scandurra created

Change summary

crates/collab/src/db.rs    | 111 ++++++++++++++++++----
crates/collab/src/rpc.rs   | 190 ++++++++++++++++++++++++++++++++-------
crates/rpc/proto/zed.proto |  14 ++
crates/rpc/src/proto.rs    |   2 
4 files changed, 257 insertions(+), 60 deletions(-)

Detailed changes

crates/collab/src/db.rs 🔗

@@ -1319,15 +1319,7 @@ impl Database {
                     Condition::all()
                         .add(room_participant::Column::RoomId.eq(room_id))
                         .add(room_participant::Column::UserId.eq(user_id))
-                        .add(
-                            Condition::any()
-                                .add(room_participant::Column::AnsweringConnectionId.is_null())
-                                .add(room_participant::Column::AnsweringConnectionLost.eq(true))
-                                .add(
-                                    room_participant::Column::AnsweringConnectionServerId
-                                        .ne(connection.owner_id as i32),
-                                ),
-                        ),
+                        .add(room_participant::Column::AnsweringConnectionId.is_null()),
                 )
                 .set(room_participant::ActiveModel {
                     answering_connection_id: ActiveValue::set(Some(connection.id as i32)),
@@ -1349,6 +1341,15 @@ impl Database {
         .await
     }
 
+    pub async fn rejoin_room(
+        &self,
+        room_id: proto::RejoinRoom,
+        user_id: UserId,
+        connection_id: ConnectionId,
+    ) -> Result<RejoinedRoom> {
+        todo!()
+    }
+
     pub async fn leave_room(
         &self,
         connection: ConnectionId,
@@ -2287,7 +2288,18 @@ impl Database {
 
             let room_id = project.room_id;
             let project = Project {
-                collaborators,
+                collaborators: collaborators
+                    .into_iter()
+                    .map(|collaborator| ProjectCollaborator {
+                        connection_id: ConnectionId {
+                            owner_id: collaborator.connection_server_id.0 as u32,
+                            id: collaborator.connection_id as u32,
+                        },
+                        user_id: collaborator.user_id,
+                        replica_id: collaborator.replica_id,
+                        is_host: collaborator.is_host,
+                    })
+                    .collect(),
                 worktrees,
                 language_servers: language_servers
                     .into_iter()
@@ -2354,8 +2366,8 @@ impl Database {
     pub async fn project_collaborators(
         &self,
         project_id: ProjectId,
-        connection: ConnectionId,
-    ) -> Result<RoomGuard<Vec<project_collaborator::Model>>> {
+        connection_id: ConnectionId,
+    ) -> Result<RoomGuard<Vec<ProjectCollaborator>>> {
         self.room_transaction(|tx| async move {
             let project = project::Entity::find_by_id(project_id)
                 .one(&*tx)
@@ -2364,15 +2376,23 @@ impl Database {
             let collaborators = project_collaborator::Entity::find()
                 .filter(project_collaborator::Column::ProjectId.eq(project_id))
                 .all(&*tx)
-                .await?;
+                .await?
+                .into_iter()
+                .map(|collaborator| ProjectCollaborator {
+                    connection_id: ConnectionId {
+                        owner_id: collaborator.connection_server_id.0 as u32,
+                        id: collaborator.connection_id as u32,
+                    },
+                    user_id: collaborator.user_id,
+                    replica_id: collaborator.replica_id,
+                    is_host: collaborator.is_host,
+                })
+                .collect::<Vec<_>>();
 
-            if collaborators.iter().any(|collaborator| {
-                let collaborator_connection = ConnectionId {
-                    owner_id: collaborator.connection_server_id.0 as u32,
-                    id: collaborator.connection_id as u32,
-                };
-                collaborator_connection == connection
-            }) {
+            if collaborators
+                .iter()
+                .any(|collaborator| collaborator.connection_id == connection_id)
+            {
                 Ok((project.room_id, collaborators))
             } else {
                 Err(anyhow!("no such project"))?
@@ -2846,6 +2866,38 @@ id_type!(ServerId);
 id_type!(SignupId);
 id_type!(UserId);
 
+pub struct RejoinedRoom {
+    pub room: proto::Room,
+    pub rejoined_projects: Vec<RejoinedProject>,
+    pub reshared_projects: Vec<ResharedProject>,
+}
+
+pub struct ResharedProject {
+    pub id: ProjectId,
+    pub old_connection_id: ConnectionId,
+    pub collaborators: Vec<ProjectCollaborator>,
+}
+
+pub struct RejoinedProject {
+    pub id: ProjectId,
+    pub old_connection_id: ConnectionId,
+    pub collaborators: Vec<ProjectCollaborator>,
+    pub worktrees: Vec<RejoinedWorktree>,
+    pub language_servers: Vec<proto::LanguageServer>,
+}
+
+pub struct RejoinedWorktree {
+    pub id: u64,
+    pub abs_path: String,
+    pub root_name: String,
+    pub visible: bool,
+    pub updated_entries: Vec<proto::Entry>,
+    pub removed_entries: Vec<u64>,
+    pub diagnostic_summaries: Vec<proto::DiagnosticSummary>,
+    pub scan_id: u64,
+    pub is_complete: bool,
+}
+
 pub struct LeftRoom {
     pub room: proto::Room,
     pub left_projects: HashMap<ProjectId, LeftProject>,
@@ -2859,11 +2911,28 @@ pub struct RefreshedRoom {
 }
 
 pub struct Project {
-    pub collaborators: Vec<project_collaborator::Model>,
+    pub collaborators: Vec<ProjectCollaborator>,
     pub worktrees: BTreeMap<u64, Worktree>,
     pub language_servers: Vec<proto::LanguageServer>,
 }
 
+pub struct ProjectCollaborator {
+    pub connection_id: ConnectionId,
+    pub user_id: UserId,
+    pub replica_id: ReplicaId,
+    pub is_host: bool,
+}
+
+impl ProjectCollaborator {
+    pub fn to_proto(&self) -> proto::Collaborator {
+        proto::Collaborator {
+            peer_id: Some(self.connection_id.into()),
+            replica_id: self.replica_id.0 as u32,
+            user_id: self.user_id.to_proto(),
+        }
+    }
+}
+
 pub struct LeftProject {
     pub id: ProjectId,
     pub host_user_id: UserId,

crates/collab/src/rpc.rs 🔗

@@ -184,6 +184,7 @@ impl Server {
             .add_request_handler(ping)
             .add_request_handler(create_room)
             .add_request_handler(join_room)
+            .add_request_handler(rejoin_room)
             .add_message_handler(leave_room)
             .add_request_handler(call)
             .add_request_handler(cancel_call)
@@ -941,6 +942,148 @@ async fn join_room(
     Ok(())
 }
 
+async fn rejoin_room(
+    request: proto::RejoinRoom,
+    response: Response<proto::RejoinRoom>,
+    session: Session,
+) -> Result<()> {
+    let mut rejoined_room = session
+        .db()
+        .await
+        .rejoin_room(request, session.user_id, session.connection_id)
+        .await?;
+
+    response.send(proto::RejoinRoomResponse {
+        room: Some(rejoined_room.room.clone()),
+        reshared_projects: rejoined_room
+            .reshared_projects
+            .iter()
+            .map(|project| proto::ResharedProject {
+                id: project.id.to_proto(),
+                collaborators: project
+                    .collaborators
+                    .iter()
+                    .map(|collaborator| collaborator.to_proto())
+                    .collect(),
+            })
+            .collect(),
+        rejoined_projects: rejoined_room
+            .rejoined_projects
+            .iter()
+            .map(|rejoined_project| proto::RejoinedProject {
+                id: rejoined_project.id.to_proto(),
+                worktrees: rejoined_project
+                    .worktrees
+                    .iter()
+                    .map(|worktree| proto::WorktreeMetadata {
+                        id: worktree.id,
+                        root_name: worktree.root_name.clone(),
+                        visible: worktree.visible,
+                        abs_path: worktree.abs_path.clone(),
+                    })
+                    .collect(),
+                collaborators: rejoined_project
+                    .collaborators
+                    .iter()
+                    .map(|collaborator| collaborator.to_proto())
+                    .collect(),
+                language_servers: rejoined_project.language_servers.clone(),
+            })
+            .collect(),
+    })?;
+    room_updated(&rejoined_room.room, &session.peer);
+
+    // Notify other participants about this peer's reconnection to projects.
+    for project in &rejoined_room.reshared_projects {
+        for collaborator in &project.collaborators {
+            if collaborator.connection_id != session.connection_id {
+                session
+                    .peer
+                    .send(
+                        collaborator.connection_id,
+                        proto::UpdateProjectCollaborator {
+                            project_id: project.id.to_proto(),
+                            old_peer_id: Some(project.old_connection_id.into()),
+                            new_peer_id: Some(session.connection_id.into()),
+                        },
+                    )
+                    .trace_err();
+            }
+        }
+    }
+    for project in &rejoined_room.rejoined_projects {
+        for collaborator in &project.collaborators {
+            if collaborator.connection_id != session.connection_id {
+                session
+                    .peer
+                    .send(
+                        collaborator.connection_id,
+                        proto::UpdateProjectCollaborator {
+                            project_id: project.id.to_proto(),
+                            old_peer_id: Some(project.old_connection_id.into()),
+                            new_peer_id: Some(session.connection_id.into()),
+                        },
+                    )
+                    .trace_err();
+            }
+        }
+    }
+
+    for project in &mut rejoined_room.rejoined_projects {
+        for worktree in mem::take(&mut project.worktrees) {
+            #[cfg(any(test, feature = "test-support"))]
+            const MAX_CHUNK_SIZE: usize = 2;
+            #[cfg(not(any(test, feature = "test-support")))]
+            const MAX_CHUNK_SIZE: usize = 256;
+
+            // Stream this worktree's entries.
+            let message = proto::UpdateWorktree {
+                project_id: project.id.to_proto(),
+                worktree_id: worktree.id,
+                abs_path: worktree.abs_path.clone(),
+                root_name: worktree.root_name,
+                updated_entries: worktree.updated_entries,
+                removed_entries: worktree.removed_entries,
+                scan_id: worktree.scan_id,
+                is_last_update: worktree.is_complete,
+            };
+            for update in proto::split_worktree_update(message, MAX_CHUNK_SIZE) {
+                session.peer.send(session.connection_id, update.clone())?;
+            }
+
+            // Stream this worktree's diagnostics.
+            for summary in worktree.diagnostic_summaries {
+                session.peer.send(
+                    session.connection_id,
+                    proto::UpdateDiagnosticSummary {
+                        project_id: project.id.to_proto(),
+                        worktree_id: worktree.id,
+                        summary: Some(summary),
+                    },
+                )?;
+            }
+        }
+
+        for language_server in &project.language_servers {
+            session.peer.send(
+                session.connection_id,
+                proto::UpdateLanguageServer {
+                    project_id: project.id.to_proto(),
+                    language_server_id: language_server.id,
+                    variant: Some(
+                        proto::update_language_server::Variant::DiskBasedDiagnosticsUpdated(
+                            proto::LspDiskBasedDiagnosticsUpdated {},
+                        ),
+                    ),
+                },
+            )?;
+        }
+    }
+
+    update_user_contacts(session.user_id, &session).await?;
+    Ok(())
+}
+
 async fn leave_room(_message: proto::LeaveRoom, session: Session) -> Result<()> {
     leave_room_for_session(&session).await
 }
@@ -1160,18 +1303,8 @@ async fn join_project(
     let collaborators = project
         .collaborators
         .iter()
-        .map(|collaborator| {
-            let peer_id = proto::PeerId {
-                owner_id: collaborator.connection_server_id.0 as u32,
-                id: collaborator.connection_id as u32,
-            };
-            proto::Collaborator {
-                peer_id: Some(peer_id),
-                replica_id: collaborator.replica_id.0 as u32,
-                user_id: collaborator.user_id.to_proto(),
-            }
-        })
-        .filter(|collaborator| collaborator.peer_id != Some(session.connection_id.into()))
+        .filter(|collaborator| collaborator.connection_id != session.connection_id)
+        .map(|collaborator| collaborator.to_proto())
         .collect::<Vec<_>>();
     let worktrees = project
         .worktrees
@@ -1413,14 +1546,11 @@ where
             .await
             .project_collaborators(project_id, session.connection_id)
             .await?;
-        let host = collaborators
+        collaborators
             .iter()
             .find(|collaborator| collaborator.is_host)
-            .ok_or_else(|| anyhow!("host not found"))?;
-        ConnectionId {
-            owner_id: host.connection_server_id.0 as u32,
-            id: host.connection_id as u32,
-        }
+            .ok_or_else(|| anyhow!("host not found"))?
+            .connection_id
     };
 
     let payload = session
@@ -1444,14 +1574,11 @@ async fn save_buffer(
             .await
             .project_collaborators(project_id, session.connection_id)
             .await?;
-        let host = collaborators
+        collaborators
             .iter()
             .find(|collaborator| collaborator.is_host)
-            .ok_or_else(|| anyhow!("host not found"))?;
-        ConnectionId {
-            owner_id: host.connection_server_id.0 as u32,
-            id: host.connection_id as u32,
-        }
+            .ok_or_else(|| anyhow!("host not found"))?
+            .connection_id
     };
     let response_payload = session
         .peer
@@ -1463,17 +1590,10 @@ async fn save_buffer(
         .await
         .project_collaborators(project_id, session.connection_id)
         .await?;
-    collaborators.retain(|collaborator| {
-        let collaborator_connection = ConnectionId {
-            owner_id: collaborator.connection_server_id.0 as u32,
-            id: collaborator.connection_id as u32,
-        };
-        collaborator_connection != session.connection_id
-    });
-    let project_connection_ids = collaborators.iter().map(|collaborator| ConnectionId {
-        owner_id: collaborator.connection_server_id.0 as u32,
-        id: collaborator.connection_id as u32,
-    });
+    collaborators.retain(|collaborator| collaborator.connection_id != session.connection_id);
+    let project_connection_ids = collaborators
+        .iter()
+        .map(|collaborator| collaborator.connection_id);
     broadcast(host_connection_id, project_connection_ids, |conn_id| {
         session
             .peer

crates/rpc/proto/zed.proto 🔗

@@ -39,6 +39,7 @@ message Envelope {
         JoinProjectResponse join_project_response = 25;
         LeaveProject leave_project = 26;
         AddProjectCollaborator add_project_collaborator = 27;
+        UpdateProjectCollaborator update_project_collaborator = 110;
         RemoveProjectCollaborator remove_project_collaborator = 28;
 
         GetDefinition get_definition = 29;
@@ -193,10 +194,9 @@ message ResharedProject {
 
 message RejoinedProject {
     uint64 id = 1;
-    uint32 replica_id = 2;
-    repeated WorktreeMetadata worktrees = 3;
-    repeated Collaborator collaborators = 4;
-    repeated LanguageServer language_servers = 5;
+    repeated WorktreeMetadata worktrees = 2;
+    repeated Collaborator collaborators = 3;
+    repeated LanguageServer language_servers = 4;
 }
 
 message LeaveRoom {}
@@ -360,6 +360,12 @@ message AddProjectCollaborator {
     Collaborator collaborator = 2;
 }
 
+message UpdateProjectCollaborator {
+    uint64 project_id = 1;
+    PeerId old_peer_id = 2;
+    PeerId new_peer_id = 3;
+}
+
 message RemoveProjectCollaborator {
     uint64 project_id = 1;
     PeerId peer_id = 2;

crates/rpc/src/proto.rs 🔗

@@ -219,6 +219,7 @@ messages!(
     (UpdateLanguageServer, Foreground),
     (UpdateParticipantLocation, Foreground),
     (UpdateProject, Foreground),
+    (UpdateProjectCollaborator, Foreground),
     (UpdateWorktree, Foreground),
     (UpdateDiffBase, Background),
     (GetPrivateUserInfo, Foreground),
@@ -322,6 +323,7 @@ entity_messages!(
     UpdateFollowers,
     UpdateLanguageServer,
     UpdateProject,
+    UpdateProjectCollaborator,
     UpdateWorktree,
     UpdateDiffBase
 );