Move `Store::join_room` into `Db::join_room`

Antonio Scandurra created

Change summary

crates/collab/src/db.rs        | 85 ++++++++++++++++++++++++++++++++++++
crates/collab/src/rpc.rs       | 71 ++++++++++++++++-------------
crates/collab/src/rpc/store.rs | 51 ---------------------
3 files changed, 125 insertions(+), 82 deletions(-)

Detailed changes

crates/collab/src/db.rs 🔗

@@ -1061,6 +1061,91 @@ where
         })
     }
 
+    pub async fn join_room(
+        &self,
+        room_id: RoomId,
+        user_id: UserId,
+        connection_id: ConnectionId,
+    ) -> Result<proto::Room> {
+        test_support!(self, {
+            let mut tx = self.pool.begin().await?;
+            sqlx::query(
+                "
+                UPDATE calls
+                SET answering_connection_id = $1
+                WHERE room_id = $2 AND called_user_id = $3
+                RETURNING 1
+                ",
+            )
+            .bind(connection_id.0 as i32)
+            .bind(room_id)
+            .bind(user_id)
+            .fetch_one(&mut tx)
+            .await?;
+
+            sqlx::query(
+                "
+                UPDATE room_participants 
+                SET connection_id = $1
+                WHERE room_id = $2 AND user_id = $3
+                RETURNING 1
+                ",
+            )
+            .bind(connection_id.0 as i32)
+            .bind(room_id)
+            .bind(user_id)
+            .fetch_one(&mut tx)
+            .await?;
+
+            self.commit_room_transaction(room_id, tx).await
+        })
+
+        // let connection = self
+        //     .connections
+        //     .get_mut(&connection_id)
+        //     .ok_or_else(|| anyhow!("no such connection"))?;
+        // let user_id = connection.user_id;
+        // let recipient_connection_ids = self.connection_ids_for_user(user_id).collect::<Vec<_>>();
+
+        // let connected_user = self
+        //     .connected_users
+        //     .get_mut(&user_id)
+        //     .ok_or_else(|| anyhow!("no such connection"))?;
+        // let active_call = connected_user
+        //     .active_call
+        //     .as_mut()
+        //     .ok_or_else(|| anyhow!("not being called"))?;
+        // anyhow::ensure!(
+        //     active_call.room_id == room_id && active_call.connection_id.is_none(),
+        //     "not being called on this room"
+        // );
+
+        // let room = self
+        //     .rooms
+        //     .get_mut(&room_id)
+        //     .ok_or_else(|| anyhow!("no such room"))?;
+        // anyhow::ensure!(
+        //     room.pending_participant_user_ids
+        //         .contains(&user_id.to_proto()),
+        //     anyhow!("no such room")
+        // );
+        // room.pending_participant_user_ids
+        //     .retain(|pending| *pending != user_id.to_proto());
+        // room.participants.push(proto::Participant {
+        //     user_id: user_id.to_proto(),
+        //     peer_id: connection_id.0,
+        //     projects: Default::default(),
+        //     location: Some(proto::ParticipantLocation {
+        //         variant: Some(proto::participant_location::Variant::External(
+        //             proto::participant_location::External {},
+        //         )),
+        //     }),
+        // });
+        // active_call.connection_id = Some(connection_id);
+
+        // Ok((room, recipient_connection_ids))
+    }
+
     pub async fn update_room_participant_location(
         &self,
         room_id: RoomId,

crates/collab/src/rpc.rs 🔗

@@ -607,42 +607,51 @@ impl Server {
         request: Message<proto::JoinRoom>,
         response: Response<proto::JoinRoom>,
     ) -> Result<()> {
+        let room = self
+            .app_state
+            .db
+            .join_room(
+                RoomId::from_proto(request.payload.id),
+                request.sender_user_id,
+                request.sender_connection_id,
+            )
+            .await?;
+        for recipient_id in self
+            .store()
+            .await
+            .connection_ids_for_user(request.sender_user_id)
         {
-            let mut store = self.store().await;
-            let (room, recipient_connection_ids) =
-                store.join_room(request.payload.id, request.sender_connection_id)?;
-            for recipient_id in recipient_connection_ids {
-                self.peer
-                    .send(recipient_id, proto::CallCanceled {})
-                    .trace_err();
-            }
+            self.peer
+                .send(recipient_id, proto::CallCanceled {})
+                .trace_err();
+        }
 
-            let live_kit_connection_info =
-                if let Some(live_kit) = self.app_state.live_kit_client.as_ref() {
-                    if let Some(token) = live_kit
-                        .room_token(
-                            &room.live_kit_room,
-                            &request.sender_connection_id.to_string(),
-                        )
-                        .trace_err()
-                    {
-                        Some(proto::LiveKitConnectionInfo {
-                            server_url: live_kit.url().into(),
-                            token,
-                        })
-                    } else {
-                        None
-                    }
+        let live_kit_connection_info =
+            if let Some(live_kit) = self.app_state.live_kit_client.as_ref() {
+                if let Some(token) = live_kit
+                    .room_token(
+                        &room.live_kit_room,
+                        &request.sender_connection_id.to_string(),
+                    )
+                    .trace_err()
+                {
+                    Some(proto::LiveKitConnectionInfo {
+                        server_url: live_kit.url().into(),
+                        token,
+                    })
                 } else {
                     None
-                };
+                }
+            } else {
+                None
+            };
+
+        self.room_updated(&room);
+        response.send(proto::JoinRoomResponse {
+            room: Some(room),
+            live_kit_connection_info,
+        })?;
 
-            response.send(proto::JoinRoomResponse {
-                room: Some(room.clone()),
-                live_kit_connection_info,
-            })?;
-            self.room_updated(room);
-        }
         self.update_user_contacts(request.sender_user_id).await?;
         Ok(())
     }

