Move incoming calls into `Db`

Antonio Scandurra created

Change summary

crates/collab/src/db.rs        | 89 ++++++++++++++++++++++++++++++++---
crates/collab/src/rpc.rs       | 31 ++---------
crates/collab/src/rpc/store.rs | 48 -------------------
3 files changed, 89 insertions(+), 79 deletions(-)

Detailed changes

crates/collab/src/db.rs 🔗

@@ -940,7 +940,7 @@ where
         calling_user_id: UserId,
         called_user_id: UserId,
         initial_project_id: Option<ProjectId>,
-    ) -> Result<proto::Room> {
+    ) -> Result<(proto::Room, proto::IncomingCall)> {
         test_support!(self, {
             let mut tx = self.pool.begin().await?;
             sqlx::query(
@@ -967,10 +967,67 @@ where
             .execute(&mut tx)
             .await?;
 
-            self.commit_room_transaction(room_id, tx).await
+            let room = self.commit_room_transaction(room_id, tx).await?;
+            let incoming_call =
+                Self::build_incoming_call(&room, calling_user_id, initial_project_id);
+            Ok((room, incoming_call))
         })
     }
 
+    pub async fn incoming_call_for_user(
+        &self,
+        user_id: UserId,
+    ) -> Result<Option<proto::IncomingCall>> {
+        test_support!(self, {
+            let mut tx = self.pool.begin().await?;
+            let call = sqlx::query_as::<_, Call>(
+                "
+                SELECT *
+                FROM calls
+                WHERE called_user_id = $1 AND answering_connection_id IS NULL
+                ",
+            )
+            .bind(user_id)
+            .fetch_optional(&mut tx)
+            .await?;
+
+            if let Some(call) = call {
+                let room = self.get_room(call.room_id, &mut tx).await?;
+                Ok(Some(Self::build_incoming_call(
+                    &room,
+                    call.calling_user_id,
+                    call.initial_project_id,
+                )))
+            } else {
+                Ok(None)
+            }
+        })
+    }
+
+    fn build_incoming_call(
+        room: &proto::Room,
+        calling_user_id: UserId,
+        initial_project_id: Option<ProjectId>,
+    ) -> proto::IncomingCall {
+        proto::IncomingCall {
+            room_id: room.id,
+            calling_user_id: calling_user_id.to_proto(),
+            participant_user_ids: room
+                .participants
+                .iter()
+                .map(|participant| participant.user_id)
+                .collect(),
+            initial_project: room.participants.iter().find_map(|participant| {
+                let initial_project_id = initial_project_id?.to_proto();
+                participant
+                    .projects
+                    .iter()
+                    .find(|project| project.id == initial_project_id)
+                    .cloned()
+            }),
+        }
+    }
+
     pub async fn call_failed(
         &self,
         room_id: RoomId,
@@ -1066,7 +1123,17 @@ where
         .bind(room_id)
         .execute(&mut tx)
         .await?;
+        let room = self.get_room(room_id, &mut tx).await?;
+        tx.commit().await?;
+
+        Ok(room)
+    }
 
+    async fn get_room(
+        &self,
+        room_id: RoomId,
+        tx: &mut sqlx::Transaction<'_, D>,
+    ) -> Result<proto::Room> {
         let room: Room = sqlx::query_as(
             "
             SELECT *
@@ -1075,7 +1142,7 @@ where
             ",
         )
         .bind(room_id)
-        .fetch_one(&mut tx)
+        .fetch_one(&mut *tx)
         .await?;
 
         let mut db_participants =
@@ -1087,7 +1154,7 @@ where
                 ",
             )
             .bind(room_id)
-            .fetch(&mut tx);
+            .fetch(&mut *tx);
 
         let mut participants = Vec::new();
         let mut pending_participant_user_ids = Vec::new();
@@ -1120,7 +1187,7 @@ where
                 ",
             )
             .bind(room_id)
-            .fetch(&mut tx);
+            .fetch(&mut *tx);
 
             let mut projects = HashMap::default();
             while let Some(entry) = entries.next().await {
@@ -1139,9 +1206,6 @@ where
 
             participant.projects = projects.into_values().collect();
         }
