1use anyhow::Context as _;
2
3use super::*;
4
5impl Database {
6 /// Retrieves the contacts for the user with the given ID.
7 pub async fn get_contacts(&self, user_id: UserId) -> Result<Vec<Contact>> {
8 #[derive(Debug, FromQueryResult)]
9 struct ContactWithUserBusyStatuses {
10 user_id_a: UserId,
11 user_id_b: UserId,
12 a_to_b: bool,
13 accepted: bool,
14 user_a_busy: bool,
15 user_b_busy: bool,
16 }
17
18 self.transaction(|tx| async move {
19 let user_a_participant = Alias::new("user_a_participant");
20 let user_b_participant = Alias::new("user_b_participant");
21 let mut db_contacts = contact::Entity::find()
22 .column_as(
23 Expr::col((user_a_participant.clone(), room_participant::Column::Id))
24 .is_not_null(),
25 "user_a_busy",
26 )
27 .column_as(
28 Expr::col((user_b_participant.clone(), room_participant::Column::Id))
29 .is_not_null(),
30 "user_b_busy",
31 )
32 .filter(
33 contact::Column::UserIdA
34 .eq(user_id)
35 .or(contact::Column::UserIdB.eq(user_id)),
36 )
37 .join_as(
38 JoinType::LeftJoin,
39 contact::Relation::UserARoomParticipant.def(),
40 user_a_participant,
41 )
42 .join_as(
43 JoinType::LeftJoin,
44 contact::Relation::UserBRoomParticipant.def(),
45 user_b_participant,
46 )
47 .into_model::<ContactWithUserBusyStatuses>()
48 .stream(&*tx)
49 .await?;
50
51 let mut contacts = Vec::new();
52 while let Some(db_contact) = db_contacts.next().await {
53 let db_contact = db_contact?;
54 if db_contact.user_id_a == user_id {
55 if db_contact.accepted {
56 contacts.push(Contact::Accepted {
57 user_id: db_contact.user_id_b,
58 busy: db_contact.user_b_busy,
59 });
60 } else if db_contact.a_to_b {
61 contacts.push(Contact::Outgoing {
62 user_id: db_contact.user_id_b,
63 })
64 } else {
65 contacts.push(Contact::Incoming {
66 user_id: db_contact.user_id_b,
67 });
68 }
69 } else if db_contact.accepted {
70 contacts.push(Contact::Accepted {
71 user_id: db_contact.user_id_a,
72 busy: db_contact.user_a_busy,
73 });
74 } else if db_contact.a_to_b {
75 contacts.push(Contact::Incoming {
76 user_id: db_contact.user_id_a,
77 });
78 } else {
79 contacts.push(Contact::Outgoing {
80 user_id: db_contact.user_id_a,
81 });
82 }
83 }
84
85 contacts.sort_unstable_by_key(|contact| contact.user_id());
86
87 Ok(contacts)
88 })
89 .await
90 }
91
92 /// Returns whether the given user is a busy (on a call).
93 pub async fn is_user_busy(&self, user_id: UserId) -> Result<bool> {
94 self.transaction(|tx| async move {
95 let participant = room_participant::Entity::find()
96 .filter(room_participant::Column::UserId.eq(user_id))
97 .one(&*tx)
98 .await?;
99 Ok(participant.is_some())
100 })
101 .await
102 }
103
104 /// Returns whether the user with `user_id_1` has the user with `user_id_2` as a contact.
105 ///
106 /// In order for this to return `true`, `user_id_2` must have an accepted invite from `user_id_1`.
107 pub async fn has_contact(&self, user_id_1: UserId, user_id_2: UserId) -> Result<bool> {
108 self.transaction(|tx| async move {
109 let (id_a, id_b) = if user_id_1 < user_id_2 {
110 (user_id_1, user_id_2)
111 } else {
112 (user_id_2, user_id_1)
113 };
114
115 Ok(contact::Entity::find()
116 .filter(
117 contact::Column::UserIdA
118 .eq(id_a)
119 .and(contact::Column::UserIdB.eq(id_b))
120 .and(contact::Column::Accepted.eq(true)),
121 )
122 .one(&*tx)
123 .await?
124 .is_some())
125 })
126 .await
127 }
128
129 /// Invite the user with `receiver_id` to be a contact of the user with `sender_id`.
130 pub async fn send_contact_request(
131 &self,
132 sender_id: UserId,
133 receiver_id: UserId,
134 ) -> Result<NotificationBatch> {
135 self.transaction(|tx| async move {
136 let (id_a, id_b, a_to_b) = if sender_id < receiver_id {
137 (sender_id, receiver_id, true)
138 } else {
139 (receiver_id, sender_id, false)
140 };
141
142 let rows_affected = contact::Entity::insert(contact::ActiveModel {
143 user_id_a: ActiveValue::set(id_a),
144 user_id_b: ActiveValue::set(id_b),
145 a_to_b: ActiveValue::set(a_to_b),
146 accepted: ActiveValue::set(false),
147 should_notify: ActiveValue::set(true),
148 ..Default::default()
149 })
150 .on_conflict(
151 OnConflict::columns([contact::Column::UserIdA, contact::Column::UserIdB])
152 .values([
153 (contact::Column::Accepted, true.into()),
154 (contact::Column::ShouldNotify, false.into()),
155 ])
156 .action_and_where(
157 contact::Column::Accepted.eq(false).and(
158 contact::Column::AToB
159 .eq(a_to_b)
160 .and(contact::Column::UserIdA.eq(id_b))
161 .or(contact::Column::AToB
162 .ne(a_to_b)
163 .and(contact::Column::UserIdA.eq(id_a))),
164 ),
165 )
166 .to_owned(),
167 )
168 .exec_without_returning(&*tx)
169 .await?;
170
171 if rows_affected == 0 {
172 Err(anyhow!("contact already requested"))?;
173 }
174
175 Ok(self
176 .create_notification(
177 receiver_id,
178 rpc::Notification::ContactRequest {
179 sender_id: sender_id.to_proto(),
180 },
181 true,
182 &tx,
183 )
184 .await?
185 .into_iter()
186 .collect())
187 })
188 .await
189 }
190
191 /// Returns a bool indicating whether the removed contact had originally accepted or not
192 ///
193 /// Deletes the contact identified by the requester and responder ids, and then returns
194 /// whether the deleted contact had originally accepted or was a pending contact request.
195 ///
196 /// # Arguments
197 ///
198 /// * `requester_id` - The user that initiates this request
199 /// * `responder_id` - The user that will be removed
200 pub async fn remove_contact(
201 &self,
202 requester_id: UserId,
203 responder_id: UserId,
204 ) -> Result<(bool, Option<NotificationId>)> {
205 self.transaction(|tx| async move {
206 let (id_a, id_b) = if responder_id < requester_id {
207 (responder_id, requester_id)
208 } else {
209 (requester_id, responder_id)
210 };
211
212 let contact = contact::Entity::find()
213 .filter(
214 contact::Column::UserIdA
215 .eq(id_a)
216 .and(contact::Column::UserIdB.eq(id_b)),
217 )
218 .one(&*tx)
219 .await?
220 .context("no such contact")?;
221
222 contact::Entity::delete_by_id(contact.id).exec(&*tx).await?;
223
224 let mut deleted_notification_id = None;
225 if !contact.accepted {
226 deleted_notification_id = self
227 .remove_notification(
228 responder_id,
229 rpc::Notification::ContactRequest {
230 sender_id: requester_id.to_proto(),
231 },
232 &tx,
233 )
234 .await?;
235 }
236
237 Ok((contact.accepted, deleted_notification_id))
238 })
239 .await
240 }
241
242 /// Dismisses a contact notification for the given user.
243 pub async fn dismiss_contact_notification(
244 &self,
245 user_id: UserId,
246 contact_user_id: UserId,
247 ) -> Result<()> {
248 self.transaction(|tx| async move {
249 let (id_a, id_b, a_to_b) = if user_id < contact_user_id {
250 (user_id, contact_user_id, true)
251 } else {
252 (contact_user_id, user_id, false)
253 };
254
255 let result = contact::Entity::update_many()
256 .set(contact::ActiveModel {
257 should_notify: ActiveValue::set(false),
258 ..Default::default()
259 })
260 .filter(
261 contact::Column::UserIdA
262 .eq(id_a)
263 .and(contact::Column::UserIdB.eq(id_b))
264 .and(
265 contact::Column::AToB
266 .eq(a_to_b)
267 .and(contact::Column::Accepted.eq(true))
268 .or(contact::Column::AToB
269 .ne(a_to_b)
270 .and(contact::Column::Accepted.eq(false))),
271 ),
272 )
273 .exec(&*tx)
274 .await?;
275 if result.rows_affected == 0 {
276 Err(anyhow!("no such contact request"))?
277 } else {
278 Ok(())
279 }
280 })
281 .await
282 }
283
284 /// Accept or decline a contact request
285 pub async fn respond_to_contact_request(
286 &self,
287 responder_id: UserId,
288 requester_id: UserId,
289 accept: bool,
290 ) -> Result<NotificationBatch> {
291 self.transaction(|tx| async move {
292 let (id_a, id_b, a_to_b) = if responder_id < requester_id {
293 (responder_id, requester_id, false)
294 } else {
295 (requester_id, responder_id, true)
296 };
297 let rows_affected = if accept {
298 let result = contact::Entity::update_many()
299 .set(contact::ActiveModel {
300 accepted: ActiveValue::set(true),
301 should_notify: ActiveValue::set(true),
302 ..Default::default()
303 })
304 .filter(
305 contact::Column::UserIdA
306 .eq(id_a)
307 .and(contact::Column::UserIdB.eq(id_b))
308 .and(contact::Column::AToB.eq(a_to_b)),
309 )
310 .exec(&*tx)
311 .await?;
312 result.rows_affected
313 } else {
314 let result = contact::Entity::delete_many()
315 .filter(
316 contact::Column::UserIdA
317 .eq(id_a)
318 .and(contact::Column::UserIdB.eq(id_b))
319 .and(contact::Column::AToB.eq(a_to_b))
320 .and(contact::Column::Accepted.eq(false)),
321 )
322 .exec(&*tx)
323 .await?;
324
325 result.rows_affected
326 };
327
328 if rows_affected == 0 {
329 Err(anyhow!("no such contact request"))?
330 }
331
332 let mut notifications = Vec::new();
333 notifications.extend(
334 self.mark_notification_as_read_with_response(
335 responder_id,
336 &rpc::Notification::ContactRequest {
337 sender_id: requester_id.to_proto(),
338 },
339 accept,
340 &tx,
341 )
342 .await?,
343 );
344
345 if accept {
346 notifications.extend(
347 self.create_notification(
348 requester_id,
349 rpc::Notification::ContactRequestAccepted {
350 responder_id: responder_id.to_proto(),
351 },
352 true,
353 &tx,
354 )
355 .await?,
356 );
357 }
358
359 Ok(notifications)
360 })
361 .await
362 }
363}