crates/collab/src/rpc/store.rs 🔗

@@ -257,57 +257,6 @@ impl Store {
         }
     }
 
-    pub fn join_room(
-        &mut self,
-        room_id: RoomId,
-        connection_id: ConnectionId,
-    ) -> Result<(&proto::Room, Vec<ConnectionId>)> {
-        let connection = self
-            .connections
-            .get_mut(&connection_id)
-            .ok_or_else(|| anyhow!("no such connection"))?;
-        let user_id = connection.user_id;
-        let recipient_connection_ids = self.connection_ids_for_user(user_id).collect::<Vec<_>>();
-
-        let connected_user = self
-            .connected_users
-            .get_mut(&user_id)
-            .ok_or_else(|| anyhow!("no such connection"))?;
-        let active_call = connected_user
-            .active_call
-            .as_mut()
-            .ok_or_else(|| anyhow!("not being called"))?;
-        anyhow::ensure!(
-            active_call.room_id == room_id && active_call.connection_id.is_none(),
-            "not being called on this room"
-        );
-
-        let room = self
-            .rooms
-            .get_mut(&room_id)
-            .ok_or_else(|| anyhow!("no such room"))?;
-        anyhow::ensure!(
-            room.pending_participant_user_ids
-                .contains(&user_id.to_proto()),
-            anyhow!("no such room")
-        );
-        room.pending_participant_user_ids
-            .retain(|pending| *pending != user_id.to_proto());
-        room.participants.push(proto::Participant {
-            user_id: user_id.to_proto(),
-            peer_id: connection_id.0,
-            projects: Default::default(),
-            location: Some(proto::ParticipantLocation {
-                variant: Some(proto::participant_location::Variant::External(
-                    proto::participant_location::External {},
-                )),
-            }),
-        });
-        active_call.connection_id = Some(connection_id);
-
-        Ok((room, recipient_connection_ids))
-    }
-
     pub fn leave_room(&mut self, room_id: RoomId, connection_id: ConnectionId) -> Result<LeftRoom> {
         let connection = self
             .connections