@@ -1238,36 +1238,41 @@ impl Database {
&self,
expected_room_id: Option<RoomId>,
user_id: UserId,
- ) -> Result<RoomGuard<proto::Room>> {
- self.room_transaction(|tx| async move {
+ ) -> Result<Option<RoomGuard<proto::Room>>> {
+ self.optional_room_transaction(|tx| async move {
+ let mut filter = Condition::all()
+ .add(room_participant::Column::UserId.eq(user_id))
+ .add(room_participant::Column::AnsweringConnectionId.is_null());
+ if let Some(room_id) = expected_room_id {
+ filter = filter.add(room_participant::Column::RoomId.eq(room_id));
+ }
let participant = room_participant::Entity::find()
- .filter(
- room_participant::Column::UserId
- .eq(user_id)
- .and(room_participant::Column::AnsweringConnectionId.is_null()),
- )
+ .filter(filter)
.one(&*tx)
- .await?
- .ok_or_else(|| anyhow!("could not decline call"))?;
- let room_id = participant.room_id;
+ .await?;
- if expected_room_id.map_or(false, |expected_room_id| expected_room_id != room_id) {
- return Err(anyhow!("declining call on unexpected room"))?;
- }
+ let participant = if let Some(participant) = participant {
+ participant
+ } else if expected_room_id.is_some() {
+ return Err(anyhow!("could not find call to decline"))?;
+ } else {
+ return Ok(None);
+ };
+ let room_id = participant.room_id;
room_participant::Entity::delete(participant.into_active_model())
.exec(&*tx)
.await?;
let room = self.get_room(room_id, &tx).await?;
- Ok((room_id, room))
+ Ok(Some((room_id, room)))
})
.await
}
pub async fn cancel_call(
&self,
- expected_room_id: Option<RoomId>,
+ room_id: RoomId,
calling_connection: ConnectionId,
called_user_id: UserId,
) -> Result<RoomGuard<proto::Room>> {
@@ -1276,6 +1281,7 @@ impl Database {
.filter(
Condition::all()
.add(room_participant::Column::UserId.eq(called_user_id))
+ .add(room_participant::Column::RoomId.eq(room_id))
.add(
room_participant::Column::CallingConnectionId
.eq(calling_connection.id as i32),
@@ -1288,11 +1294,8 @@ impl Database {
)
.one(&*tx)
.await?
- .ok_or_else(|| anyhow!("could not cancel call"))?;
+ .ok_or_else(|| anyhow!("no call to cancel"))?;
let room_id = participant.room_id;
- if expected_room_id.map_or(false, |expected_room_id| expected_room_id != room_id) {
- return Err(anyhow!("canceling call on unexpected room"))?;
- }
room_participant::Entity::delete(participant.into_active_model())
.exec(&*tx)
@@ -1346,8 +1349,11 @@ impl Database {
.await
}
- pub async fn leave_room(&self, connection: ConnectionId) -> Result<RoomGuard<LeftRoom>> {
- self.room_transaction(|tx| async move {
+ pub async fn leave_room(
+ &self,
+ connection: ConnectionId,
+ ) -> Result<Option<RoomGuard<LeftRoom>>> {
+ self.optional_room_transaction(|tx| async move {
let leaving_participant = room_participant::Entity::find()
.filter(
Condition::all()
@@ -1498,9 +1504,9 @@ impl Database {
self.rooms.remove(&room_id);
}
- Ok((room_id, left_room))
+ Ok(Some((room_id, left_room)))
} else {
- Err(anyhow!("could not leave room"))?
+ Ok(None)
}
})
.await
@@ -2549,26 +2555,38 @@ impl Database {
self.run(body).await
}
- async fn room_transaction<F, Fut, T>(&self, f: F) -> Result<RoomGuard<T>>
+ async fn optional_room_transaction<F, Fut, T>(&self, f: F) -> Result<Option<RoomGuard<T>>>
where
F: Send + Fn(TransactionHandle) -> Fut,
- Fut: Send + Future<Output = Result<(RoomId, T)>>,
+ Fut: Send + Future<Output = Result<Option<(RoomId, T)>>>,
{
let body = async {
loop {
let (tx, result) = self.with_transaction(&f).await?;
match result {
- Ok((room_id, data)) => {
+ Ok(Some((room_id, data))) => {
let lock = self.rooms.entry(room_id).or_default().clone();
let _guard = lock.lock_owned().await;
match tx.commit().await.map_err(Into::into) {
Ok(()) => {
- return Ok(RoomGuard {
+ return Ok(Some(RoomGuard {
data,
_guard,
_not_send: PhantomData,
- });
+ }));
+ }
+ Err(error) => {
+ if is_serialization_error(&error) {
+ // Retry (don't break the loop)
+ } else {
+ return Err(error);
+ }
}
+ }
+ }
+ Ok(None) => {
+ match tx.commit().await.map_err(Into::into) {
+ Ok(()) => return Ok(None),
Err(error) => {
if is_serialization_error(&error) {
// Retry (don't break the loop)
@@ -2593,6 +2611,23 @@ impl Database {
self.run(body).await
}
+ async fn room_transaction<F, Fut, T>(&self, f: F) -> Result<RoomGuard<T>>
+ where
+ F: Send + Fn(TransactionHandle) -> Fut,
+ Fut: Send + Future<Output = Result<(RoomId, T)>>,
+ {
+ let data = self
+ .optional_room_transaction(move |tx| {
+ let future = f(tx);
+ async {
+ let data = future.await?;
+ Ok(Some(data))
+ }
+ })
+ .await?;
+ Ok(data.unwrap())
+ }
+
async fn with_transaction<F, Fut, T>(&self, f: &F) -> Result<(DatabaseTransaction, Result<T>)>
where
F: Send + Fn(TransactionHandle) -> Fut,
@@ -820,7 +820,7 @@ async fn sign_out(
.is_user_online(session.user_id)
{
let db = session.db().await;
- if let Some(room) = db.decline_call(None, session.user_id).await.trace_err() {
+ if let Some(room) = db.decline_call(None, session.user_id).await.trace_err().flatten() {
room_updated(&room, &session.peer);
}
}
@@ -1024,7 +1024,7 @@ async fn cancel_call(
let room = session
.db()
.await
- .cancel_call(Some(room_id), session.connection_id, called_user_id)
+ .cancel_call(room_id, session.connection_id, called_user_id)
.await?;
room_updated(&room, &session.peer);
}
@@ -1057,7 +1057,8 @@ async fn decline_call(message: proto::DeclineCall, session: Session) -> Result<(
.db()
.await
.decline_call(Some(room_id), session.user_id)
- .await?;
+ .await?
+ .ok_or_else(|| anyhow!("failed to decline call"))?;
room_updated(&room, &session.peer);
}
@@ -2026,8 +2027,7 @@ async fn leave_room_for_session(session: &Session) -> Result<()> {
let canceled_calls_to_user_ids;
let live_kit_room;
let delete_live_kit_room;
- {
- let mut left_room = session.db().await.leave_room(session.connection_id).await?;
+ if let Some(mut left_room) = session.db().await.leave_room(session.connection_id).await? {
contacts_to_update.insert(session.user_id);
for project in left_room.left_projects.values() {
@@ -2039,6 +2039,8 @@ async fn leave_room_for_session(session: &Session) -> Result<()> {
canceled_calls_to_user_ids = mem::take(&mut left_room.canceled_calls_to_user_ids);
live_kit_room = mem::take(&mut left_room.room.live_kit_room);
delete_live_kit_room = left_room.room.participants.is_empty();
+ } else {
+ return Ok(());
}
{