-
-        tx.commit().await?;
-
         Ok(proto::Room {
             id: room.id.to_proto(),
             version: room.version as u64,
@@ -1566,6 +1630,15 @@ pub struct Room {
     pub live_kit_room: String,
 }
 
+#[derive(Clone, Debug, Default, FromRow, PartialEq)]
+pub struct Call {
+    pub room_id: RoomId,
+    pub calling_user_id: UserId,
+    pub called_user_id: UserId,
+    pub answering_connection_id: Option<i32>,
+    pub initial_project_id: Option<ProjectId>,
+}
+
 id_type!(ProjectId);
 #[derive(Clone, Debug, Default, FromRow, Serialize, PartialEq)]
 pub struct Project {

crates/collab/src/rpc.rs 🔗

@@ -346,11 +346,7 @@ impl Server {
 
             {
                 let mut store = this.store().await;
-                let incoming_call = store.add_connection(connection_id, user_id, user.admin);
-                if let Some(incoming_call) = incoming_call {
-                    this.peer.send(connection_id, incoming_call)?;
-                }
-
+                store.add_connection(connection_id, user_id, user.admin);
                 this.peer.send(connection_id, store.build_initial_contacts_update(contacts))?;
 
                 if let Some((code, count)) = invite_code {
@@ -360,6 +356,11 @@ impl Server {
                     })?;
                 }
             }
+
+            if let Some(incoming_call) = this.app_state.db.incoming_call_for_user(user_id).await? {
+                this.peer.send(connection_id, incoming_call)?;
+            }
+
             this.update_user_contacts(user_id).await?;
 
             let handle_io = handle_io.fuse();
@@ -726,7 +727,7 @@ impl Server {
             return Err(anyhow!("cannot call a user who isn't a contact"))?;
         }
 
-        let room = self
+        let (room, incoming_call) = self
             .app_state
             .db
             .call(room_id, calling_user_id, called_user_id, initial_project_id)
@@ -734,24 +735,6 @@ impl Server {
         self.room_updated(&room);
         self.update_user_contacts(called_user_id).await?;
 
-        let incoming_call = proto::IncomingCall {
-            room_id: room_id.to_proto(),
-            calling_user_id: calling_user_id.to_proto(),
-            participant_user_ids: room
-                .participants
-                .iter()
-                .map(|participant| participant.user_id)
-                .collect(),
-            initial_project: room.participants.iter().find_map(|participant| {
-                let initial_project_id = initial_project_id?.to_proto();
-                participant
-                    .projects
-                    .iter()
-                    .find(|project| project.id == initial_project_id)
-                    .cloned()
-            }),
-        };
-
         let mut calls = self
             .store()
             .await

crates/collab/src/rpc/store.rs 🔗

@@ -122,12 +122,7 @@ impl Store {
     }
 
     #[instrument(skip(self))]
-    pub fn add_connection(
-        &mut self,
-        connection_id: ConnectionId,
-        user_id: UserId,
-        admin: bool,
-    ) -> Option<proto::IncomingCall> {
+    pub fn add_connection(&mut self, connection_id: ConnectionId, user_id: UserId, admin: bool) {
         self.connections.insert(
             connection_id,
             ConnectionState {
@@ -138,27 +133,6 @@ impl Store {
         );
         let connected_user = self.connected_users.entry(user_id).or_default();
         connected_user.connection_ids.insert(connection_id);
-        if let Some(active_call) = connected_user.active_call {
-            if active_call.connection_id.is_some() {
-                None
-            } else {
-                let room = self.room(active_call.room_id)?;
-                Some(proto::IncomingCall {
-                    room_id: active_call.room_id,
-                    calling_user_id: active_call.calling_user_id.to_proto(),
-                    participant_user_ids: room
-                        .participants
-                        .iter()
-                        .map(|participant| participant.user_id)
-                        .collect(),
-                    initial_project: active_call
-                        .initial_project_id
-                        .and_then(|id| Self::build_participant_project(id, &self.projects)),
-                })
-            }
-        } else {
-            None
-        }
     }
 
     #[instrument(skip(self))]
@@ -411,10 +385,6 @@ impl Store {
         })
     }
 
-    pub fn room(&self, room_id: RoomId) -> Option<&proto::Room> {
-        self.rooms.get(&room_id)
-    }
-
     pub fn rooms(&self) -> &BTreeMap<RoomId, proto::Room> {
         &self.rooms
     }
@@ -740,22 +710,6 @@ impl Store {
         Ok(connection_ids)
     }
 
-    fn build_participant_project(
-        project_id: ProjectId,
-        projects: &BTreeMap<ProjectId, Project>,
-    ) -> Option<proto::ParticipantProject> {
-        Some(proto::ParticipantProject {
-            id: project_id.to_proto(),
-            worktree_root_names: projects
-                .get(&project_id)?
-                .worktrees
-                .values()
-                .filter(|worktree| worktree.visible)
-                .map(|worktree| worktree.root_name.clone())
-                .collect(),
-        })
-    }
-
     pub fn project_connection_ids(
         &self,
         project_id: ProjectId,