1use super::*;
2use rpc::Notification;
3
4impl Database {
5 pub async fn initialize_notification_kinds(&mut self) -> Result<()> {
6 notification_kind::Entity::insert_many(Notification::all_variant_names().iter().map(
7 |kind| notification_kind::ActiveModel {
8 name: ActiveValue::Set(kind.to_string()),
9 ..Default::default()
10 },
11 ))
12 .on_conflict(OnConflict::new().do_nothing().to_owned())
13 .exec_without_returning(&self.pool)
14 .await?;
15
16 let mut rows = notification_kind::Entity::find().stream(&self.pool).await?;
17 while let Some(row) = rows.next().await {
18 let row = row?;
19 self.notification_kinds_by_name.insert(row.name, row.id);
20 }
21
22 for name in Notification::all_variant_names() {
23 if let Some(id) = self.notification_kinds_by_name.get(*name).copied() {
24 self.notification_kinds_by_id.insert(id, name);
25 }
26 }
27
28 Ok(())
29 }
30
31 pub async fn get_notifications(
32 &self,
33 recipient_id: UserId,
34 limit: usize,
35 before_id: Option<NotificationId>,
36 ) -> Result<Vec<proto::Notification>> {
37 self.transaction(|tx| async move {
38 let mut result = Vec::new();
39 let mut condition =
40 Condition::all().add(notification::Column::RecipientId.eq(recipient_id));
41
42 if let Some(before_id) = before_id {
43 condition = condition.add(notification::Column::Id.lt(before_id));
44 }
45
46 let mut rows = notification::Entity::find()
47 .filter(condition)
48 .order_by_desc(notification::Column::Id)
49 .limit(limit as u64)
50 .stream(&*tx)
51 .await?;
52 while let Some(row) = rows.next().await {
53 let row = row?;
54 let kind = row.kind;
55 if let Some(proto) = model_to_proto(self, row) {
56 result.push(proto);
57 } else {
58 log::warn!("unknown notification kind {:?}", kind);
59 }
60 }
61 result.reverse();
62 Ok(result)
63 })
64 .await
65 }
66
67 /// Create a notification. If `avoid_duplicates` is set to true, then avoid
68 /// creating a new notification if the given recipient already has an
69 /// unread notification with the given kind and entity id.
70 pub async fn create_notification(
71 &self,
72 recipient_id: UserId,
73 notification: Notification,
74 avoid_duplicates: bool,
75 tx: &DatabaseTransaction,
76 ) -> Result<Option<(UserId, proto::Notification)>> {
77 if avoid_duplicates {
78 if self
79 .find_notification(recipient_id, ¬ification, tx)
80 .await?
81 .is_some()
82 {
83 return Ok(None);
84 }
85 }
86
87 let proto = notification.to_proto();
88 let kind = notification_kind_from_proto(self, &proto)?;
89 let model = notification::ActiveModel {
90 recipient_id: ActiveValue::Set(recipient_id),
91 kind: ActiveValue::Set(kind),
92 entity_id: ActiveValue::Set(proto.entity_id.map(|id| id as i32)),
93 content: ActiveValue::Set(proto.content.clone()),
94 ..Default::default()
95 }
96 .save(&*tx)
97 .await?;
98
99 Ok(Some((
100 recipient_id,
101 proto::Notification {
102 id: model.id.as_ref().to_proto(),
103 kind: proto.kind,
104 timestamp: model.created_at.as_ref().assume_utc().unix_timestamp() as u64,
105 is_read: false,
106 response: None,
107 content: proto.content,
108 entity_id: proto.entity_id,
109 },
110 )))
111 }
112
113 /// Remove an unread notification with the given recipient, kind and
114 /// entity id.
115 pub async fn remove_notification(
116 &self,
117 recipient_id: UserId,
118 notification: Notification,
119 tx: &DatabaseTransaction,
120 ) -> Result<Option<NotificationId>> {
121 let id = self
122 .find_notification(recipient_id, ¬ification, tx)
123 .await?;
124 if let Some(id) = id {
125 notification::Entity::delete_by_id(id).exec(tx).await?;
126 }
127 Ok(id)
128 }
129
130 /// Populate the response for the notification with the given kind and
131 /// entity id.
132 pub async fn mark_notification_as_read_with_response(
133 &self,
134 recipient_id: UserId,
135 notification: &Notification,
136 response: bool,
137 tx: &DatabaseTransaction,
138 ) -> Result<Option<(UserId, proto::Notification)>> {
139 self.mark_notification_as_read_internal(recipient_id, notification, Some(response), tx)
140 .await
141 }
142
143 pub async fn mark_notification_as_read(
144 &self,
145 recipient_id: UserId,
146 notification: &Notification,
147 tx: &DatabaseTransaction,
148 ) -> Result<Option<(UserId, proto::Notification)>> {
149 self.mark_notification_as_read_internal(recipient_id, notification, None, tx)
150 .await
151 }
152
153 pub async fn mark_notification_as_read_by_id(
154 &self,
155 recipient_id: UserId,
156 notification_id: NotificationId,
157 ) -> Result<NotificationBatch> {
158 self.transaction(|tx| async move {
159 let row = notification::Entity::update(notification::ActiveModel {
160 id: ActiveValue::Unchanged(notification_id),
161 recipient_id: ActiveValue::Unchanged(recipient_id),
162 is_read: ActiveValue::Set(true),
163 ..Default::default()
164 })
165 .exec(&*tx)
166 .await?;
167 Ok(model_to_proto(self, row)
168 .map(|notification| (recipient_id, notification))
169 .into_iter()
170 .collect())
171 })
172 .await
173 }
174
175 async fn mark_notification_as_read_internal(
176 &self,
177 recipient_id: UserId,
178 notification: &Notification,
179 response: Option<bool>,
180 tx: &DatabaseTransaction,
181 ) -> Result<Option<(UserId, proto::Notification)>> {
182 if let Some(id) = self
183 .find_notification(recipient_id, notification, &*tx)
184 .await?
185 {
186 let row = notification::Entity::update(notification::ActiveModel {
187 id: ActiveValue::Unchanged(id),
188 recipient_id: ActiveValue::Unchanged(recipient_id),
189 is_read: ActiveValue::Set(true),
190 response: if let Some(response) = response {
191 ActiveValue::Set(Some(response))
192 } else {
193 ActiveValue::NotSet
194 },
195 ..Default::default()
196 })
197 .exec(tx)
198 .await?;
199 Ok(model_to_proto(self, row).map(|notification| (recipient_id, notification)))
200 } else {
201 Ok(None)
202 }
203 }
204
205 /// Find an unread notification by its recipient, kind and entity id.
206 async fn find_notification(
207 &self,
208 recipient_id: UserId,
209 notification: &Notification,
210 tx: &DatabaseTransaction,
211 ) -> Result<Option<NotificationId>> {
212 let proto = notification.to_proto();
213 let kind = notification_kind_from_proto(self, &proto)?;
214
215 #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
216 enum QueryIds {
217 Id,
218 }
219
220 Ok(notification::Entity::find()
221 .select_only()
222 .column(notification::Column::Id)
223 .filter(
224 Condition::all()
225 .add(notification::Column::RecipientId.eq(recipient_id))
226 .add(notification::Column::IsRead.eq(false))
227 .add(notification::Column::Kind.eq(kind))
228 .add(if proto.entity_id.is_some() {
229 notification::Column::EntityId.eq(proto.entity_id)
230 } else {
231 notification::Column::EntityId.is_null()
232 }),
233 )
234 .into_values::<_, QueryIds>()
235 .one(&*tx)
236 .await?)
237 }
238}
239
240fn model_to_proto(this: &Database, row: notification::Model) -> Option<proto::Notification> {
241 let kind = this.notification_kinds_by_id.get(&row.kind)?;
242 Some(proto::Notification {
243 id: row.id.to_proto(),
244 kind: kind.to_string(),
245 timestamp: row.created_at.assume_utc().unix_timestamp() as u64,
246 is_read: row.is_read,
247 response: row.response,
248 content: row.content,
249 entity_id: row.entity_id.map(|id| id as u64),
250 })
251}
252
253fn notification_kind_from_proto(
254 this: &Database,
255 proto: &proto::Notification,
256) -> Result<NotificationKindId> {
257 Ok(this
258 .notification_kinds_by_name
259 .get(&proto.kind)
260 .copied()
261 .ok_or_else(|| anyhow!("invalid notification kind {:?}", proto.kind))?)
262}