Reduce duplication in notification queries

Max Brunsfeld created

Change summary

crates/collab/src/db/queries/notifications.rs | 89 +++++++++++---------
1 file changed, 48 insertions(+), 41 deletions(-)

Detailed changes

crates/collab/src/db/queries/notifications.rs 🔗

@@ -71,6 +71,16 @@ impl Database {
         avoid_duplicates: bool,
         tx: &DatabaseTransaction,
     ) -> Result<Option<proto::Notification>> {
+        if avoid_duplicates {
+            if self
+                .find_notification(recipient_id, &notification, tx)
+                .await?
+                .is_some()
+            {
+                return Ok(None);
+            }
+        }
+
         let notification_proto = notification.to_proto();
         let kind = *self
             .notification_kinds_by_name
@@ -78,33 +88,6 @@ impl Database {
             .ok_or_else(|| anyhow!("invalid notification kind {:?}", notification_proto.kind))?;
         let actor_id = notification_proto.actor_id.map(|id| UserId::from_proto(id));
 
-        if avoid_duplicates {
-            let mut existing_notifications = notification::Entity::find()
-                .filter(
-                    Condition::all()
-                        .add(notification::Column::RecipientId.eq(recipient_id))
-                        .add(notification::Column::IsRead.eq(false))
-                        .add(notification::Column::Kind.eq(kind))
-                        .add(notification::Column::ActorId.eq(actor_id)),
-                )
-                .stream(&*tx)
-                .await?;
-
-            // Check if this notification already exists. Don't rely on the
-            // JSON serialization being identical, in case the notification enum
-            // is changed in backward-compatible ways over time.
-            while let Some(row) = existing_notifications.next().await {
-                let row = row?;
-                if let Some(proto) = self.model_to_proto(row) {
-                    if let Some(existing) = Notification::from_proto(&proto) {
-                        if existing == notification {
-                            return Ok(None);
-                        }
-                    }
-                }
-            }
-        }
-
         let model = notification::ActiveModel {
             recipient_id: ActiveValue::Set(recipient_id),
             kind: ActiveValue::Set(kind),
@@ -119,7 +102,7 @@ impl Database {
 
         Ok(Some(proto::Notification {
             id: model.id.as_ref().to_proto(),
-            kind: notification_proto.kind.to_string(),
+            kind: notification_proto.kind,
             timestamp: model.created_at.as_ref().assume_utc().unix_timestamp() as u64,
             is_read: false,
             content: notification_proto.content,
@@ -133,28 +116,52 @@ impl Database {
         notification: Notification,
         tx: &DatabaseTransaction,
     ) -> Result<Option<NotificationId>> {
-        let notification = notification.to_proto();
+        let id = self
+            .find_notification(recipient_id, &notification, tx)
+            .await?;
+        if let Some(id) = id {
+            notification::Entity::delete_by_id(id).exec(tx).await?;
+        }
+        Ok(id)
+    }
+
+    pub async fn find_notification(
+        &self,
+        recipient_id: UserId,
+        notification: &Notification,
+        tx: &DatabaseTransaction,
+    ) -> Result<Option<NotificationId>> {
+        let proto = notification.to_proto();
         let kind = *self
             .notification_kinds_by_name
-            .get(&notification.kind)
-            .ok_or_else(|| anyhow!("invalid notification kind {:?}", notification.kind))?;
-        let actor_id = notification.actor_id.map(|id| UserId::from_proto(id));
-        let notification = notification::Entity::find()
+            .get(&proto.kind)
+            .ok_or_else(|| anyhow!("invalid notification kind {:?}", proto.kind))?;
+        let mut rows = notification::Entity::find()
             .filter(
                 Condition::all()
                     .add(notification::Column::RecipientId.eq(recipient_id))
+                    .add(notification::Column::IsRead.eq(false))
                     .add(notification::Column::Kind.eq(kind))
-                    .add(notification::Column::ActorId.eq(actor_id))
-                    .add(notification::Column::Content.eq(notification.content)),
+                    .add(notification::Column::ActorId.eq(proto.actor_id)),
             )
-            .one(tx)
+            .stream(&*tx)
             .await?;
-        if let Some(notification) = &notification {
-            notification::Entity::delete_by_id(notification.id)
-                .exec(tx)
-                .await?;
+
+        // Don't rely on the JSON serialization being identical, in case the
+        // notification type is changed in backward-compatible ways.
+        while let Some(row) = rows.next().await {
+            let row = row?;
+            let id = row.id;
+            if let Some(proto) = self.model_to_proto(row) {
+                if let Some(existing) = Notification::from_proto(&proto) {
+                    if existing == *notification {
+                        return Ok(Some(id));
+                    }
+                }
+            }
         }
-        Ok(notification.map(|notification| notification.id))
+
+        Ok(None)
     }
 
     fn model_to_proto(&self, row: notification::Model) -> Option<proto::Notification> {