1use super::*;
2
3impl Database {
4 /// Retrieves the contacts for the user with the given ID.
5 pub async fn get_contacts(&self, user_id: UserId) -> Result<Vec<Contact>> {
6 #[derive(Debug, FromQueryResult)]
7 struct ContactWithUserBusyStatuses {
8 user_id_a: UserId,
9 user_id_b: UserId,
10 a_to_b: bool,
11 accepted: bool,
12 user_a_busy: bool,
13 user_b_busy: bool,
14 }
15
16 self.transaction(|tx| async move {
17 let user_a_participant = Alias::new("user_a_participant");
18 let user_b_participant = Alias::new("user_b_participant");
19 let mut db_contacts = contact::Entity::find()
20 .column_as(
21 Expr::col((user_a_participant.clone(), room_participant::Column::Id))
22 .is_not_null(),
23 "user_a_busy",
24 )
25 .column_as(
26 Expr::col((user_b_participant.clone(), room_participant::Column::Id))
27 .is_not_null(),
28 "user_b_busy",
29 )
30 .filter(
31 contact::Column::UserIdA
32 .eq(user_id)
33 .or(contact::Column::UserIdB.eq(user_id)),
34 )
35 .join_as(
36 JoinType::LeftJoin,
37 contact::Relation::UserARoomParticipant.def(),
38 user_a_participant,
39 )
40 .join_as(
41 JoinType::LeftJoin,
42 contact::Relation::UserBRoomParticipant.def(),
43 user_b_participant,
44 )
45 .into_model::<ContactWithUserBusyStatuses>()
46 .stream(&*tx)
47 .await?;
48
49 let mut contacts = Vec::new();
50 while let Some(db_contact) = db_contacts.next().await {
51 let db_contact = db_contact?;
52 if db_contact.user_id_a == user_id {
53 if db_contact.accepted {
54 contacts.push(Contact::Accepted {
55 user_id: db_contact.user_id_b,
56 busy: db_contact.user_b_busy,
57 });
58 } else if db_contact.a_to_b {
59 contacts.push(Contact::Outgoing {
60 user_id: db_contact.user_id_b,
61 })
62 } else {
63 contacts.push(Contact::Incoming {
64 user_id: db_contact.user_id_b,
65 });
66 }
67 } else if db_contact.accepted {
68 contacts.push(Contact::Accepted {
69 user_id: db_contact.user_id_a,
70 busy: db_contact.user_a_busy,
71 });
72 } else if db_contact.a_to_b {
73 contacts.push(Contact::Incoming {
74 user_id: db_contact.user_id_a,
75 });
76 } else {
77 contacts.push(Contact::Outgoing {
78 user_id: db_contact.user_id_a,
79 });
80 }
81 }
82
83 contacts.sort_unstable_by_key(|contact| contact.user_id());
84
85 Ok(contacts)
86 })
87 .await
88 }
89
90 /// Returns whether the given user is a busy (on a call).
91 pub async fn is_user_busy(&self, user_id: UserId) -> Result<bool> {
92 self.transaction(|tx| async move {
93 let participant = room_participant::Entity::find()
94 .filter(room_participant::Column::UserId.eq(user_id))
95 .one(&*tx)
96 .await?;
97 Ok(participant.is_some())
98 })
99 .await
100 }
101
102 /// Returns whether the user with `user_id_1` has the user with `user_id_2` as a contact.
103 ///
104 /// In order for this to return `true`, `user_id_2` must have an accepted invite from `user_id_1`.
105 pub async fn has_contact(&self, user_id_1: UserId, user_id_2: UserId) -> Result<bool> {
106 self.transaction(|tx| async move {
107 let (id_a, id_b) = if user_id_1 < user_id_2 {
108 (user_id_1, user_id_2)
109 } else {
110 (user_id_2, user_id_1)
111 };
112
113 Ok(contact::Entity::find()
114 .filter(
115 contact::Column::UserIdA
116 .eq(id_a)
117 .and(contact::Column::UserIdB.eq(id_b))
118 .and(contact::Column::Accepted.eq(true)),
119 )
120 .one(&*tx)
121 .await?
122 .is_some())
123 })
124 .await
125 }
126
127 /// Invite the user with `receiver_id` to be a contact of the user with `sender_id`.
128 pub async fn send_contact_request(
129 &self,
130 sender_id: UserId,
131 receiver_id: UserId,
132 ) -> Result<NotificationBatch> {
133 self.transaction(|tx| async move {
134 let (id_a, id_b, a_to_b) = if sender_id < receiver_id {
135 (sender_id, receiver_id, true)
136 } else {
137 (receiver_id, sender_id, false)
138 };
139
140 let rows_affected = contact::Entity::insert(contact::ActiveModel {
141 user_id_a: ActiveValue::set(id_a),
142 user_id_b: ActiveValue::set(id_b),
143 a_to_b: ActiveValue::set(a_to_b),
144 accepted: ActiveValue::set(false),
145 should_notify: ActiveValue::set(true),
146 ..Default::default()
147 })
148 .on_conflict(
149 OnConflict::columns([contact::Column::UserIdA, contact::Column::UserIdB])
150 .values([
151 (contact::Column::Accepted, true.into()),
152 (contact::Column::ShouldNotify, false.into()),
153 ])
154 .action_and_where(
155 contact::Column::Accepted.eq(false).and(
156 contact::Column::AToB
157 .eq(a_to_b)
158 .and(contact::Column::UserIdA.eq(id_b))
159 .or(contact::Column::AToB
160 .ne(a_to_b)
161 .and(contact::Column::UserIdA.eq(id_a))),
162 ),
163 )
164 .to_owned(),
165 )
166 .exec_without_returning(&*tx)
167 .await?;
168
169 if rows_affected == 0 {
170 Err(anyhow!("contact already requested"))?;
171 }
172
173 Ok(self
174 .create_notification(
175 receiver_id,
176 rpc::Notification::ContactRequest {
177 sender_id: sender_id.to_proto(),
178 },
179 true,
180 &tx,
181 )
182 .await?
183 .into_iter()
184 .collect())
185 })
186 .await
187 }
188
189 /// Returns a bool indicating whether the removed contact had originally accepted or not
190 ///
191 /// Deletes the contact identified by the requester and responder ids, and then returns
192 /// whether the deleted contact had originally accepted or was a pending contact request.
193 ///
194 /// # Arguments
195 ///
196 /// * `requester_id` - The user that initiates this request
197 /// * `responder_id` - The user that will be removed
198 pub async fn remove_contact(
199 &self,
200 requester_id: UserId,
201 responder_id: UserId,
202 ) -> Result<(bool, Option<NotificationId>)> {
203 self.transaction(|tx| async move {
204 let (id_a, id_b) = if responder_id < requester_id {
205 (responder_id, requester_id)
206 } else {
207 (requester_id, responder_id)
208 };
209
210 let contact = contact::Entity::find()
211 .filter(
212 contact::Column::UserIdA
213 .eq(id_a)
214 .and(contact::Column::UserIdB.eq(id_b)),
215 )
216 .one(&*tx)
217 .await?
218 .ok_or_else(|| anyhow!("no such contact"))?;
219
220 contact::Entity::delete_by_id(contact.id).exec(&*tx).await?;
221
222 let mut deleted_notification_id = None;
223 if !contact.accepted {
224 deleted_notification_id = self
225 .remove_notification(
226 responder_id,
227 rpc::Notification::ContactRequest {
228 sender_id: requester_id.to_proto(),
229 },
230 &tx,
231 )
232 .await?;
233 }
234
235 Ok((contact.accepted, deleted_notification_id))
236 })
237 .await
238 }
239
240 /// Dismisses a contact notification for the given user.
241 pub async fn dismiss_contact_notification(
242 &self,
243 user_id: UserId,
244 contact_user_id: UserId,
245 ) -> Result<()> {
246 self.transaction(|tx| async move {
247 let (id_a, id_b, a_to_b) = if user_id < contact_user_id {
248 (user_id, contact_user_id, true)
249 } else {
250 (contact_user_id, user_id, false)
251 };
252
253 let result = contact::Entity::update_many()
254 .set(contact::ActiveModel {
255 should_notify: ActiveValue::set(false),
256 ..Default::default()
257 })
258 .filter(
259 contact::Column::UserIdA
260 .eq(id_a)
261 .and(contact::Column::UserIdB.eq(id_b))
262 .and(
263 contact::Column::AToB
264 .eq(a_to_b)
265 .and(contact::Column::Accepted.eq(true))
266 .or(contact::Column::AToB
267 .ne(a_to_b)
268 .and(contact::Column::Accepted.eq(false))),
269 ),
270 )
271 .exec(&*tx)
272 .await?;
273 if result.rows_affected == 0 {
274 Err(anyhow!("no such contact request"))?
275 } else {
276 Ok(())
277 }
278 })
279 .await
280 }
281
282 /// Accept or decline a contact request
283 pub async fn respond_to_contact_request(
284 &self,
285 responder_id: UserId,
286 requester_id: UserId,
287 accept: bool,
288 ) -> Result<NotificationBatch> {
289 self.transaction(|tx| async move {
290 let (id_a, id_b, a_to_b) = if responder_id < requester_id {
291 (responder_id, requester_id, false)
292 } else {
293 (requester_id, responder_id, true)
294 };
295 let rows_affected = if accept {
296 let result = contact::Entity::update_many()
297 .set(contact::ActiveModel {
298 accepted: ActiveValue::set(true),
299 should_notify: ActiveValue::set(true),
300 ..Default::default()
301 })
302 .filter(
303 contact::Column::UserIdA
304 .eq(id_a)
305 .and(contact::Column::UserIdB.eq(id_b))
306 .and(contact::Column::AToB.eq(a_to_b)),
307 )
308 .exec(&*tx)
309 .await?;
310 result.rows_affected
311 } else {
312 let result = contact::Entity::delete_many()
313 .filter(
314 contact::Column::UserIdA
315 .eq(id_a)
316 .and(contact::Column::UserIdB.eq(id_b))
317 .and(contact::Column::AToB.eq(a_to_b))
318 .and(contact::Column::Accepted.eq(false)),
319 )
320 .exec(&*tx)
321 .await?;
322
323 result.rows_affected
324 };
325
326 if rows_affected == 0 {
327 Err(anyhow!("no such contact request"))?
328 }
329
330 let mut notifications = Vec::new();
331 notifications.extend(
332 self.mark_notification_as_read_with_response(
333 responder_id,
334 &rpc::Notification::ContactRequest {
335 sender_id: requester_id.to_proto(),
336 },
337 accept,
338 &tx,
339 )
340 .await?,
341 );
342
343 if accept {
344 notifications.extend(
345 self.create_notification(
346 requester_id,
347 rpc::Notification::ContactRequestAccepted {
348 responder_id: responder_id.to_proto(),
349 },
350 true,
351 &tx,
352 )
353 .await?,
354 );
355 }
356
357 Ok(notifications)
358 })
359 .await
360 }
361}