1use super::*;
2
3impl Database {
4 pub async fn get_contacts(&self, user_id: UserId) -> Result<Vec<Contact>> {
5 #[derive(Debug, FromQueryResult)]
6 struct ContactWithUserBusyStatuses {
7 user_id_a: UserId,
8 user_id_b: UserId,
9 a_to_b: bool,
10 accepted: bool,
11 user_a_busy: bool,
12 user_b_busy: bool,
13 }
14
15 self.transaction(|tx| async move {
16 let user_a_participant = Alias::new("user_a_participant");
17 let user_b_participant = Alias::new("user_b_participant");
18 let mut db_contacts = contact::Entity::find()
19 .column_as(
20 Expr::col((user_a_participant.clone(), room_participant::Column::Id))
21 .is_not_null(),
22 "user_a_busy",
23 )
24 .column_as(
25 Expr::col((user_b_participant.clone(), room_participant::Column::Id))
26 .is_not_null(),
27 "user_b_busy",
28 )
29 .filter(
30 contact::Column::UserIdA
31 .eq(user_id)
32 .or(contact::Column::UserIdB.eq(user_id)),
33 )
34 .join_as(
35 JoinType::LeftJoin,
36 contact::Relation::UserARoomParticipant.def(),
37 user_a_participant,
38 )
39 .join_as(
40 JoinType::LeftJoin,
41 contact::Relation::UserBRoomParticipant.def(),
42 user_b_participant,
43 )
44 .into_model::<ContactWithUserBusyStatuses>()
45 .stream(&*tx)
46 .await?;
47
48 let mut contacts = Vec::new();
49 while let Some(db_contact) = db_contacts.next().await {
50 let db_contact = db_contact?;
51 if db_contact.user_id_a == user_id {
52 if db_contact.accepted {
53 contacts.push(Contact::Accepted {
54 user_id: db_contact.user_id_b,
55 busy: db_contact.user_b_busy,
56 });
57 } else if db_contact.a_to_b {
58 contacts.push(Contact::Outgoing {
59 user_id: db_contact.user_id_b,
60 })
61 } else {
62 contacts.push(Contact::Incoming {
63 user_id: db_contact.user_id_b,
64 });
65 }
66 } else if db_contact.accepted {
67 contacts.push(Contact::Accepted {
68 user_id: db_contact.user_id_a,
69 busy: db_contact.user_a_busy,
70 });
71 } else if db_contact.a_to_b {
72 contacts.push(Contact::Incoming {
73 user_id: db_contact.user_id_a,
74 });
75 } else {
76 contacts.push(Contact::Outgoing {
77 user_id: db_contact.user_id_a,
78 });
79 }
80 }
81
82 contacts.sort_unstable_by_key(|contact| contact.user_id());
83
84 Ok(contacts)
85 })
86 .await
87 }
88
89 pub async fn is_user_busy(&self, user_id: UserId) -> Result<bool> {
90 self.transaction(|tx| async move {
91 let participant = room_participant::Entity::find()
92 .filter(room_participant::Column::UserId.eq(user_id))
93 .one(&*tx)
94 .await?;
95 Ok(participant.is_some())
96 })
97 .await
98 }
99
100 pub async fn has_contact(&self, user_id_1: UserId, user_id_2: UserId) -> Result<bool> {
101 self.transaction(|tx| async move {
102 let (id_a, id_b) = if user_id_1 < user_id_2 {
103 (user_id_1, user_id_2)
104 } else {
105 (user_id_2, user_id_1)
106 };
107
108 Ok(contact::Entity::find()
109 .filter(
110 contact::Column::UserIdA
111 .eq(id_a)
112 .and(contact::Column::UserIdB.eq(id_b))
113 .and(contact::Column::Accepted.eq(true)),
114 )
115 .one(&*tx)
116 .await?
117 .is_some())
118 })
119 .await
120 }
121
122 pub async fn send_contact_request(
123 &self,
124 sender_id: UserId,
125 receiver_id: UserId,
126 ) -> Result<NotificationBatch> {
127 self.transaction(|tx| async move {
128 let (id_a, id_b, a_to_b) = if sender_id < receiver_id {
129 (sender_id, receiver_id, true)
130 } else {
131 (receiver_id, sender_id, false)
132 };
133
134 let rows_affected = contact::Entity::insert(contact::ActiveModel {
135 user_id_a: ActiveValue::set(id_a),
136 user_id_b: ActiveValue::set(id_b),
137 a_to_b: ActiveValue::set(a_to_b),
138 accepted: ActiveValue::set(false),
139 should_notify: ActiveValue::set(true),
140 ..Default::default()
141 })
142 .on_conflict(
143 OnConflict::columns([contact::Column::UserIdA, contact::Column::UserIdB])
144 .values([
145 (contact::Column::Accepted, true.into()),
146 (contact::Column::ShouldNotify, false.into()),
147 ])
148 .action_and_where(
149 contact::Column::Accepted.eq(false).and(
150 contact::Column::AToB
151 .eq(a_to_b)
152 .and(contact::Column::UserIdA.eq(id_b))
153 .or(contact::Column::AToB
154 .ne(a_to_b)
155 .and(contact::Column::UserIdA.eq(id_a))),
156 ),
157 )
158 .to_owned(),
159 )
160 .exec_without_returning(&*tx)
161 .await?;
162
163 if rows_affected == 0 {
164 Err(anyhow!("contact already requested"))?;
165 }
166
167 Ok(self
168 .create_notification(
169 receiver_id,
170 rpc::Notification::ContactRequest {
171 sender_id: sender_id.to_proto(),
172 },
173 true,
174 &*tx,
175 )
176 .await?
177 .into_iter()
178 .collect())
179 })
180 .await
181 }
182
183 /// Returns a bool indicating whether the removed contact had originally accepted or not
184 ///
185 /// Deletes the contact identified by the requester and responder ids, and then returns
186 /// whether the deleted contact had originally accepted or was a pending contact request.
187 ///
188 /// # Arguments
189 ///
190 /// * `requester_id` - The user that initiates this request
191 /// * `responder_id` - The user that will be removed
192 pub async fn remove_contact(
193 &self,
194 requester_id: UserId,
195 responder_id: UserId,
196 ) -> Result<(bool, Option<NotificationId>)> {
197 self.transaction(|tx| async move {
198 let (id_a, id_b) = if responder_id < requester_id {
199 (responder_id, requester_id)
200 } else {
201 (requester_id, responder_id)
202 };
203
204 let contact = contact::Entity::find()
205 .filter(
206 contact::Column::UserIdA
207 .eq(id_a)
208 .and(contact::Column::UserIdB.eq(id_b)),
209 )
210 .one(&*tx)
211 .await?
212 .ok_or_else(|| anyhow!("no such contact"))?;
213
214 contact::Entity::delete_by_id(contact.id).exec(&*tx).await?;
215
216 let mut deleted_notification_id = None;
217 if !contact.accepted {
218 deleted_notification_id = self
219 .remove_notification(
220 responder_id,
221 rpc::Notification::ContactRequest {
222 sender_id: requester_id.to_proto(),
223 },
224 &*tx,
225 )
226 .await?;
227 }
228
229 Ok((contact.accepted, deleted_notification_id))
230 })
231 .await
232 }
233
234 pub async fn dismiss_contact_notification(
235 &self,
236 user_id: UserId,
237 contact_user_id: UserId,
238 ) -> Result<()> {
239 self.transaction(|tx| async move {
240 let (id_a, id_b, a_to_b) = if user_id < contact_user_id {
241 (user_id, contact_user_id, true)
242 } else {
243 (contact_user_id, user_id, false)
244 };
245
246 let result = contact::Entity::update_many()
247 .set(contact::ActiveModel {
248 should_notify: ActiveValue::set(false),
249 ..Default::default()
250 })
251 .filter(
252 contact::Column::UserIdA
253 .eq(id_a)
254 .and(contact::Column::UserIdB.eq(id_b))
255 .and(
256 contact::Column::AToB
257 .eq(a_to_b)
258 .and(contact::Column::Accepted.eq(true))
259 .or(contact::Column::AToB
260 .ne(a_to_b)
261 .and(contact::Column::Accepted.eq(false))),
262 ),
263 )
264 .exec(&*tx)
265 .await?;
266 if result.rows_affected == 0 {
267 Err(anyhow!("no such contact request"))?
268 } else {
269 Ok(())
270 }
271 })
272 .await
273 }
274
275 pub async fn respond_to_contact_request(
276 &self,
277 responder_id: UserId,
278 requester_id: UserId,
279 accept: bool,
280 ) -> Result<NotificationBatch> {
281 self.transaction(|tx| async move {
282 let (id_a, id_b, a_to_b) = if responder_id < requester_id {
283 (responder_id, requester_id, false)
284 } else {
285 (requester_id, responder_id, true)
286 };
287 let rows_affected = if accept {
288 let result = contact::Entity::update_many()
289 .set(contact::ActiveModel {
290 accepted: ActiveValue::set(true),
291 should_notify: ActiveValue::set(true),
292 ..Default::default()
293 })
294 .filter(
295 contact::Column::UserIdA
296 .eq(id_a)
297 .and(contact::Column::UserIdB.eq(id_b))
298 .and(contact::Column::AToB.eq(a_to_b)),
299 )
300 .exec(&*tx)
301 .await?;
302 result.rows_affected
303 } else {
304 let result = contact::Entity::delete_many()
305 .filter(
306 contact::Column::UserIdA
307 .eq(id_a)
308 .and(contact::Column::UserIdB.eq(id_b))
309 .and(contact::Column::AToB.eq(a_to_b))
310 .and(contact::Column::Accepted.eq(false)),
311 )
312 .exec(&*tx)
313 .await?;
314
315 result.rows_affected
316 };
317
318 if rows_affected == 0 {
319 Err(anyhow!("no such contact request"))?
320 }
321
322 let mut notifications = Vec::new();
323 notifications.extend(
324 self.mark_notification_as_read_with_response(
325 responder_id,
326 &rpc::Notification::ContactRequest {
327 sender_id: requester_id.to_proto(),
328 },
329 accept,
330 &*tx,
331 )
332 .await?,
333 );
334
335 if accept {
336 notifications.extend(
337 self.create_notification(
338 requester_id,
339 rpc::Notification::ContactRequestAccepted {
340 responder_id: responder_id.to_proto(),
341 },
342 true,
343 &*tx,
344 )
345 .await?,
346 );
347 }
348
349 Ok(notifications)
350 })
351 .await
352 }
353}