1use super::*;
2use rpc::Notification;
3
4impl Database {
5 /// Initializes the different kinds of notifications by upserting records for them.
6 pub async fn initialize_notification_kinds(&mut self) -> Result<()> {
7 notification_kind::Entity::insert_many(Notification::all_variant_names().iter().map(
8 |kind| notification_kind::ActiveModel {
9 name: ActiveValue::Set(kind.to_string()),
10 ..Default::default()
11 },
12 ))
13 .on_conflict(OnConflict::new().do_nothing().to_owned())
14 .exec_without_returning(&self.pool)
15 .await?;
16
17 let mut rows = notification_kind::Entity::find().stream(&self.pool).await?;
18 while let Some(row) = rows.next().await {
19 let row = row?;
20 self.notification_kinds_by_name.insert(row.name, row.id);
21 }
22
23 for name in Notification::all_variant_names() {
24 if let Some(id) = self.notification_kinds_by_name.get(*name).copied() {
25 self.notification_kinds_by_id.insert(id, name);
26 }
27 }
28
29 Ok(())
30 }
31
32 /// Returns the notifications for the given recipient.
33 pub async fn get_notifications(
34 &self,
35 recipient_id: UserId,
36 limit: usize,
37 before_id: Option<NotificationId>,
38 ) -> Result<Vec<proto::Notification>> {
39 self.transaction(|tx| async move {
40 let mut result = Vec::new();
41 let mut condition =
42 Condition::all().add(notification::Column::RecipientId.eq(recipient_id));
43
44 if let Some(before_id) = before_id {
45 condition = condition.add(notification::Column::Id.lt(before_id));
46 }
47
48 let mut rows = notification::Entity::find()
49 .filter(condition)
50 .order_by_desc(notification::Column::Id)
51 .limit(limit as u64)
52 .stream(&*tx)
53 .await?;
54 while let Some(row) = rows.next().await {
55 let row = row?;
56 let kind = row.kind;
57 if let Some(proto) = model_to_proto(self, row) {
58 result.push(proto);
59 } else {
60 log::warn!("unknown notification kind {:?}", kind);
61 }
62 }
63 result.reverse();
64 Ok(result)
65 })
66 .await
67 }
68
69 /// Creates a notification. If `avoid_duplicates` is set to true, then avoid
70 /// creating a new notification if the given recipient already has an
71 /// unread notification with the given kind and entity id.
72 pub async fn create_notification(
73 &self,
74 recipient_id: UserId,
75 notification: Notification,
76 avoid_duplicates: bool,
77 tx: &DatabaseTransaction,
78 ) -> Result<Option<(UserId, proto::Notification)>> {
79 if avoid_duplicates {
80 if self
81 .find_notification(recipient_id, ¬ification, tx)
82 .await?
83 .is_some()
84 {
85 return Ok(None);
86 }
87 }
88
89 let proto = notification.to_proto();
90 let kind = notification_kind_from_proto(self, &proto)?;
91 let model = notification::ActiveModel {
92 recipient_id: ActiveValue::Set(recipient_id),
93 kind: ActiveValue::Set(kind),
94 entity_id: ActiveValue::Set(proto.entity_id.map(|id| id as i32)),
95 content: ActiveValue::Set(proto.content.clone()),
96 ..Default::default()
97 }
98 .save(&*tx)
99 .await?;
100
101 Ok(Some((
102 recipient_id,
103 proto::Notification {
104 id: model.id.as_ref().to_proto(),
105 kind: proto.kind,
106 timestamp: model.created_at.as_ref().assume_utc().unix_timestamp() as u64,
107 is_read: false,
108 response: None,
109 content: proto.content,
110 entity_id: proto.entity_id,
111 },
112 )))
113 }
114
115 /// Remove an unread notification with the given recipient, kind and
116 /// entity id.
117 pub async fn remove_notification(
118 &self,
119 recipient_id: UserId,
120 notification: Notification,
121 tx: &DatabaseTransaction,
122 ) -> Result<Option<NotificationId>> {
123 let id = self
124 .find_notification(recipient_id, ¬ification, tx)
125 .await?;
126 if let Some(id) = id {
127 notification::Entity::delete_by_id(id).exec(tx).await?;
128 }
129 Ok(id)
130 }
131
132 /// Populate the response for the notification with the given kind and
133 /// entity id.
134 pub async fn mark_notification_as_read_with_response(
135 &self,
136 recipient_id: UserId,
137 notification: &Notification,
138 response: bool,
139 tx: &DatabaseTransaction,
140 ) -> Result<Option<(UserId, proto::Notification)>> {
141 self.mark_notification_as_read_internal(recipient_id, notification, Some(response), tx)
142 .await
143 }
144
145 /// Marks the given notification as read.
146 pub async fn mark_notification_as_read(
147 &self,
148 recipient_id: UserId,
149 notification: &Notification,
150 tx: &DatabaseTransaction,
151 ) -> Result<Option<(UserId, proto::Notification)>> {
152 self.mark_notification_as_read_internal(recipient_id, notification, None, tx)
153 .await
154 }
155
156 /// Marks the notification with the given ID as read.
157 pub async fn mark_notification_as_read_by_id(
158 &self,
159 recipient_id: UserId,
160 notification_id: NotificationId,
161 ) -> Result<NotificationBatch> {
162 self.transaction(|tx| async move {
163 let row = notification::Entity::update(notification::ActiveModel {
164 id: ActiveValue::Unchanged(notification_id),
165 recipient_id: ActiveValue::Unchanged(recipient_id),
166 is_read: ActiveValue::Set(true),
167 ..Default::default()
168 })
169 .exec(&*tx)
170 .await?;
171 Ok(model_to_proto(self, row)
172 .map(|notification| (recipient_id, notification))
173 .into_iter()
174 .collect())
175 })
176 .await
177 }
178
179 async fn mark_notification_as_read_internal(
180 &self,
181 recipient_id: UserId,
182 notification: &Notification,
183 response: Option<bool>,
184 tx: &DatabaseTransaction,
185 ) -> Result<Option<(UserId, proto::Notification)>> {
186 if let Some(id) = self
187 .find_notification(recipient_id, notification, &*tx)
188 .await?
189 {
190 let row = notification::Entity::update(notification::ActiveModel {
191 id: ActiveValue::Unchanged(id),
192 recipient_id: ActiveValue::Unchanged(recipient_id),
193 is_read: ActiveValue::Set(true),
194 response: if let Some(response) = response {
195 ActiveValue::Set(Some(response))
196 } else {
197 ActiveValue::NotSet
198 },
199 ..Default::default()
200 })
201 .exec(tx)
202 .await?;
203 Ok(model_to_proto(self, row).map(|notification| (recipient_id, notification)))
204 } else {
205 Ok(None)
206 }
207 }
208
209 /// Find an unread notification by its recipient, kind and entity id.
210 async fn find_notification(
211 &self,
212 recipient_id: UserId,
213 notification: &Notification,
214 tx: &DatabaseTransaction,
215 ) -> Result<Option<NotificationId>> {
216 let proto = notification.to_proto();
217 let kind = notification_kind_from_proto(self, &proto)?;
218
219 #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
220 enum QueryIds {
221 Id,
222 }
223
224 Ok(notification::Entity::find()
225 .select_only()
226 .column(notification::Column::Id)
227 .filter(
228 Condition::all()
229 .add(notification::Column::RecipientId.eq(recipient_id))
230 .add(notification::Column::IsRead.eq(false))
231 .add(notification::Column::Kind.eq(kind))
232 .add(if proto.entity_id.is_some() {
233 notification::Column::EntityId.eq(proto.entity_id)
234 } else {
235 notification::Column::EntityId.is_null()
236 }),
237 )
238 .into_values::<_, QueryIds>()
239 .one(&*tx)
240 .await?)
241 }
242}
243
244fn model_to_proto(this: &Database, row: notification::Model) -> Option<proto::Notification> {
245 let kind = this.notification_kinds_by_id.get(&row.kind)?;
246 Some(proto::Notification {
247 id: row.id.to_proto(),
248 kind: kind.to_string(),
249 timestamp: row.created_at.assume_utc().unix_timestamp() as u64,
250 is_read: row.is_read,
251 response: row.response,
252 content: row.content,
253 entity_id: row.entity_id.map(|id| id as u64),
254 })
255}
256
257fn notification_kind_from_proto(
258 this: &Database,
259 proto: &proto::Notification,
260) -> Result<NotificationKindId> {
261 Ok(this
262 .notification_kinds_by_name
263 .get(&proto.kind)
264 .copied()
265 .ok_or_else(|| anyhow!("invalid notification kind {:?}", proto.kind))?)
266}