@@ -907,26 +907,14 @@ where
sqlx::query(
"
- INSERT INTO room_participants (room_id, user_id, connection_id)
- VALUES ($1, $2, $3)
- ",
- )
- .bind(room_id)
- .bind(user_id)
- .bind(connection_id.0 as i32)
- .execute(&mut tx)
- .await?;
-
- sqlx::query(
- "
- INSERT INTO calls (room_id, calling_user_id, called_user_id, answering_connection_id)
+ INSERT INTO room_participants (room_id, user_id, connection_id, calling_user_id)
VALUES ($1, $2, $3, $4)
",
)
.bind(room_id)
.bind(user_id)
- .bind(user_id)
.bind(connection_id.0 as i32)
+ .bind(user_id)
.execute(&mut tx)
.await?;
@@ -945,31 +933,20 @@ where
let mut tx = self.pool.begin().await?;
sqlx::query(
"
- INSERT INTO calls (room_id, calling_user_id, called_user_id, initial_project_id)
+ INSERT INTO room_participants (room_id, user_id, calling_user_id, initial_project_id)
VALUES ($1, $2, $3, $4)
",
)
.bind(room_id)
- .bind(calling_user_id)
.bind(called_user_id)
+ .bind(calling_user_id)
.bind(initial_project_id)
.execute(&mut tx)
.await?;
- sqlx::query(
- "
- INSERT INTO room_participants (room_id, user_id)
- VALUES ($1, $2)
- ",
- )
- .bind(room_id)
- .bind(called_user_id)
- .execute(&mut tx)
- .await?;
-
let room = self.commit_room_transaction(room_id, tx).await?;
- let incoming_call =
- Self::build_incoming_call(&room, calling_user_id, initial_project_id);
+ let incoming_call = Self::build_incoming_call(&room, called_user_id)
+ .ok_or_else(|| anyhow!("failed to build incoming call"))?;
Ok((room, incoming_call))
})
}
@@ -980,24 +957,20 @@ where
) -> Result<Option<proto::IncomingCall>> {
test_support!(self, {
let mut tx = self.pool.begin().await?;
- let call = sqlx::query_as::<_, Call>(
+ let room_id = sqlx::query_scalar::<_, RoomId>(
"
- SELECT *
- FROM calls
- WHERE called_user_id = $1 AND answering_connection_id IS NULL
+ SELECT room_id
+ FROM room_participants
+ WHERE user_id = $1 AND connection_id IS NULL
",
)
.bind(user_id)
.fetch_optional(&mut tx)
.await?;
- if let Some(call) = call {
- let room = self.get_room(call.room_id, &mut tx).await?;
- Ok(Some(Self::build_incoming_call(
- &room,
- call.calling_user_id,
- call.initial_project_id,
- )))
+ if let Some(room_id) = room_id {
+ let room = self.get_room(room_id, &mut tx).await?;
+ Ok(Self::build_incoming_call(&room, user_id))
} else {
Ok(None)
}
@@ -1006,26 +979,30 @@ where
fn build_incoming_call(
room: &proto::Room,
- calling_user_id: UserId,
- initial_project_id: Option<ProjectId>,
- ) -> proto::IncomingCall {
- proto::IncomingCall {
+ called_user_id: UserId,
+ ) -> Option<proto::IncomingCall> {
+ let pending_participant = room
+ .pending_participants
+ .iter()
+ .find(|participant| participant.user_id == called_user_id.to_proto())?;
+
+ Some(proto::IncomingCall {
room_id: room.id,
- calling_user_id: calling_user_id.to_proto(),
+ calling_user_id: pending_participant.calling_user_id,
participant_user_ids: room
.participants
.iter()
.map(|participant| participant.user_id)
.collect(),
initial_project: room.participants.iter().find_map(|participant| {
- let initial_project_id = initial_project_id?.to_proto();
+ let initial_project_id = pending_participant.initial_project_id?;
participant
.projects
.iter()
.find(|project| project.id == initial_project_id)
.cloned()
}),
- }
+ })
}
pub async fn call_failed(
@@ -1035,17 +1012,6 @@ where
) -> Result<proto::Room> {
test_support!(self, {
let mut tx = self.pool.begin().await?;
- sqlx::query(
- "
- DELETE FROM calls
- WHERE room_id = $1 AND called_user_id = $2
- ",
- )
- .bind(room_id)
- .bind(called_user_id)
- .execute(&mut tx)
- .await?;
-
sqlx::query(
"
DELETE FROM room_participants
@@ -1069,20 +1035,6 @@ where
) -> Result<proto::Room> {
test_support!(self, {
let mut tx = self.pool.begin().await?;
- sqlx::query(
- "
- UPDATE calls
- SET answering_connection_id = $1
- WHERE room_id = $2 AND called_user_id = $3
- RETURNING 1
- ",
- )
- .bind(connection_id.0 as i32)
- .bind(room_id)
- .bind(user_id)
- .fetch_one(&mut tx)
- .await?;
-
sqlx::query(
"
UPDATE room_participants
@@ -1096,54 +1048,8 @@ where
.bind(user_id)
.fetch_one(&mut tx)
.await?;
-
self.commit_room_transaction(room_id, tx).await
})
-
- // let connection = self
- // .connections
- // .get_mut(&connection_id)
- // .ok_or_else(|| anyhow!("no such connection"))?;
- // let user_id = connection.user_id;
- // let recipient_connection_ids = self.connection_ids_for_user(user_id).collect::<Vec<_>>();
-
- // let connected_user = self
- // .connected_users
- // .get_mut(&user_id)
- // .ok_or_else(|| anyhow!("no such connection"))?;
- // let active_call = connected_user
- // .active_call
- // .as_mut()
- // .ok_or_else(|| anyhow!("not being called"))?;
- // anyhow::ensure!(
- // active_call.room_id == room_id && active_call.connection_id.is_none(),
- // "not being called on this room"
- // );
-
- // let room = self
- // .rooms
- // .get_mut(&room_id)
- // .ok_or_else(|| anyhow!("no such room"))?;
- // anyhow::ensure!(
- // room.pending_participant_user_ids
- // .contains(&user_id.to_proto()),
- // anyhow!("no such room")
- // );
- // room.pending_participant_user_ids
- // .retain(|pending| *pending != user_id.to_proto());
- // room.participants.push(proto::Participant {
- // user_id: user_id.to_proto(),
- // peer_id: connection_id.0,
- // projects: Default::default(),
- // location: Some(proto::ParticipantLocation {
- // variant: Some(proto::participant_location::Variant::External(
- // proto::participant_location::External {},
- // )),
- // }),
- // });
- // active_call.connection_id = Some(connection_id);
-
- // Ok((room, recipient_connection_ids))
}
pub async fn update_room_participant_location(
@@ -1231,9 +1137,9 @@ where
.await?;
let mut db_participants =
- sqlx::query_as::<_, (UserId, Option<i32>, Option<i32>, Option<i32>)>(
+ sqlx::query_as::<_, (UserId, Option<i32>, Option<i32>, Option<ProjectId>, UserId, Option<ProjectId>)>(
"
- SELECT user_id, connection_id, location_kind, location_project_id
+ SELECT user_id, connection_id, location_kind, location_project_id, calling_user_id, initial_project_id
FROM room_participants
WHERE room_id = $1
",
@@ -1242,9 +1148,16 @@ where
.fetch(&mut *tx);
let mut participants = Vec::new();
- let mut pending_participant_user_ids = Vec::new();
+ let mut pending_participants = Vec::new();
while let Some(participant) = db_participants.next().await {
- let (user_id, connection_id, _location_kind, _location_project_id) = participant?;
+ let (
+ user_id,
+ connection_id,
+ _location_kind,
+ _location_project_id,
+ calling_user_id,
+ initial_project_id,
+ ) = participant?;
if let Some(connection_id) = connection_id {
participants.push(proto::Participant {
user_id: user_id.to_proto(),
@@ -1257,7 +1170,11 @@ where
}),
});
} else {
- pending_participant_user_ids.push(user_id.to_proto());
+ pending_participants.push(proto::PendingParticipant {
+ user_id: user_id.to_proto(),
+ calling_user_id: calling_user_id.to_proto(),
+ initial_project_id: initial_project_id.map(|id| id.to_proto()),
+ });
}
}
drop(db_participants);
@@ -1296,7 +1213,7 @@ where
version: room.version as u64,
live_kit_room: room.live_kit_room,
participants,
- pending_participant_user_ids,
+ pending_participants,
})
}
@@ -258,80 +258,81 @@ impl Store {
}
pub fn leave_room(&mut self, room_id: RoomId, connection_id: ConnectionId) -> Result<LeftRoom> {
- let connection = self
- .connections
- .get_mut(&connection_id)
- .ok_or_else(|| anyhow!("no such connection"))?;
- let user_id = connection.user_id;
-
- let connected_user = self
- .connected_users
- .get(&user_id)
- .ok_or_else(|| anyhow!("no such connection"))?;
- anyhow::ensure!(
- connected_user
- .active_call
- .map_or(false, |call| call.room_id == room_id
- && call.connection_id == Some(connection_id)),
- "cannot leave a room before joining it"
- );
-
- // Given that users can only join one room at a time, we can safely unshare
- // and leave all projects associated with the connection.
- let mut unshared_projects = Vec::new();
- let mut left_projects = Vec::new();
- for project_id in connection.projects.clone() {
- if let Ok((_, project)) = self.unshare_project(project_id, connection_id) {
- unshared_projects.push(project);
- } else if let Ok(project) = self.leave_project(project_id, connection_id) {
- left_projects.push(project);
- }
- }
- self.connected_users.get_mut(&user_id).unwrap().active_call = None;
-
- let room = self
- .rooms
- .get_mut(&room_id)
- .ok_or_else(|| anyhow!("no such room"))?;
- room.participants
- .retain(|participant| participant.peer_id != connection_id.0);
-
- let mut canceled_call_connection_ids = Vec::new();
- room.pending_participant_user_ids
- .retain(|pending_participant_user_id| {
- if let Some(connected_user) = self
- .connected_users
- .get_mut(&UserId::from_proto(*pending_participant_user_id))
- {
- if let Some(call) = connected_user.active_call.as_ref() {
- if call.calling_user_id == user_id {
- connected_user.active_call.take();
- canceled_call_connection_ids
- .extend(connected_user.connection_ids.iter().copied());
- false
- } else {
- true
- }
- } else {
- true
- }
- } else {
- true
- }
- });
-
- let room = if room.participants.is_empty() {
- Cow::Owned(self.rooms.remove(&room_id).unwrap())
- } else {
- Cow::Borrowed(self.rooms.get(&room_id).unwrap())
- };
-
- Ok(LeftRoom {
- room,
- unshared_projects,
- left_projects,
- canceled_call_connection_ids,
- })
+ todo!()
+ // let connection = self
+ // .connections
+ // .get_mut(&connection_id)
+ // .ok_or_else(|| anyhow!("no such connection"))?;
+ // let user_id = connection.user_id;
+
+ // let connected_user = self
+ // .connected_users
+ // .get(&user_id)
+ // .ok_or_else(|| anyhow!("no such connection"))?;
+ // anyhow::ensure!(
+ // connected_user
+ // .active_call
+ // .map_or(false, |call| call.room_id == room_id
+ // && call.connection_id == Some(connection_id)),
+ // "cannot leave a room before joining it"
+ // );
+
+ // // Given that users can only join one room at a time, we can safely unshare
+ // // and leave all projects associated with the connection.
+ // let mut unshared_projects = Vec::new();
+ // let mut left_projects = Vec::new();
+ // for project_id in connection.projects.clone() {
+ // if let Ok((_, project)) = self.unshare_project(project_id, connection_id) {
+ // unshared_projects.push(project);
+ // } else if let Ok(project) = self.leave_project(project_id, connection_id) {
+ // left_projects.push(project);
+ // }
+ // }
+ // self.connected_users.get_mut(&user_id).unwrap().active_call = None;
+
+ // let room = self
+ // .rooms
+ // .get_mut(&room_id)
+ // .ok_or_else(|| anyhow!("no such room"))?;
+ // room.participants
+ // .retain(|participant| participant.peer_id != connection_id.0);
+
+ // let mut canceled_call_connection_ids = Vec::new();
+ // room.pending_participant_user_ids
+ // .retain(|pending_participant_user_id| {
+ // if let Some(connected_user) = self
+ // .connected_users
+ // .get_mut(&UserId::from_proto(*pending_participant_user_id))
+ // {
+ // if let Some(call) = connected_user.active_call.as_ref() {
+ // if call.calling_user_id == user_id {
+ // connected_user.active_call.take();
+ // canceled_call_connection_ids
+ // .extend(connected_user.connection_ids.iter().copied());
+ // false
+ // } else {
+ // true
+ // }
+ // } else {
+ // true
+ // }
+ // } else {
+ // true
+ // }
+ // });
+
+ // let room = if room.participants.is_empty() {
+ // Cow::Owned(self.rooms.remove(&room_id).unwrap())
+ // } else {
+ // Cow::Borrowed(self.rooms.get(&room_id).unwrap())
+ // };
+
+ // Ok(LeftRoom {
+ // room,
+ // unshared_projects,
+ // left_projects,
+ // canceled_call_connection_ids,
+ // })
}
pub fn rooms(&self) -> &BTreeMap<RoomId, proto::Room> {
@@ -344,48 +345,49 @@ impl Store {
called_user_id: UserId,
canceller_connection_id: ConnectionId,
) -> Result<(&proto::Room, HashSet<ConnectionId>)> {
- let canceller_user_id = self.user_id_for_connection(canceller_connection_id)?;
- let canceller = self
- .connected_users
- .get(&canceller_user_id)
- .ok_or_else(|| anyhow!("no such connection"))?;
- let recipient = self
- .connected_users
- .get(&called_user_id)
- .ok_or_else(|| anyhow!("no such connection"))?;
- let canceller_active_call = canceller
- .active_call
- .as_ref()
- .ok_or_else(|| anyhow!("no active call"))?;
- let recipient_active_call = recipient
- .active_call
- .as_ref()
- .ok_or_else(|| anyhow!("no active call for recipient"))?;
-
- anyhow::ensure!(
- canceller_active_call.room_id == room_id,
- "users are on different calls"
- );
- anyhow::ensure!(
- recipient_active_call.room_id == room_id,
- "users are on different calls"
- );
- anyhow::ensure!(
- recipient_active_call.connection_id.is_none(),
- "recipient has already answered"
- );
- let room_id = recipient_active_call.room_id;
- let room = self
- .rooms
- .get_mut(&room_id)
- .ok_or_else(|| anyhow!("no such room"))?;
- room.pending_participant_user_ids
- .retain(|user_id| UserId::from_proto(*user_id) != called_user_id);
-
- let recipient = self.connected_users.get_mut(&called_user_id).unwrap();
- recipient.active_call.take();
-
- Ok((room, recipient.connection_ids.clone()))
+ todo!()
+ // let canceller_user_id = self.user_id_for_connection(canceller_connection_id)?;
+ // let canceller = self
+ // .connected_users
+ // .get(&canceller_user_id)
+ // .ok_or_else(|| anyhow!("no such connection"))?;
+ // let recipient = self
+ // .connected_users
+ // .get(&called_user_id)
+ // .ok_or_else(|| anyhow!("no such connection"))?;
+ // let canceller_active_call = canceller
+ // .active_call
+ // .as_ref()
+ // .ok_or_else(|| anyhow!("no active call"))?;
+ // let recipient_active_call = recipient
+ // .active_call
+ // .as_ref()
+ // .ok_or_else(|| anyhow!("no active call for recipient"))?;
+
+ // anyhow::ensure!(
+ // canceller_active_call.room_id == room_id,
+ // "users are on different calls"
+ // );
+ // anyhow::ensure!(
+ // recipient_active_call.room_id == room_id,
+ // "users are on different calls"
+ // );
+ // anyhow::ensure!(
+ // recipient_active_call.connection_id.is_none(),
+ // "recipient has already answered"
+ // );
+ // let room_id = recipient_active_call.room_id;
+ // let room = self
+ // .rooms
+ // .get_mut(&room_id)
+ // .ok_or_else(|| anyhow!("no such room"))?;
+ // room.pending_participant_user_ids
+ // .retain(|user_id| UserId::from_proto(*user_id) != called_user_id);
+
+ // let recipient = self.connected_users.get_mut(&called_user_id).unwrap();
+ // recipient.active_call.take();
+
+ // Ok((room, recipient.connection_ids.clone()))
}
pub fn decline_call(
@@ -393,31 +395,32 @@ impl Store {
room_id: RoomId,
recipient_connection_id: ConnectionId,
) -> Result<(&proto::Room, Vec<ConnectionId>)> {
- let called_user_id = self.user_id_for_connection(recipient_connection_id)?;
- let recipient = self
- .connected_users
- .get_mut(&called_user_id)
- .ok_or_else(|| anyhow!("no such connection"))?;
- if let Some(active_call) = recipient.active_call {
- anyhow::ensure!(active_call.room_id == room_id, "no such room");
- anyhow::ensure!(
- active_call.connection_id.is_none(),
- "cannot decline a call after joining room"
- );
- recipient.active_call.take();
- let recipient_connection_ids = self
- .connection_ids_for_user(called_user_id)
- .collect::<Vec<_>>();
- let room = self
- .rooms
- .get_mut(&active_call.room_id)
- .ok_or_else(|| anyhow!("no such room"))?;
- room.pending_participant_user_ids
- .retain(|user_id| UserId::from_proto(*user_id) != called_user_id);
- Ok((room, recipient_connection_ids))
- } else {
- Err(anyhow!("user is not being called"))
- }
+ todo!()
+ // let called_user_id = self.user_id_for_connection(recipient_connection_id)?;
+ // let recipient = self
+ // .connected_users
+ // .get_mut(&called_user_id)
+ // .ok_or_else(|| anyhow!("no such connection"))?;
+ // if let Some(active_call) = recipient.active_call {
+ // anyhow::ensure!(active_call.room_id == room_id, "no such room");
+ // anyhow::ensure!(
+ // active_call.connection_id.is_none(),
+ // "cannot decline a call after joining room"
+ // );
+ // recipient.active_call.take();
+ // let recipient_connection_ids = self
+ // .connection_ids_for_user(called_user_id)
+ // .collect::<Vec<_>>();
+ // let room = self
+ // .rooms
+ // .get_mut(&active_call.room_id)
+ // .ok_or_else(|| anyhow!("no such room"))?;
+ // room.pending_participant_user_ids
+ // .retain(|user_id| UserId::from_proto(*user_id) != called_user_id);
+ // Ok((room, recipient_connection_ids))
+ // } else {
+ // Err(anyhow!("user is not being called"))
+ // }
}
pub fn unshare_project(
@@ -767,13 +770,13 @@ impl Store {
}
for (room_id, room) in &self.rooms {
- for pending_user_id in &room.pending_participant_user_ids {
- assert!(
- self.connected_users
- .contains_key(&UserId::from_proto(*pending_user_id)),
- "call is active on a user that has disconnected"
- );
- }
+ // for pending_user_id in &room.pending_participant_user_ids {
+ // assert!(
+ // self.connected_users
+ // .contains_key(&UserId::from_proto(*pending_user_id)),
+ // "call is active on a user that has disconnected"
+ // );
+ // }
for participant in &room.participants {
assert!(
@@ -793,10 +796,10 @@ impl Store {
}
}
- assert!(
- !room.pending_participant_user_ids.is_empty() || !room.participants.is_empty(),
- "room can't be empty"
- );
+ // assert!(
+ // !room.pending_participant_user_ids.is_empty() || !room.participants.is_empty(),
+ // "room can't be empty"
+ // );
}
for (project_id, project) in &self.projects {