diff --git a/crates/collab/src/db/queries/notifications.rs b/crates/collab/src/db/queries/notifications.rs index b8b2a15421e92c3df2cf449919d5c30629bd0a13..50e961957cfd00b338390cdf96831af02f4be23a 100644 --- a/crates/collab/src/db/queries/notifications.rs +++ b/crates/collab/src/db/queries/notifications.rs @@ -71,6 +71,16 @@ impl Database { avoid_duplicates: bool, tx: &DatabaseTransaction, ) -> Result> { + if avoid_duplicates { + if self + .find_notification(recipient_id, ¬ification, tx) + .await? + .is_some() + { + return Ok(None); + } + } + let notification_proto = notification.to_proto(); let kind = *self .notification_kinds_by_name @@ -78,33 +88,6 @@ impl Database { .ok_or_else(|| anyhow!("invalid notification kind {:?}", notification_proto.kind))?; let actor_id = notification_proto.actor_id.map(|id| UserId::from_proto(id)); - if avoid_duplicates { - let mut existing_notifications = notification::Entity::find() - .filter( - Condition::all() - .add(notification::Column::RecipientId.eq(recipient_id)) - .add(notification::Column::IsRead.eq(false)) - .add(notification::Column::Kind.eq(kind)) - .add(notification::Column::ActorId.eq(actor_id)), - ) - .stream(&*tx) - .await?; - - // Check if this notification already exists. Don't rely on the - // JSON serialization being identical, in case the notification enum - // is changed in backward-compatible ways over time. - while let Some(row) = existing_notifications.next().await { - let row = row?; - if let Some(proto) = self.model_to_proto(row) { - if let Some(existing) = Notification::from_proto(&proto) { - if existing == notification { - return Ok(None); - } - } - } - } - } - let model = notification::ActiveModel { recipient_id: ActiveValue::Set(recipient_id), kind: ActiveValue::Set(kind), @@ -119,7 +102,7 @@ impl Database { Ok(Some(proto::Notification { id: model.id.as_ref().to_proto(), - kind: notification_proto.kind.to_string(), + kind: notification_proto.kind, timestamp: model.created_at.as_ref().assume_utc().unix_timestamp() as u64, is_read: false, content: notification_proto.content, @@ -133,28 +116,52 @@ impl Database { notification: Notification, tx: &DatabaseTransaction, ) -> Result> { - let notification = notification.to_proto(); + let id = self + .find_notification(recipient_id, ¬ification, tx) + .await?; + if let Some(id) = id { + notification::Entity::delete_by_id(id).exec(tx).await?; + } + Ok(id) + } + + pub async fn find_notification( + &self, + recipient_id: UserId, + notification: &Notification, + tx: &DatabaseTransaction, + ) -> Result> { + let proto = notification.to_proto(); let kind = *self .notification_kinds_by_name - .get(¬ification.kind) - .ok_or_else(|| anyhow!("invalid notification kind {:?}", notification.kind))?; - let actor_id = notification.actor_id.map(|id| UserId::from_proto(id)); - let notification = notification::Entity::find() + .get(&proto.kind) + .ok_or_else(|| anyhow!("invalid notification kind {:?}", proto.kind))?; + let mut rows = notification::Entity::find() .filter( Condition::all() .add(notification::Column::RecipientId.eq(recipient_id)) + .add(notification::Column::IsRead.eq(false)) .add(notification::Column::Kind.eq(kind)) - .add(notification::Column::ActorId.eq(actor_id)) - .add(notification::Column::Content.eq(notification.content)), + .add(notification::Column::ActorId.eq(proto.actor_id)), ) - .one(tx) + .stream(&*tx) .await?; - if let Some(notification) = ¬ification { - notification::Entity::delete_by_id(notification.id) - .exec(tx) - .await?; + + // Don't rely on the JSON serialization being identical, in case the + // notification type is changed in backward-compatible ways. + while let Some(row) = rows.next().await { + let row = row?; + let id = row.id; + if let Some(proto) = self.model_to_proto(row) { + if let Some(existing) = Notification::from_proto(&proto) { + if existing == *notification { + return Ok(Some(id)); + } + } + } } - Ok(notification.map(|notification| notification.id)) + + Ok(None) } fn model_to_proto(&self, row: notification::Model) -> Option {