Begin tracking follow states on collab server

Julia and Antonio Scandurra created

Co-Authored-By: Antonio Scandurra <me@as-cii.com>

Change summary

crates/call/src/room.rs                               | 22 +++++
crates/collab/migrations/20230202155735_followers.sql | 15 +++
crates/collab/src/db.rs                               | 42 ++++++++++
crates/collab/src/db/follower.rs                      | 51 +++++++++++++
crates/collab/src/db/room.rs                          |  8 ++
crates/collab/src/rpc.rs                              | 10 ++
crates/rpc/proto/zed.proto                            | 10 ++
crates/rpc/src/rpc.rs                                 |  2 
8 files changed, 157 insertions(+), 3 deletions(-)

Detailed changes

crates/call/src/room.rs 🔗

@@ -55,6 +55,7 @@ pub struct Room {
     leave_when_empty: bool,
     client: Arc<Client>,
     user_store: ModelHandle<UserStore>,
+    follows_by_leader_id: HashMap<PeerId, HashSet<PeerId>>,
     subscriptions: Vec<client::Subscription>,
     pending_room_update: Option<Task<()>>,
     maintain_connection: Option<Task<Option<()>>>,
@@ -148,6 +149,7 @@ impl Room {
             pending_room_update: None,
             client,
             user_store,
+            follows_by_leader_id: Default::default(),
             maintain_connection: Some(maintain_connection),
         }
     }
