contacts.rs

  1use super::*;
  2
  3impl Database {
  4    /// Retrieves the contacts for the user with the given ID.
  5    pub async fn get_contacts(&self, user_id: UserId) -> Result<Vec<Contact>> {
  6        #[derive(Debug, FromQueryResult)]
  7        struct ContactWithUserBusyStatuses {
  8            user_id_a: UserId,
  9            user_id_b: UserId,
 10            a_to_b: bool,
 11            accepted: bool,
 12            user_a_busy: bool,
 13            user_b_busy: bool,
 14        }
 15
 16        self.transaction(|tx| async move {
 17            let user_a_participant = Alias::new("user_a_participant");
 18            let user_b_participant = Alias::new("user_b_participant");
 19            let mut db_contacts = contact::Entity::find()
 20                .column_as(
 21                    Expr::col((user_a_participant.clone(), room_participant::Column::Id))
 22                        .is_not_null(),
 23                    "user_a_busy",
 24                )
 25                .column_as(
 26                    Expr::col((user_b_participant.clone(), room_participant::Column::Id))
 27                        .is_not_null(),
 28                    "user_b_busy",
 29                )
 30                .filter(
 31                    contact::Column::UserIdA
 32                        .eq(user_id)
 33                        .or(contact::Column::UserIdB.eq(user_id)),
 34                )
 35                .join_as(
 36                    JoinType::LeftJoin,
 37                    contact::Relation::UserARoomParticipant.def(),
 38                    user_a_participant,
 39                )
 40                .join_as(
 41                    JoinType::LeftJoin,
 42                    contact::Relation::UserBRoomParticipant.def(),
 43                    user_b_participant,
 44                )
 45                .into_model::<ContactWithUserBusyStatuses>()
 46                .stream(&*tx)
 47                .await?;
 48
 49            let mut contacts = Vec::new();
 50            while let Some(db_contact) = db_contacts.next().await {
 51                let db_contact = db_contact?;
 52                if db_contact.user_id_a == user_id {
 53                    if db_contact.accepted {
 54                        contacts.push(Contact::Accepted {
 55                            user_id: db_contact.user_id_b,
 56                            busy: db_contact.user_b_busy,
 57                        });
 58                    } else if db_contact.a_to_b {
 59                        contacts.push(Contact::Outgoing {
 60                            user_id: db_contact.user_id_b,
 61                        })
 62                    } else {
 63                        contacts.push(Contact::Incoming {
 64                            user_id: db_contact.user_id_b,
 65                        });
 66                    }
 67                } else if db_contact.accepted {
 68                    contacts.push(Contact::Accepted {
 69                        user_id: db_contact.user_id_a,
 70                        busy: db_contact.user_a_busy,
 71                    });
 72                } else if db_contact.a_to_b {
 73                    contacts.push(Contact::Incoming {
 74                        user_id: db_contact.user_id_a,
 75                    });
 76                } else {
 77                    contacts.push(Contact::Outgoing {
 78                        user_id: db_contact.user_id_a,
 79                    });
 80                }
 81            }
 82
 83            contacts.sort_unstable_by_key(|contact| contact.user_id());
 84
 85            Ok(contacts)
 86        })
 87        .await
 88    }
 89
 90    /// Returns whether the given user is a busy (on a call).
 91    pub async fn is_user_busy(&self, user_id: UserId) -> Result<bool> {
 92        self.transaction(|tx| async move {
 93            let participant = room_participant::Entity::find()
 94                .filter(room_participant::Column::UserId.eq(user_id))
 95                .one(&*tx)
 96                .await?;
 97            Ok(participant.is_some())
 98        })
 99        .await
100    }
101
102    /// Returns whether the user with `user_id_1` has the user with `user_id_2` as a contact.
103    ///
104    /// In order for this to return `true`, `user_id_2` must have an accepted invite from `user_id_1`.
105    pub async fn has_contact(&self, user_id_1: UserId, user_id_2: UserId) -> Result<bool> {
106        self.transaction(|tx| async move {
107            let (id_a, id_b) = if user_id_1 < user_id_2 {
108                (user_id_1, user_id_2)
109            } else {
110                (user_id_2, user_id_1)
111            };
112
113            Ok(contact::Entity::find()
114                .filter(
115                    contact::Column::UserIdA
116                        .eq(id_a)
117                        .and(contact::Column::UserIdB.eq(id_b))
118                        .and(contact::Column::Accepted.eq(true)),
119                )
120                .one(&*tx)
121                .await?
122                .is_some())
123        })
124        .await
125    }
126
127    /// Invite the user with `receiver_id` to be a contact of the user with `sender_id`.
128    pub async fn send_contact_request(
129        &self,
130        sender_id: UserId,
131        receiver_id: UserId,
132    ) -> Result<NotificationBatch> {
133        self.transaction(|tx| async move {
134            let (id_a, id_b, a_to_b) = if sender_id < receiver_id {
135                (sender_id, receiver_id, true)
136            } else {
137                (receiver_id, sender_id, false)
138            };
139
140            let rows_affected = contact::Entity::insert(contact::ActiveModel {
141                user_id_a: ActiveValue::set(id_a),
142                user_id_b: ActiveValue::set(id_b),
143                a_to_b: ActiveValue::set(a_to_b),
144                accepted: ActiveValue::set(false),
145                should_notify: ActiveValue::set(true),
146                ..Default::default()
147            })
148            .on_conflict(
149                OnConflict::columns([contact::Column::UserIdA, contact::Column::UserIdB])
150                    .values([
151                        (contact::Column::Accepted, true.into()),
152                        (contact::Column::ShouldNotify, false.into()),
153                    ])
154                    .action_and_where(
155                        contact::Column::Accepted.eq(false).and(
156                            contact::Column::AToB
157                                .eq(a_to_b)
158                                .and(contact::Column::UserIdA.eq(id_b))
159                                .or(contact::Column::AToB
160                                    .ne(a_to_b)
161                                    .and(contact::Column::UserIdA.eq(id_a))),
162                        ),
163                    )
164                    .to_owned(),
165            )
166            .exec_without_returning(&*tx)
167            .await?;
168
169            if rows_affected == 0 {
170                Err(anyhow!("contact already requested"))?;
171            }
172
173            Ok(self
174                .create_notification(
175                    receiver_id,
176                    rpc::Notification::ContactRequest {
177                        sender_id: sender_id.to_proto(),
178                    },
179                    true,
180                    &tx,
181                )
182                .await?
183                .into_iter()
184                .collect())
185        })
186        .await
187    }
188
189    /// Returns a bool indicating whether the removed contact had originally accepted or not
190    ///
191    /// Deletes the contact identified by the requester and responder ids, and then returns
192    /// whether the deleted contact had originally accepted or was a pending contact request.
193    ///
194    /// # Arguments
195    ///
196    /// * `requester_id` - The user that initiates this request
197    /// * `responder_id` - The user that will be removed
198    pub async fn remove_contact(
199        &self,
200        requester_id: UserId,
201        responder_id: UserId,
202    ) -> Result<(bool, Option<NotificationId>)> {
203        self.transaction(|tx| async move {
204            let (id_a, id_b) = if responder_id < requester_id {
205                (responder_id, requester_id)
206            } else {
207                (requester_id, responder_id)
208            };
209
210            let contact = contact::Entity::find()
211                .filter(
212                    contact::Column::UserIdA
213                        .eq(id_a)
214                        .and(contact::Column::UserIdB.eq(id_b)),
215                )
216                .one(&*tx)
217                .await?
218                .ok_or_else(|| anyhow!("no such contact"))?;
219
220            contact::Entity::delete_by_id(contact.id).exec(&*tx).await?;
221
222            let mut deleted_notification_id = None;
223            if !contact.accepted {
224                deleted_notification_id = self
225                    .remove_notification(
226                        responder_id,
227                        rpc::Notification::ContactRequest {
228                            sender_id: requester_id.to_proto(),
229                        },
230                        &tx,
231                    )
232                    .await?;
233            }
234
235            Ok((contact.accepted, deleted_notification_id))
236        })
237        .await
238    }
239
240    /// Dismisses a contact notification for the given user.
241    pub async fn dismiss_contact_notification(
242        &self,
243        user_id: UserId,
244        contact_user_id: UserId,
245    ) -> Result<()> {
246        self.transaction(|tx| async move {
247            let (id_a, id_b, a_to_b) = if user_id < contact_user_id {
248                (user_id, contact_user_id, true)
249            } else {
250                (contact_user_id, user_id, false)
251            };
252
253            let result = contact::Entity::update_many()
254                .set(contact::ActiveModel {
255                    should_notify: ActiveValue::set(false),
256                    ..Default::default()
257                })
258                .filter(
259                    contact::Column::UserIdA
260                        .eq(id_a)
261                        .and(contact::Column::UserIdB.eq(id_b))
262                        .and(
263                            contact::Column::AToB
264                                .eq(a_to_b)
265                                .and(contact::Column::Accepted.eq(true))
266                                .or(contact::Column::AToB
267                                    .ne(a_to_b)
268                                    .and(contact::Column::Accepted.eq(false))),
269                        ),
270                )
271                .exec(&*tx)
272                .await?;
273            if result.rows_affected == 0 {
274                Err(anyhow!("no such contact request"))?
275            } else {
276                Ok(())
277            }
278        })
279        .await
280    }
281
282    /// Accept or decline a contact request
283    pub async fn respond_to_contact_request(
284        &self,
285        responder_id: UserId,
286        requester_id: UserId,
287        accept: bool,
288    ) -> Result<NotificationBatch> {
289        self.transaction(|tx| async move {
290            let (id_a, id_b, a_to_b) = if responder_id < requester_id {
291                (responder_id, requester_id, false)
292            } else {
293                (requester_id, responder_id, true)
294            };
295            let rows_affected = if accept {
296                let result = contact::Entity::update_many()
297                    .set(contact::ActiveModel {
298                        accepted: ActiveValue::set(true),
299                        should_notify: ActiveValue::set(true),
300                        ..Default::default()
301                    })
302                    .filter(
303                        contact::Column::UserIdA
304                            .eq(id_a)
305                            .and(contact::Column::UserIdB.eq(id_b))
306                            .and(contact::Column::AToB.eq(a_to_b)),
307                    )
308                    .exec(&*tx)
309                    .await?;
310                result.rows_affected
311            } else {
312                let result = contact::Entity::delete_many()
313                    .filter(
314                        contact::Column::UserIdA
315                            .eq(id_a)
316                            .and(contact::Column::UserIdB.eq(id_b))
317                            .and(contact::Column::AToB.eq(a_to_b))
318                            .and(contact::Column::Accepted.eq(false)),
319                    )
320                    .exec(&*tx)
321                    .await?;
322
323                result.rows_affected
324            };
325
326            if rows_affected == 0 {
327                Err(anyhow!("no such contact request"))?
328            }
329
330            let mut notifications = Vec::new();
331            notifications.extend(
332                self.mark_notification_as_read_with_response(
333                    responder_id,
334                    &rpc::Notification::ContactRequest {
335                        sender_id: requester_id.to_proto(),
336                    },
337                    accept,
338                    &tx,
339                )
340                .await?,
341            );
342
343            if accept {
344                notifications.extend(
345                    self.create_notification(
346                        requester_id,
347                        rpc::Notification::ContactRequestAccepted {
348                            responder_id: responder_id.to_proto(),
349                        },
350                        true,
351                        &tx,
352                    )
353                    .await?,
354                );
355            }
356
357            Ok(notifications)
358        })
359        .await
360    }
361}