notifications.rs

  1use super::*;
  2use rpc::Notification;
  3use util::ResultExt;
  4
  5impl Database {
  6    /// Initializes the different kinds of notifications by upserting records for them.
  7    pub async fn initialize_notification_kinds(&mut self) -> Result<()> {
  8        let all_kinds = Notification::all_variant_names();
  9        let existing_kinds = notification_kind::Entity::find().all(&self.pool).await?;
 10
 11        let kinds_to_create: Vec<_> = all_kinds
 12            .iter()
 13            .filter(|&kind| {
 14                !existing_kinds
 15                    .iter()
 16                    .any(|existing| existing.name == **kind)
 17            })
 18            .map(|kind| notification_kind::ActiveModel {
 19                name: ActiveValue::Set(kind.to_string()),
 20                ..Default::default()
 21            })
 22            .collect();
 23
 24        if !kinds_to_create.is_empty() {
 25            notification_kind::Entity::insert_many(kinds_to_create)
 26                .exec_without_returning(&self.pool)
 27                .await?;
 28        }
 29
 30        let mut rows = notification_kind::Entity::find().stream(&self.pool).await?;
 31        while let Some(row) = rows.next().await {
 32            let row = row?;
 33            self.notification_kinds_by_name.insert(row.name, row.id);
 34        }
 35
 36        for name in Notification::all_variant_names() {
 37            if let Some(id) = self.notification_kinds_by_name.get(*name).copied() {
 38                self.notification_kinds_by_id.insert(id, name);
 39            }
 40        }
 41
 42        Ok(())
 43    }
 44
 45    /// Returns the notifications for the given recipient.
 46    pub async fn get_notifications(
 47        &self,
 48        recipient_id: UserId,
 49        limit: usize,
 50        before_id: Option<NotificationId>,
 51    ) -> Result<Vec<proto::Notification>> {
 52        self.transaction(|tx| async move {
 53            let mut result = Vec::new();
 54            let mut condition =
 55                Condition::all().add(notification::Column::RecipientId.eq(recipient_id));
 56
 57            if let Some(before_id) = before_id {
 58                condition = condition.add(notification::Column::Id.lt(before_id));
 59            }
 60
 61            let mut rows = notification::Entity::find()
 62                .filter(condition)
 63                .order_by_desc(notification::Column::Id)
 64                .limit(limit as u64)
 65                .stream(&*tx)
 66                .await?;
 67            while let Some(row) = rows.next().await {
 68                let row = row?;
 69                if let Some(proto) = model_to_proto(self, row).log_err() {
 70                    result.push(proto);
 71                }
 72            }
 73            result.reverse();
 74            Ok(result)
 75        })
 76        .await
 77    }
 78
 79    /// Creates a notification. If `avoid_duplicates` is set to true, then avoid
 80    /// creating a new notification if the given recipient already has an
 81    /// unread notification with the given kind and entity id.
 82    pub async fn create_notification(
 83        &self,
 84        recipient_id: UserId,
 85        notification: Notification,
 86        avoid_duplicates: bool,
 87        tx: &DatabaseTransaction,
 88    ) -> Result<Option<(UserId, proto::Notification)>> {
 89        if avoid_duplicates
 90            && self
 91                .find_notification(recipient_id, &notification, tx)
 92                .await?
 93                .is_some()
 94        {
 95            return Ok(None);
 96        }
 97
 98        let proto = notification.to_proto();
 99        let kind = notification_kind_from_proto(self, &proto)?;
100        let model = notification::ActiveModel {
101            recipient_id: ActiveValue::Set(recipient_id),
102            kind: ActiveValue::Set(kind),
103            entity_id: ActiveValue::Set(proto.entity_id.map(|id| id as i32)),
104            content: ActiveValue::Set(proto.content.clone()),
105            ..Default::default()
106        }
107        .save(tx)
108        .await?;
109
110        Ok(Some((
111            recipient_id,
112            proto::Notification {
113                id: model.id.as_ref().to_proto(),
114                kind: proto.kind,
115                timestamp: model.created_at.as_ref().assume_utc().unix_timestamp() as u64,
116                is_read: false,
117                response: None,
118                content: proto.content,
119                entity_id: proto.entity_id,
120            },
121        )))
122    }
123
124    /// Remove an unread notification with the given recipient, kind and
125    /// entity id.
126    pub async fn remove_notification(
127        &self,
128        recipient_id: UserId,
129        notification: Notification,
130        tx: &DatabaseTransaction,
131    ) -> Result<Option<NotificationId>> {
132        let id = self
133            .find_notification(recipient_id, &notification, tx)
134            .await?;
135        if let Some(id) = id {
136            notification::Entity::delete_by_id(id).exec(tx).await?;
137        }
138        Ok(id)
139    }
140
141    /// Populate the response for the notification with the given kind and
142    /// entity id.
143    pub async fn mark_notification_as_read_with_response(
144        &self,
145        recipient_id: UserId,
146        notification: &Notification,
147        response: bool,
148        tx: &DatabaseTransaction,
149    ) -> Result<Option<(UserId, proto::Notification)>> {
150        self.mark_notification_as_read_internal(recipient_id, notification, Some(response), tx)
151            .await
152    }
153
154    /// Marks the given notification as read.
155    pub async fn mark_notification_as_read(
156        &self,
157        recipient_id: UserId,
158        notification: &Notification,
159        tx: &DatabaseTransaction,
160    ) -> Result<Option<(UserId, proto::Notification)>> {
161        self.mark_notification_as_read_internal(recipient_id, notification, None, tx)
162            .await
163    }
164
165    /// Marks the notification with the given ID as read.
166    pub async fn mark_notification_as_read_by_id(
167        &self,
168        recipient_id: UserId,
169        notification_id: NotificationId,
170    ) -> Result<NotificationBatch> {
171        self.transaction(|tx| async move {
172            let row = notification::Entity::update(notification::ActiveModel {
173                id: ActiveValue::Unchanged(notification_id),
174                recipient_id: ActiveValue::Unchanged(recipient_id),
175                is_read: ActiveValue::Set(true),
176                ..Default::default()
177            })
178            .exec(&*tx)
179            .await?;
180            Ok(model_to_proto(self, row)
181                .map(|notification| (recipient_id, notification))
182                .into_iter()
183                .collect())
184        })
185        .await
186    }
187
188    async fn mark_notification_as_read_internal(
189        &self,
190        recipient_id: UserId,
191        notification: &Notification,
192        response: Option<bool>,
193        tx: &DatabaseTransaction,
194    ) -> Result<Option<(UserId, proto::Notification)>> {
195        if let Some(id) = self
196            .find_notification(recipient_id, notification, tx)
197            .await?
198        {
199            let row = notification::Entity::update(notification::ActiveModel {
200                id: ActiveValue::Unchanged(id),
201                recipient_id: ActiveValue::Unchanged(recipient_id),
202                is_read: ActiveValue::Set(true),
203                response: if let Some(response) = response {
204                    ActiveValue::Set(Some(response))
205                } else {
206                    ActiveValue::NotSet
207                },
208                ..Default::default()
209            })
210            .exec(tx)
211            .await?;
212            Ok(model_to_proto(self, row)
213                .map(|notification| (recipient_id, notification))
214                .ok())
215        } else {
216            Ok(None)
217        }
218    }
219
220    /// Find an unread notification by its recipient, kind and entity id.
221    async fn find_notification(
222        &self,
223        recipient_id: UserId,
224        notification: &Notification,
225        tx: &DatabaseTransaction,
226    ) -> Result<Option<NotificationId>> {
227        let proto = notification.to_proto();
228        let kind = notification_kind_from_proto(self, &proto)?;
229
230        #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
231        enum QueryIds {
232            Id,
233        }
234
235        Ok(notification::Entity::find()
236            .select_only()
237            .column(notification::Column::Id)
238            .filter(
239                Condition::all()
240                    .add(notification::Column::RecipientId.eq(recipient_id))
241                    .add(notification::Column::IsRead.eq(false))
242                    .add(notification::Column::Kind.eq(kind))
243                    .add(if proto.entity_id.is_some() {
244                        notification::Column::EntityId.eq(proto.entity_id)
245                    } else {
246                        notification::Column::EntityId.is_null()
247                    }),
248            )
249            .into_values::<_, QueryIds>()
250            .one(tx)
251            .await?)
252    }
253}
254
255pub fn model_to_proto(this: &Database, row: notification::Model) -> Result<proto::Notification> {
256    let kind = this
257        .notification_kinds_by_id
258        .get(&row.kind)
259        .ok_or_else(|| anyhow!("Unknown notification kind"))?;
260    Ok(proto::Notification {
261        id: row.id.to_proto(),
262        kind: kind.to_string(),
263        timestamp: row.created_at.assume_utc().unix_timestamp() as u64,
264        is_read: row.is_read,
265        response: row.response,
266        content: row.content,
267        entity_id: row.entity_id.map(|id| id as u64),
268    })
269}
270
271fn notification_kind_from_proto(
272    this: &Database,
273    proto: &proto::Notification,
274) -> Result<NotificationKindId> {
275    Ok(this
276        .notification_kinds_by_name
277        .get(&proto.kind)
278        .copied()
279        .ok_or_else(|| anyhow!("invalid notification kind {:?}", proto.kind))?)
280}