@@ -487,11 +489,13 @@ impl Room {
             .iter()
             .map(|p| p.user_id)
             .collect::<Vec<_>>();
+
         let remote_participant_user_ids = room
             .participants
             .iter()
             .map(|p| p.user_id)
             .collect::<Vec<_>>();
+
         let (remote_participants, pending_participants) =
             self.user_store.update(cx, move |user_store, cx| {
                 (
@@ -499,6 +503,7 @@ impl Room {
                     user_store.get_users(pending_participant_user_ids, cx),
                 )
             });
+
         self.pending_room_update = Some(cx.spawn(|this, mut cx| async move {
             let (remote_participants, pending_participants) =
                 futures::join!(remote_participants, pending_participants);
@@ -620,6 +625,23 @@ impl Room {
                     }
                 }
 
+                this.follows_by_leader_id.clear();
+                for follower in room.followers {
+                    let (leader, follower) = match (follower.leader_id, follower.follower_id) {
+                        (Some(leader), Some(follower)) => (leader, follower),
+
+                        _ => {
+                            log::error!("Follower message {follower:?} missing some state");
+                            continue;
+                        }
+                    };
+
+                    this.follows_by_leader_id
+                        .entry(leader)
+                        .or_insert(Default::default())
+                        .insert(follower);
+                }
+
                 this.pending_room_update.take();
                 if this.should_leave() {
                     log::info!("room is empty, leaving");

crates/collab/migrations/20230202155735_followers.sql 🔗

@@ -0,0 +1,15 @@
+CREATE TABLE IF NOT EXISTS "followers" (
+    "id" SERIAL PRIMARY KEY,
+    "room_id" INTEGER NOT NULL REFERENCES rooms (id) ON DELETE CASCADE,
+    "project_id" INTEGER NOT NULL REFERENCES projects (id) ON DELETE CASCADE,
+    "leader_connection_server_id" INTEGER NOT NULL REFERENCES servers (id) ON DELETE CASCADE,
+    "leader_connection_id" INTEGER NOT NULL,
+    "follower_connection_server_id" INTEGER NOT NULL REFERENCES servers (id) ON DELETE CASCADE,
+    "follower_connection_id" INTEGER NOT NULL
+);
+
+CREATE UNIQUE INDEX 
+    "index_followers_on_project_id_and_leader_connection_server_id_and_leader_connection_id_and_follower_connection_server_id_and_follower_connection_id"
+ON "followers" ("project_id", "leader_connection_server_id", "leader_connection_id", "follower_connection_server_id", "follower_connection_id");
+
+CREATE INDEX "index_followers_on_room_id" ON "followers" ("room_id");

crates/collab/src/db.rs 🔗

@@ -1,5 +1,6 @@
 mod access_token;
 mod contact;
+mod follower;
 mod language_server;
 mod project;
 mod project_collaborator;
@@ -1717,6 +1718,35 @@ impl Database {
         .await
     }
 
+    pub async fn follow(
+        &self,
+        room_id: RoomId,
+        project_id: ProjectId,
+        leader_connection: ConnectionId,
+        follower_connection: ConnectionId,
+    ) -> Result<RoomGuard<proto::Room>> {
+        self.room_transaction(|tx| async move {
+            follower::ActiveModel {
+                room_id: ActiveValue::set(room_id),
+                project_id: ActiveValue::set(project_id),
+                leader_connection_server_id: ActiveValue::set(ServerId(
+                    leader_connection.owner_id as i32,
+                )),
+                leader_connection_id: ActiveValue::set(leader_connection.id as i32),
+                follower_connection_server_id: ActiveValue::set(ServerId(
+                    follower_connection.owner_id as i32,
+                )),
+                follower_connection_id: ActiveValue::set(follower_connection.id as i32),
+                ..Default::default()
+            }
+            .insert(&*tx)
+            .await?;
+
+            Ok((room_id, self.get_room(room_id, &*tx).await?))
+        })
+        .await
+    }
+
     pub async fn update_room_participant_location(
         &self,
         room_id: RoomId,
@@ -1927,11 +1957,22 @@ impl Database {
             }
         }
 
+        let mut db_followers = db_room.find_related(follower::Entity).stream(tx).await?;
+        let mut followers = Vec::new();
+        while let Some(db_follower) = db_followers.next().await {
+            let db_follower = db_follower?;
+            followers.push(proto::Follower {
+                leader_id: Some(db_follower.leader_connection().into()),
+                follower_id: Some(db_follower.follower_connection().into()),
+            });
+        }
+
         Ok(proto::Room {
             id: db_room.id.to_proto(),
             live_kit_room: db_room.live_kit_room,
             participants: participants.into_values().collect(),
             pending_participants,
+            followers,
         })
     }
 
@@ -3011,6 +3052,7 @@ macro_rules! id_type {
 
 id_type!(AccessTokenId);
 id_type!(ContactId);
+id_type!(FollowerId);
 id_type!(RoomId);
 id_type!(RoomParticipantId);
 id_type!(ProjectId);

crates/collab/src/db/follower.rs 🔗

@@ -0,0 +1,51 @@
+use super::{FollowerId, ProjectId, RoomId, ServerId};
+use rpc::ConnectionId;
+use sea_orm::entity::prelude::*;
+use serde::Serialize;
+
+#[derive(Clone, Debug, Default, PartialEq, Eq, DeriveEntityModel, Serialize)]
+#[sea_orm(table_name = "followers")]
+pub struct Model {
+    #[sea_orm(primary_key)]
+    pub id: FollowerId,
+    pub room_id: RoomId,
+    pub project_id: ProjectId,
+    pub leader_connection_server_id: ServerId,
+    pub leader_connection_id: i32,
+    pub follower_connection_server_id: ServerId,
+    pub follower_connection_id: i32,
+}
+
+impl Model {
+    pub fn leader_connection(&self) -> ConnectionId {
+        ConnectionId {
+            owner_id: self.leader_connection_server_id.0 as u32,
+            id: self.leader_connection_id as u32,
+        }
+    }
+
+    pub fn follower_connection(&self) -> ConnectionId {
+        ConnectionId {
+            owner_id: self.follower_connection_server_id.0 as u32,
+            id: self.follower_connection_id as u32,
+        }
+    }
+}
+
+#[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)]
+pub enum Relation {
+    #[sea_orm(
+        belongs_to = "super::room::Entity",
+        from = "Column::RoomId",
+        to = "super::room::Column::Id"
+    )]
+    Room,
+}
+
+impl Related<super::room::Entity> for Entity {
+    fn to() -> RelationDef {
+        Relation::Room.def()
+    }
+}
+
+impl ActiveModelBehavior for ActiveModel {}

crates/collab/src/db/room.rs 🔗

@@ -15,6 +15,8 @@ pub enum Relation {
     RoomParticipant,
     #[sea_orm(has_many = "super::project::Entity")]
     Project,
+    #[sea_orm(has_many = "super::follower::Entity")]
+    Follower,
 }
 
 impl Related<super::room_participant::Entity> for Entity {
@@ -29,4 +31,10 @@ impl Related<super::project::Entity> for Entity {
     }
 }
 
+impl Related<super::follower::Entity> for Entity {
+    fn to() -> RelationDef {
+        Relation::Follower.def()
+    }
+}
+
 impl ActiveModelBehavior for ActiveModel {}

crates/collab/src/rpc.rs 🔗

@@ -1312,6 +1312,7 @@ async fn join_project(
         .filter(|collaborator| collaborator.connection_id != session.connection_id)
         .map(|collaborator| collaborator.to_proto())
         .collect::<Vec<_>>();
+
     let worktrees = project
         .worktrees
         .iter()
@@ -1719,6 +1720,7 @@ async fn follow(
     session: Session,
 ) -> Result<()> {
     let project_id = ProjectId::from_proto(request.project_id);
+    let room_id = RoomId::from_proto(request.project_id);
     let leader_id = request
         .leader_id
         .ok_or_else(|| anyhow!("invalid leader id"))?
@@ -1744,6 +1746,14 @@ async fn follow(
         .views
         .retain(|view| view.leader_id != Some(follower_id.into()));
     response.send(response_payload)?;
+
+    let room = session
+        .db()
+        .await
+        .follow(room_id, project_id, leader_id, follower_id)
+        .await?;
+    room_updated(&room, &session.peer);
+
     Ok(())
 }
 

crates/rpc/proto/zed.proto 🔗

@@ -16,7 +16,7 @@ message Envelope {
         Error error = 6;
         Ping ping = 7;
         Test test = 8;
-        
+
         CreateRoom create_room = 9;
         CreateRoomResponse create_room_response = 10;
         JoinRoom join_room = 11;
@@ -206,7 +206,8 @@ message Room {
     uint64 id = 1;
     repeated Participant participants = 2;
     repeated PendingParticipant pending_participants = 3;
-    string live_kit_room = 4;
+    repeated Follower followers = 4;
+    string live_kit_room = 5;
 }
 
 message Participant {
@@ -227,6 +228,11 @@ message ParticipantProject {
     repeated string worktree_root_names = 2;
 }
 
+message Follower {
+    PeerId leader_id = 1;
+    PeerId follower_id = 2;
+}
+
 message ParticipantLocation {
     oneof variant {
         SharedProject shared_project = 1;

crates/rpc/src/rpc.rs 🔗

@@ -6,4 +6,4 @@ pub use conn::Connection;
 pub use peer::*;
 mod macros;
 
-pub const PROTOCOL_VERSION: u32 = 46;
+pub const PROTOCOL_VERSION: u32 = 47;