1use super::*;
2use rpc::Notification;
3use util::ResultExt;
4
5impl Database {
6 /// Initializes the different kinds of notifications by upserting records for them.
7 pub async fn initialize_notification_kinds(&mut self) -> Result<()> {
8 let all_kinds = Notification::all_variant_names();
9 let existing_kinds = notification_kind::Entity::find().all(&self.pool).await?;
10
11 let kinds_to_create: Vec<_> = all_kinds
12 .iter()
13 .filter(|&kind| {
14 !existing_kinds
15 .iter()
16 .any(|existing| existing.name == **kind)
17 })
18 .map(|kind| notification_kind::ActiveModel {
19 name: ActiveValue::Set(kind.to_string()),
20 ..Default::default()
21 })
22 .collect();
23
24 if !kinds_to_create.is_empty() {
25 notification_kind::Entity::insert_many(kinds_to_create)
26 .exec_without_returning(&self.pool)
27 .await?;
28 }
29
30 let mut rows = notification_kind::Entity::find().stream(&self.pool).await?;
31 while let Some(row) = rows.next().await {
32 let row = row?;
33 self.notification_kinds_by_name.insert(row.name, row.id);
34 }
35
36 for name in Notification::all_variant_names() {
37 if let Some(id) = self.notification_kinds_by_name.get(*name).copied() {
38 self.notification_kinds_by_id.insert(id, name);
39 }
40 }
41
42 Ok(())
43 }
44
45 /// Returns the notifications for the given recipient.
46 pub async fn get_notifications(
47 &self,
48 recipient_id: UserId,
49 limit: usize,
50 before_id: Option<NotificationId>,
51 ) -> Result<Vec<proto::Notification>> {
52 self.transaction(|tx| async move {
53 let mut result = Vec::new();
54 let mut condition =
55 Condition::all().add(notification::Column::RecipientId.eq(recipient_id));
56
57 if let Some(before_id) = before_id {
58 condition = condition.add(notification::Column::Id.lt(before_id));
59 }
60
61 let mut rows = notification::Entity::find()
62 .filter(condition)
63 .order_by_desc(notification::Column::Id)
64 .limit(limit as u64)
65 .stream(&*tx)
66 .await?;
67 while let Some(row) = rows.next().await {
68 let row = row?;
69 if let Some(proto) = model_to_proto(self, row).log_err() {
70 result.push(proto);
71 }
72 }
73 result.reverse();
74 Ok(result)
75 })
76 .await
77 }
78
79 /// Creates a notification. If `avoid_duplicates` is set to true, then avoid
80 /// creating a new notification if the given recipient already has an
81 /// unread notification with the given kind and entity id.
82 pub async fn create_notification(
83 &self,
84 recipient_id: UserId,
85 notification: Notification,
86 avoid_duplicates: bool,
87 tx: &DatabaseTransaction,
88 ) -> Result<Option<(UserId, proto::Notification)>> {
89 if avoid_duplicates
90 && self
91 .find_notification(recipient_id, ¬ification, tx)
92 .await?
93 .is_some()
94 {
95 return Ok(None);
96 }
97
98 let proto = notification.to_proto();
99 let kind = notification_kind_from_proto(self, &proto)?;
100 let model = notification::ActiveModel {
101 recipient_id: ActiveValue::Set(recipient_id),
102 kind: ActiveValue::Set(kind),
103 entity_id: ActiveValue::Set(proto.entity_id.map(|id| id as i32)),
104 content: ActiveValue::Set(proto.content.clone()),
105 ..Default::default()
106 }
107 .save(tx)
108 .await?;
109
110 Ok(Some((
111 recipient_id,
112 proto::Notification {
113 id: model.id.as_ref().to_proto(),
114 kind: proto.kind,
115 timestamp: model.created_at.as_ref().assume_utc().unix_timestamp() as u64,
116 is_read: false,
117 response: None,
118 content: proto.content,
119 entity_id: proto.entity_id,
120 },
121 )))
122 }
123
124 /// Remove an unread notification with the given recipient, kind and
125 /// entity id.
126 pub async fn remove_notification(
127 &self,
128 recipient_id: UserId,
129 notification: Notification,
130 tx: &DatabaseTransaction,
131 ) -> Result<Option<NotificationId>> {
132 let id = self
133 .find_notification(recipient_id, ¬ification, tx)
134 .await?;
135 if let Some(id) = id {
136 notification::Entity::delete_by_id(id).exec(tx).await?;
137 }
138 Ok(id)
139 }
140
141 /// Populate the response for the notification with the given kind and
142 /// entity id.
143 pub async fn mark_notification_as_read_with_response(
144 &self,
145 recipient_id: UserId,
146 notification: &Notification,
147 response: bool,
148 tx: &DatabaseTransaction,
149 ) -> Result<Option<(UserId, proto::Notification)>> {
150 self.mark_notification_as_read_internal(recipient_id, notification, Some(response), tx)
151 .await
152 }
153
154 /// Marks the given notification as read.
155 pub async fn mark_notification_as_read(
156 &self,
157 recipient_id: UserId,
158 notification: &Notification,
159 tx: &DatabaseTransaction,
160 ) -> Result<Option<(UserId, proto::Notification)>> {
161 self.mark_notification_as_read_internal(recipient_id, notification, None, tx)
162 .await
163 }
164
165 /// Marks the notification with the given ID as read.
166 pub async fn mark_notification_as_read_by_id(
167 &self,
168 recipient_id: UserId,
169 notification_id: NotificationId,
170 ) -> Result<NotificationBatch> {
171 self.transaction(|tx| async move {
172 let row = notification::Entity::update(notification::ActiveModel {
173 id: ActiveValue::Unchanged(notification_id),
174 recipient_id: ActiveValue::Unchanged(recipient_id),
175 is_read: ActiveValue::Set(true),
176 ..Default::default()
177 })
178 .exec(&*tx)
179 .await?;
180 Ok(model_to_proto(self, row)
181 .map(|notification| (recipient_id, notification))
182 .into_iter()
183 .collect())
184 })
185 .await
186 }
187
188 async fn mark_notification_as_read_internal(
189 &self,
190 recipient_id: UserId,
191 notification: &Notification,
192 response: Option<bool>,
193 tx: &DatabaseTransaction,
194 ) -> Result<Option<(UserId, proto::Notification)>> {
195 if let Some(id) = self
196 .find_notification(recipient_id, notification, tx)
197 .await?
198 {
199 let row = notification::Entity::update(notification::ActiveModel {
200 id: ActiveValue::Unchanged(id),
201 recipient_id: ActiveValue::Unchanged(recipient_id),
202 is_read: ActiveValue::Set(true),
203 response: if let Some(response) = response {
204 ActiveValue::Set(Some(response))
205 } else {
206 ActiveValue::NotSet
207 },
208 ..Default::default()
209 })
210 .exec(tx)
211 .await?;
212 Ok(model_to_proto(self, row)
213 .map(|notification| (recipient_id, notification))
214 .ok())
215 } else {
216 Ok(None)
217 }
218 }
219
220 /// Find an unread notification by its recipient, kind and entity id.
221 async fn find_notification(
222 &self,
223 recipient_id: UserId,
224 notification: &Notification,
225 tx: &DatabaseTransaction,
226 ) -> Result<Option<NotificationId>> {
227 let proto = notification.to_proto();
228 let kind = notification_kind_from_proto(self, &proto)?;
229
230 #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
231 enum QueryIds {
232 Id,
233 }
234
235 Ok(notification::Entity::find()
236 .select_only()
237 .column(notification::Column::Id)
238 .filter(
239 Condition::all()
240 .add(notification::Column::RecipientId.eq(recipient_id))
241 .add(notification::Column::IsRead.eq(false))
242 .add(notification::Column::Kind.eq(kind))
243 .add(if proto.entity_id.is_some() {
244 notification::Column::EntityId.eq(proto.entity_id)
245 } else {
246 notification::Column::EntityId.is_null()
247 }),
248 )
249 .into_values::<_, QueryIds>()
250 .one(tx)
251 .await?)
252 }
253}
254
255pub fn model_to_proto(this: &Database, row: notification::Model) -> Result<proto::Notification> {
256 let kind = this
257 .notification_kinds_by_id
258 .get(&row.kind)
259 .ok_or_else(|| anyhow!("Unknown notification kind"))?;
260 Ok(proto::Notification {
261 id: row.id.to_proto(),
262 kind: kind.to_string(),
263 timestamp: row.created_at.assume_utc().unix_timestamp() as u64,
264 is_read: row.is_read,
265 response: row.response,
266 content: row.content,
267 entity_id: row.entity_id.map(|id| id as u64),
268 })
269}
270
271fn notification_kind_from_proto(
272 this: &Database,
273 proto: &proto::Notification,
274) -> Result<NotificationKindId> {
275 Ok(this
276 .notification_kinds_by_name
277 .get(&proto.kind)
278 .copied()
279 .ok_or_else(|| anyhow!("invalid notification kind {:?}", proto.kind))?)
280}