notifications.rs

  1use super::*;
  2use rpc::Notification;
  3
  4impl Database {
  5    /// Initializes the different kinds of notifications by upserting records for them.
  6    pub async fn initialize_notification_kinds(&mut self) -> Result<()> {
  7        notification_kind::Entity::insert_many(Notification::all_variant_names().iter().map(
  8            |kind| notification_kind::ActiveModel {
  9                name: ActiveValue::Set(kind.to_string()),
 10                ..Default::default()
 11            },
 12        ))
 13        .on_conflict(OnConflict::new().do_nothing().to_owned())
 14        .exec_without_returning(&self.pool)
 15        .await?;
 16
 17        let mut rows = notification_kind::Entity::find().stream(&self.pool).await?;
 18        while let Some(row) = rows.next().await {
 19            let row = row?;
 20            self.notification_kinds_by_name.insert(row.name, row.id);
 21        }
 22
 23        for name in Notification::all_variant_names() {
 24            if let Some(id) = self.notification_kinds_by_name.get(*name).copied() {
 25                self.notification_kinds_by_id.insert(id, name);
 26            }
 27        }
 28
 29        Ok(())
 30    }
 31
 32    /// Returns the notifications for the given recipient.
 33    pub async fn get_notifications(
 34        &self,
 35        recipient_id: UserId,
 36        limit: usize,
 37        before_id: Option<NotificationId>,
 38    ) -> Result<Vec<proto::Notification>> {
 39        self.transaction(|tx| async move {
 40            let mut result = Vec::new();
 41            let mut condition =
 42                Condition::all().add(notification::Column::RecipientId.eq(recipient_id));
 43
 44            if let Some(before_id) = before_id {
 45                condition = condition.add(notification::Column::Id.lt(before_id));
 46            }
 47
 48            let mut rows = notification::Entity::find()
 49                .filter(condition)
 50                .order_by_desc(notification::Column::Id)
 51                .limit(limit as u64)
 52                .stream(&*tx)
 53                .await?;
 54            while let Some(row) = rows.next().await {
 55                let row = row?;
 56                let kind = row.kind;
 57                if let Some(proto) = model_to_proto(self, row) {
 58                    result.push(proto);
 59                } else {
 60                    log::warn!("unknown notification kind {:?}", kind);
 61                }
 62            }
 63            result.reverse();
 64            Ok(result)
 65        })
 66        .await
 67    }
 68
 69    /// Creates a notification. If `avoid_duplicates` is set to true, then avoid
 70    /// creating a new notification if the given recipient already has an
 71    /// unread notification with the given kind and entity id.
 72    pub async fn create_notification(
 73        &self,
 74        recipient_id: UserId,
 75        notification: Notification,
 76        avoid_duplicates: bool,
 77        tx: &DatabaseTransaction,
 78    ) -> Result<Option<(UserId, proto::Notification)>> {
 79        if avoid_duplicates {
 80            if self
 81                .find_notification(recipient_id, &notification, tx)
 82                .await?
 83                .is_some()
 84            {
 85                return Ok(None);
 86            }
 87        }
 88
 89        let proto = notification.to_proto();
 90        let kind = notification_kind_from_proto(self, &proto)?;
 91        let model = notification::ActiveModel {
 92            recipient_id: ActiveValue::Set(recipient_id),
 93            kind: ActiveValue::Set(kind),
 94            entity_id: ActiveValue::Set(proto.entity_id.map(|id| id as i32)),
 95            content: ActiveValue::Set(proto.content.clone()),
 96            ..Default::default()
 97        }
 98        .save(tx)
 99        .await?;
100
101        Ok(Some((
102            recipient_id,
103            proto::Notification {
104                id: model.id.as_ref().to_proto(),
105                kind: proto.kind,
106                timestamp: model.created_at.as_ref().assume_utc().unix_timestamp() as u64,
107                is_read: false,
108                response: None,
109                content: proto.content,
110                entity_id: proto.entity_id,
111            },
112        )))
113    }
114
115    /// Remove an unread notification with the given recipient, kind and
116    /// entity id.
117    pub async fn remove_notification(
118        &self,
119        recipient_id: UserId,
120        notification: Notification,
121        tx: &DatabaseTransaction,
122    ) -> Result<Option<NotificationId>> {
123        let id = self
124            .find_notification(recipient_id, &notification, tx)
125            .await?;
126        if let Some(id) = id {
127            notification::Entity::delete_by_id(id).exec(tx).await?;
128        }
129        Ok(id)
130    }
131
132    /// Populate the response for the notification with the given kind and
133    /// entity id.
134    pub async fn mark_notification_as_read_with_response(
135        &self,
136        recipient_id: UserId,
137        notification: &Notification,
138        response: bool,
139        tx: &DatabaseTransaction,
140    ) -> Result<Option<(UserId, proto::Notification)>> {
141        self.mark_notification_as_read_internal(recipient_id, notification, Some(response), tx)
142            .await
143    }
144
145    /// Marks the given notification as read.
146    pub async fn mark_notification_as_read(
147        &self,
148        recipient_id: UserId,
149        notification: &Notification,
150        tx: &DatabaseTransaction,
151    ) -> Result<Option<(UserId, proto::Notification)>> {
152        self.mark_notification_as_read_internal(recipient_id, notification, None, tx)
153            .await
154    }
155
156    /// Marks the notification with the given ID as read.
157    pub async fn mark_notification_as_read_by_id(
158        &self,
159        recipient_id: UserId,
160        notification_id: NotificationId,
161    ) -> Result<NotificationBatch> {
162        self.transaction(|tx| async move {
163            let row = notification::Entity::update(notification::ActiveModel {
164                id: ActiveValue::Unchanged(notification_id),
165                recipient_id: ActiveValue::Unchanged(recipient_id),
166                is_read: ActiveValue::Set(true),
167                ..Default::default()
168            })
169            .exec(&*tx)
170            .await?;
171            Ok(model_to_proto(self, row)
172                .map(|notification| (recipient_id, notification))
173                .into_iter()
174                .collect())
175        })
176        .await
177    }
178
179    async fn mark_notification_as_read_internal(
180        &self,
181        recipient_id: UserId,
182        notification: &Notification,
183        response: Option<bool>,
184        tx: &DatabaseTransaction,
185    ) -> Result<Option<(UserId, proto::Notification)>> {
186        if let Some(id) = self
187            .find_notification(recipient_id, notification, tx)
188            .await?
189        {
190            let row = notification::Entity::update(notification::ActiveModel {
191                id: ActiveValue::Unchanged(id),
192                recipient_id: ActiveValue::Unchanged(recipient_id),
193                is_read: ActiveValue::Set(true),
194                response: if let Some(response) = response {
195                    ActiveValue::Set(Some(response))
196                } else {
197                    ActiveValue::NotSet
198                },
199                ..Default::default()
200            })
201            .exec(tx)
202            .await?;
203            Ok(model_to_proto(self, row).map(|notification| (recipient_id, notification)))
204        } else {
205            Ok(None)
206        }
207    }
208
209    /// Find an unread notification by its recipient, kind and entity id.
210    async fn find_notification(
211        &self,
212        recipient_id: UserId,
213        notification: &Notification,
214        tx: &DatabaseTransaction,
215    ) -> Result<Option<NotificationId>> {
216        let proto = notification.to_proto();
217        let kind = notification_kind_from_proto(self, &proto)?;
218
219        #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
220        enum QueryIds {
221            Id,
222        }
223
224        Ok(notification::Entity::find()
225            .select_only()
226            .column(notification::Column::Id)
227            .filter(
228                Condition::all()
229                    .add(notification::Column::RecipientId.eq(recipient_id))
230                    .add(notification::Column::IsRead.eq(false))
231                    .add(notification::Column::Kind.eq(kind))
232                    .add(if proto.entity_id.is_some() {
233                        notification::Column::EntityId.eq(proto.entity_id)
234                    } else {
235                        notification::Column::EntityId.is_null()
236                    }),
237            )
238            .into_values::<_, QueryIds>()
239            .one(tx)
240            .await?)
241    }
242}
243
244fn model_to_proto(this: &Database, row: notification::Model) -> Option<proto::Notification> {
245    let kind = this.notification_kinds_by_id.get(&row.kind)?;
246    Some(proto::Notification {
247        id: row.id.to_proto(),
248        kind: kind.to_string(),
249        timestamp: row.created_at.assume_utc().unix_timestamp() as u64,
250        is_read: row.is_read,
251        response: row.response,
252        content: row.content,
253        entity_id: row.entity_id.map(|id| id as u64),
254    })
255}
256
257fn notification_kind_from_proto(
258    this: &Database,
259    proto: &proto::Notification,
260) -> Result<NotificationKindId> {
261    Ok(this
262        .notification_kinds_by_name
263        .get(&proto.kind)
264        .copied()
265        .ok_or_else(|| anyhow!("invalid notification kind {:?}", proto.kind))?)
266}