notifications.rs

  1use super::*;
  2use rpc::Notification;
  3use util::ResultExt;
  4
  5impl Database {
  6    /// Initializes the different kinds of notifications by upserting records for them.
  7    pub async fn initialize_notification_kinds(&mut self) -> Result<()> {
  8        notification_kind::Entity::insert_many(Notification::all_variant_names().iter().map(
  9            |kind| notification_kind::ActiveModel {
 10                name: ActiveValue::Set(kind.to_string()),
 11                ..Default::default()
 12            },
 13        ))
 14        .on_conflict(OnConflict::new().do_nothing().to_owned())
 15        .exec_without_returning(&self.pool)
 16        .await?;
 17
 18        let mut rows = notification_kind::Entity::find().stream(&self.pool).await?;
 19        while let Some(row) = rows.next().await {
 20            let row = row?;
 21            self.notification_kinds_by_name.insert(row.name, row.id);
 22        }
 23
 24        for name in Notification::all_variant_names() {
 25            if let Some(id) = self.notification_kinds_by_name.get(*name).copied() {
 26                self.notification_kinds_by_id.insert(id, name);
 27            }
 28        }
 29
 30        Ok(())
 31    }
 32
 33    /// Returns the notifications for the given recipient.
 34    pub async fn get_notifications(
 35        &self,
 36        recipient_id: UserId,
 37        limit: usize,
 38        before_id: Option<NotificationId>,
 39    ) -> Result<Vec<proto::Notification>> {
 40        self.transaction(|tx| async move {
 41            let mut result = Vec::new();
 42            let mut condition =
 43                Condition::all().add(notification::Column::RecipientId.eq(recipient_id));
 44
 45            if let Some(before_id) = before_id {
 46                condition = condition.add(notification::Column::Id.lt(before_id));
 47            }
 48
 49            let mut rows = notification::Entity::find()
 50                .filter(condition)
 51                .order_by_desc(notification::Column::Id)
 52                .limit(limit as u64)
 53                .stream(&*tx)
 54                .await?;
 55            while let Some(row) = rows.next().await {
 56                let row = row?;
 57                if let Some(proto) = model_to_proto(self, row).log_err() {
 58                    result.push(proto);
 59                }
 60            }
 61            result.reverse();
 62            Ok(result)
 63        })
 64        .await
 65    }
 66
 67    /// Creates a notification. If `avoid_duplicates` is set to true, then avoid
 68    /// creating a new notification if the given recipient already has an
 69    /// unread notification with the given kind and entity id.
 70    pub async fn create_notification(
 71        &self,
 72        recipient_id: UserId,
 73        notification: Notification,
 74        avoid_duplicates: bool,
 75        tx: &DatabaseTransaction,
 76    ) -> Result<Option<(UserId, proto::Notification)>> {
 77        if avoid_duplicates {
 78            if self
 79                .find_notification(recipient_id, &notification, tx)
 80                .await?
 81                .is_some()
 82            {
 83                return Ok(None);
 84            }
 85        }
 86
 87        let proto = notification.to_proto();
 88        let kind = notification_kind_from_proto(self, &proto)?;
 89        let model = notification::ActiveModel {
 90            recipient_id: ActiveValue::Set(recipient_id),
 91            kind: ActiveValue::Set(kind),
 92            entity_id: ActiveValue::Set(proto.entity_id.map(|id| id as i32)),
 93            content: ActiveValue::Set(proto.content.clone()),
 94            ..Default::default()
 95        }
 96        .save(tx)
 97        .await?;
 98
 99        Ok(Some((
100            recipient_id,
101            proto::Notification {
102                id: model.id.as_ref().to_proto(),
103                kind: proto.kind,
104                timestamp: model.created_at.as_ref().assume_utc().unix_timestamp() as u64,
105                is_read: false,
106                response: None,
107                content: proto.content,
108                entity_id: proto.entity_id,
109            },
110        )))
111    }
112
113    /// Remove an unread notification with the given recipient, kind and
114    /// entity id.
115    pub async fn remove_notification(
116        &self,
117        recipient_id: UserId,
118        notification: Notification,
119        tx: &DatabaseTransaction,
120    ) -> Result<Option<NotificationId>> {
121        let id = self
122            .find_notification(recipient_id, &notification, tx)
123            .await?;
124        if let Some(id) = id {
125            notification::Entity::delete_by_id(id).exec(tx).await?;
126        }
127        Ok(id)
128    }
129
130    /// Populate the response for the notification with the given kind and
131    /// entity id.
132    pub async fn mark_notification_as_read_with_response(
133        &self,
134        recipient_id: UserId,
135        notification: &Notification,
136        response: bool,
137        tx: &DatabaseTransaction,
138    ) -> Result<Option<(UserId, proto::Notification)>> {
139        self.mark_notification_as_read_internal(recipient_id, notification, Some(response), tx)
140            .await
141    }
142
143    /// Marks the given notification as read.
144    pub async fn mark_notification_as_read(
145        &self,
146        recipient_id: UserId,
147        notification: &Notification,
148        tx: &DatabaseTransaction,
149    ) -> Result<Option<(UserId, proto::Notification)>> {
150        self.mark_notification_as_read_internal(recipient_id, notification, None, tx)
151            .await
152    }
153
154    /// Marks the notification with the given ID as read.
155    pub async fn mark_notification_as_read_by_id(
156        &self,
157        recipient_id: UserId,
158        notification_id: NotificationId,
159    ) -> Result<NotificationBatch> {
160        self.transaction(|tx| async move {
161            let row = notification::Entity::update(notification::ActiveModel {
162                id: ActiveValue::Unchanged(notification_id),
163                recipient_id: ActiveValue::Unchanged(recipient_id),
164                is_read: ActiveValue::Set(true),
165                ..Default::default()
166            })
167            .exec(&*tx)
168            .await?;
169            Ok(model_to_proto(self, row)
170                .map(|notification| (recipient_id, notification))
171                .into_iter()
172                .collect())
173        })
174        .await
175    }
176
177    async fn mark_notification_as_read_internal(
178        &self,
179        recipient_id: UserId,
180        notification: &Notification,
181        response: Option<bool>,
182        tx: &DatabaseTransaction,
183    ) -> Result<Option<(UserId, proto::Notification)>> {
184        if let Some(id) = self
185            .find_notification(recipient_id, notification, tx)
186            .await?
187        {
188            let row = notification::Entity::update(notification::ActiveModel {
189                id: ActiveValue::Unchanged(id),
190                recipient_id: ActiveValue::Unchanged(recipient_id),
191                is_read: ActiveValue::Set(true),
192                response: if let Some(response) = response {
193                    ActiveValue::Set(Some(response))
194                } else {
195                    ActiveValue::NotSet
196                },
197                ..Default::default()
198            })
199            .exec(tx)
200            .await?;
201            Ok(model_to_proto(self, row)
202                .map(|notification| (recipient_id, notification))
203                .ok())
204        } else {
205            Ok(None)
206        }
207    }
208
209    /// Find an unread notification by its recipient, kind and entity id.
210    async fn find_notification(
211        &self,
212        recipient_id: UserId,
213        notification: &Notification,
214        tx: &DatabaseTransaction,
215    ) -> Result<Option<NotificationId>> {
216        let proto = notification.to_proto();
217        let kind = notification_kind_from_proto(self, &proto)?;
218
219        #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
220        enum QueryIds {
221            Id,
222        }
223
224        Ok(notification::Entity::find()
225            .select_only()
226            .column(notification::Column::Id)
227            .filter(
228                Condition::all()
229                    .add(notification::Column::RecipientId.eq(recipient_id))
230                    .add(notification::Column::IsRead.eq(false))
231                    .add(notification::Column::Kind.eq(kind))
232                    .add(if proto.entity_id.is_some() {
233                        notification::Column::EntityId.eq(proto.entity_id)
234                    } else {
235                        notification::Column::EntityId.is_null()
236                    }),
237            )
238            .into_values::<_, QueryIds>()
239            .one(tx)
240            .await?)
241    }
242}
243
244pub fn model_to_proto(this: &Database, row: notification::Model) -> Result<proto::Notification> {
245    let kind = this
246        .notification_kinds_by_id
247        .get(&row.kind)
248        .ok_or_else(|| anyhow!("Unknown notification kind"))?;
249    Ok(proto::Notification {
250        id: row.id.to_proto(),
251        kind: kind.to_string(),
252        timestamp: row.created_at.assume_utc().unix_timestamp() as u64,
253        is_read: row.is_read,
254        response: row.response,
255        content: row.content,
256        entity_id: row.entity_id.map(|id| id as u64),
257    })
258}
259
260fn notification_kind_from_proto(
261    this: &Database,
262    proto: &proto::Notification,
263) -> Result<NotificationKindId> {
264    Ok(this
265        .notification_kinds_by_name
266        .get(&proto.kind)
267        .copied()
268        .ok_or_else(|| anyhow!("invalid notification kind {:?}", proto.kind))?)
269}