1use super::*;
2use rpc::Notification;
3use util::ResultExt;
4
5impl Database {
6 /// Initializes the different kinds of notifications by upserting records for them.
7 pub async fn initialize_notification_kinds(&mut self) -> Result<()> {
8 notification_kind::Entity::insert_many(Notification::all_variant_names().iter().map(
9 |kind| notification_kind::ActiveModel {
10 name: ActiveValue::Set(kind.to_string()),
11 ..Default::default()
12 },
13 ))
14 .on_conflict(OnConflict::new().do_nothing().to_owned())
15 .exec_without_returning(&self.pool)
16 .await?;
17
18 let mut rows = notification_kind::Entity::find().stream(&self.pool).await?;
19 while let Some(row) = rows.next().await {
20 let row = row?;
21 self.notification_kinds_by_name.insert(row.name, row.id);
22 }
23
24 for name in Notification::all_variant_names() {
25 if let Some(id) = self.notification_kinds_by_name.get(*name).copied() {
26 self.notification_kinds_by_id.insert(id, name);
27 }
28 }
29
30 Ok(())
31 }
32
33 /// Returns the notifications for the given recipient.
34 pub async fn get_notifications(
35 &self,
36 recipient_id: UserId,
37 limit: usize,
38 before_id: Option<NotificationId>,
39 ) -> Result<Vec<proto::Notification>> {
40 self.transaction(|tx| async move {
41 let mut result = Vec::new();
42 let mut condition =
43 Condition::all().add(notification::Column::RecipientId.eq(recipient_id));
44
45 if let Some(before_id) = before_id {
46 condition = condition.add(notification::Column::Id.lt(before_id));
47 }
48
49 let mut rows = notification::Entity::find()
50 .filter(condition)
51 .order_by_desc(notification::Column::Id)
52 .limit(limit as u64)
53 .stream(&*tx)
54 .await?;
55 while let Some(row) = rows.next().await {
56 let row = row?;
57 if let Some(proto) = model_to_proto(self, row).log_err() {
58 result.push(proto);
59 }
60 }
61 result.reverse();
62 Ok(result)
63 })
64 .await
65 }
66
67 /// Creates a notification. If `avoid_duplicates` is set to true, then avoid
68 /// creating a new notification if the given recipient already has an
69 /// unread notification with the given kind and entity id.
70 pub async fn create_notification(
71 &self,
72 recipient_id: UserId,
73 notification: Notification,
74 avoid_duplicates: bool,
75 tx: &DatabaseTransaction,
76 ) -> Result<Option<(UserId, proto::Notification)>> {
77 if avoid_duplicates {
78 if self
79 .find_notification(recipient_id, ¬ification, tx)
80 .await?
81 .is_some()
82 {
83 return Ok(None);
84 }
85 }
86
87 let proto = notification.to_proto();
88 let kind = notification_kind_from_proto(self, &proto)?;
89 let model = notification::ActiveModel {
90 recipient_id: ActiveValue::Set(recipient_id),
91 kind: ActiveValue::Set(kind),
92 entity_id: ActiveValue::Set(proto.entity_id.map(|id| id as i32)),
93 content: ActiveValue::Set(proto.content.clone()),
94 ..Default::default()
95 }
96 .save(tx)
97 .await?;
98
99 Ok(Some((
100 recipient_id,
101 proto::Notification {
102 id: model.id.as_ref().to_proto(),
103 kind: proto.kind,
104 timestamp: model.created_at.as_ref().assume_utc().unix_timestamp() as u64,
105 is_read: false,
106 response: None,
107 content: proto.content,
108 entity_id: proto.entity_id,
109 },
110 )))
111 }
112
113 /// Remove an unread notification with the given recipient, kind and
114 /// entity id.
115 pub async fn remove_notification(
116 &self,
117 recipient_id: UserId,
118 notification: Notification,
119 tx: &DatabaseTransaction,
120 ) -> Result<Option<NotificationId>> {
121 let id = self
122 .find_notification(recipient_id, ¬ification, tx)
123 .await?;
124 if let Some(id) = id {
125 notification::Entity::delete_by_id(id).exec(tx).await?;
126 }
127 Ok(id)
128 }
129
130 /// Populate the response for the notification with the given kind and
131 /// entity id.
132 pub async fn mark_notification_as_read_with_response(
133 &self,
134 recipient_id: UserId,
135 notification: &Notification,
136 response: bool,
137 tx: &DatabaseTransaction,
138 ) -> Result<Option<(UserId, proto::Notification)>> {
139 self.mark_notification_as_read_internal(recipient_id, notification, Some(response), tx)
140 .await
141 }
142
143 /// Marks the given notification as read.
144 pub async fn mark_notification_as_read(
145 &self,
146 recipient_id: UserId,
147 notification: &Notification,
148 tx: &DatabaseTransaction,
149 ) -> Result<Option<(UserId, proto::Notification)>> {
150 self.mark_notification_as_read_internal(recipient_id, notification, None, tx)
151 .await
152 }
153
154 /// Marks the notification with the given ID as read.
155 pub async fn mark_notification_as_read_by_id(
156 &self,
157 recipient_id: UserId,
158 notification_id: NotificationId,
159 ) -> Result<NotificationBatch> {
160 self.transaction(|tx| async move {
161 let row = notification::Entity::update(notification::ActiveModel {
162 id: ActiveValue::Unchanged(notification_id),
163 recipient_id: ActiveValue::Unchanged(recipient_id),
164 is_read: ActiveValue::Set(true),
165 ..Default::default()
166 })
167 .exec(&*tx)
168 .await?;
169 Ok(model_to_proto(self, row)
170 .map(|notification| (recipient_id, notification))
171 .into_iter()
172 .collect())
173 })
174 .await
175 }
176
177 async fn mark_notification_as_read_internal(
178 &self,
179 recipient_id: UserId,
180 notification: &Notification,
181 response: Option<bool>,
182 tx: &DatabaseTransaction,
183 ) -> Result<Option<(UserId, proto::Notification)>> {
184 if let Some(id) = self
185 .find_notification(recipient_id, notification, tx)
186 .await?
187 {
188 let row = notification::Entity::update(notification::ActiveModel {
189 id: ActiveValue::Unchanged(id),
190 recipient_id: ActiveValue::Unchanged(recipient_id),
191 is_read: ActiveValue::Set(true),
192 response: if let Some(response) = response {
193 ActiveValue::Set(Some(response))
194 } else {
195 ActiveValue::NotSet
196 },
197 ..Default::default()
198 })
199 .exec(tx)
200 .await?;
201 Ok(model_to_proto(self, row)
202 .map(|notification| (recipient_id, notification))
203 .ok())
204 } else {
205 Ok(None)
206 }
207 }
208
209 /// Find an unread notification by its recipient, kind and entity id.
210 async fn find_notification(
211 &self,
212 recipient_id: UserId,
213 notification: &Notification,
214 tx: &DatabaseTransaction,
215 ) -> Result<Option<NotificationId>> {
216 let proto = notification.to_proto();
217 let kind = notification_kind_from_proto(self, &proto)?;
218
219 #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
220 enum QueryIds {
221 Id,
222 }
223
224 Ok(notification::Entity::find()
225 .select_only()
226 .column(notification::Column::Id)
227 .filter(
228 Condition::all()
229 .add(notification::Column::RecipientId.eq(recipient_id))
230 .add(notification::Column::IsRead.eq(false))
231 .add(notification::Column::Kind.eq(kind))
232 .add(if proto.entity_id.is_some() {
233 notification::Column::EntityId.eq(proto.entity_id)
234 } else {
235 notification::Column::EntityId.is_null()
236 }),
237 )
238 .into_values::<_, QueryIds>()
239 .one(tx)
240 .await?)
241 }
242}
243
244pub fn model_to_proto(this: &Database, row: notification::Model) -> Result<proto::Notification> {
245 let kind = this
246 .notification_kinds_by_id
247 .get(&row.kind)
248 .ok_or_else(|| anyhow!("Unknown notification kind"))?;
249 Ok(proto::Notification {
250 id: row.id.to_proto(),
251 kind: kind.to_string(),
252 timestamp: row.created_at.assume_utc().unix_timestamp() as u64,
253 is_read: row.is_read,
254 response: row.response,
255 content: row.content,
256 entity_id: row.entity_id.map(|id| id as u64),
257 })
258}
259
260fn notification_kind_from_proto(
261 this: &Database,
262 proto: &proto::Notification,
263) -> Result<NotificationKindId> {
264 Ok(this
265 .notification_kinds_by_name
266 .get(&proto.kind)
267 .copied()
268 .ok_or_else(|| anyhow!("invalid notification kind {:?}", proto.kind))?)
269}