Determine whether a contact is busy via the database

Antonio Scandurra created

Change summary

crates/collab/migrations.sqlite/20221109000000_test_schema.sql   |  2 
crates/collab/migrations/20221111092550_reconnection_support.sql |  1 
crates/collab/src/db.rs                                          | 38 +
crates/collab/src/db_tests.rs                                    | 46 +
crates/collab/src/rpc.rs                                         | 10 
crates/collab/src/rpc/store.rs                                   | 22 
6 files changed, 81 insertions(+), 38 deletions(-)

Detailed changes

crates/collab/migrations.sqlite/20221109000000_test_schema.sql 🔗

@@ -56,7 +56,7 @@ CREATE TABLE "project_collaborators" (
     "is_host" BOOLEAN NOT NULL
 );
 CREATE INDEX "index_project_collaborators_on_project_id" ON "project_collaborators" ("project_id");
-CREATE UNIQUE INDEX "index_project_collaborators_on_project_id" ON "project_collaborators" ("project_id", "replica_id");
+CREATE UNIQUE INDEX "index_project_collaborators_on_project_id_and_replica_id" ON "project_collaborators" ("project_id", "replica_id");
 
 CREATE TABLE "worktrees" (
     "id" INTEGER NOT NULL,

crates/collab/migrations/20221111092550_reconnection_support.sql 🔗

@@ -18,6 +18,7 @@ CREATE TABLE "project_collaborators" (
     "is_host" BOOLEAN NOT NULL
 );
 CREATE INDEX "index_project_collaborators_on_project_id" ON "project_collaborators" ("project_id");
+CREATE UNIQUE INDEX "index_project_collaborators_on_project_id_and_replica_id" ON "project_collaborators" ("project_id", "replica_id");
 
 CREATE TABLE IF NOT EXISTS "worktrees" (
     "id" INTEGER NOT NULL,

crates/collab/src/db.rs 🔗

@@ -1558,24 +1558,25 @@ where
     pub async fn get_contacts(&self, user_id: UserId) -> Result<Vec<Contact>> {
         self.transact(|mut tx| async move {
             let query = "
-                SELECT user_id_a, user_id_b, a_to_b, accepted, should_notify
+                SELECT user_id_a, user_id_b, a_to_b, accepted, should_notify, (room_participants.id IS NOT NULL) as busy
                 FROM contacts
+                LEFT JOIN room_participants ON room_participants.user_id = $1
                 WHERE user_id_a = $1 OR user_id_b = $1;
             ";
 
-            let mut rows = sqlx::query_as::<_, (UserId, UserId, bool, bool, bool)>(query)
+            let mut rows = sqlx::query_as::<_, (UserId, UserId, bool, bool, bool, bool)>(query)
                 .bind(user_id)
                 .fetch(&mut tx);
 
             let mut contacts = Vec::new();
             while let Some(row) = rows.next().await {
-                let (user_id_a, user_id_b, a_to_b, accepted, should_notify) = row?;
-
+                let (user_id_a, user_id_b, a_to_b, accepted, should_notify, busy) = row?;
                 if user_id_a == user_id {
                     if accepted {
                         contacts.push(Contact::Accepted {
                             user_id: user_id_b,
                             should_notify: should_notify && a_to_b,
+                            busy
                         });
                     } else if a_to_b {
                         contacts.push(Contact::Outgoing { user_id: user_id_b })
@@ -1589,6 +1590,7 @@ where
                     contacts.push(Contact::Accepted {
                         user_id: user_id_a,
                         should_notify: should_notify && !a_to_b,
+                        busy
                     });
                 } else if a_to_b {
                     contacts.push(Contact::Incoming {
@@ -1607,6 +1609,23 @@ where
         .await
     }
 
+    pub async fn is_user_busy(&self, user_id: UserId) -> Result<bool> {
+        self.transact(|mut tx| async move {
+            Ok(sqlx::query_scalar::<_, i32>(
+                "
+                SELECT 1
+                FROM room_participants
+                WHERE room_participants.user_id = $1
+                ",
+            )
+            .bind(user_id)
+            .fetch_optional(&mut tx)
+            .await?
+            .is_some())
+        })
+        .await
+    }
+
     pub async fn has_contact(&self, user_id_1: UserId, user_id_2: UserId) -> Result<bool> {
         self.transact(|mut tx| async move {
             let (id_a, id_b) = if user_id_1 < user_id_2 {
@@ -1657,6 +1676,7 @@ where
                 .await?;
 
             if result.rows_affected() == 1 {
+                tx.commit().await?;
                 Ok(())
             } else {
                 Err(anyhow!("contact already requested"))?
@@ -1682,6 +1702,7 @@ where
                 .await?;
 
             if result.rows_affected() == 1 {
+                tx.commit().await?;
                 Ok(())
             } else {
                 Err(anyhow!("no such contact"))?
@@ -1721,10 +1742,11 @@ where
                 .await?;
 
             if result.rows_affected() == 0 {
-                Err(anyhow!("no such contact request"))?;
+                Err(anyhow!("no such contact request"))?
+            } else {
+                tx.commit().await?;
+                Ok(())
             }
-
-            Ok(())
         })
         .await
     }
@@ -1766,6 +1788,7 @@ where
                     .await?
             };
             if result.rows_affected() == 1 {
+                tx.commit().await?;
                 Ok(())
             } else {
                 Err(anyhow!("no such contact request"))?
@@ -1977,6 +2000,7 @@ pub enum Contact {
     Accepted {
         user_id: UserId,
         should_notify: bool,
+        busy: bool,
     },
     Outgoing {
         user_id: UserId,

crates/collab/src/db_tests.rs 🔗

@@ -258,7 +258,8 @@ test_both_dbs!(test_add_contacts_postgres, test_add_contacts_sqlite, db, {
         db.get_contacts(user_1).await.unwrap(),
         &[Contact::Accepted {
             user_id: user_2,
-            should_notify: true
+            should_notify: true,
+            busy: false,
         }],
     );
     assert!(db.has_contact(user_1, user_2).await.unwrap());
@@ -268,6 +269,7 @@ test_both_dbs!(test_add_contacts_postgres, test_add_contacts_sqlite, db, {
         &[Contact::Accepted {
             user_id: user_1,
             should_notify: false,
+            busy: false,
         }]
     );
 
@@ -284,6 +286,7 @@ test_both_dbs!(test_add_contacts_postgres, test_add_contacts_sqlite, db, {
         &[Contact::Accepted {
             user_id: user_2,
             should_notify: true,
+            busy: false,
         }]
     );
 
@@ -296,6 +299,7 @@ test_both_dbs!(test_add_contacts_postgres, test_add_contacts_sqlite, db, {
         &[Contact::Accepted {
             user_id: user_2,
             should_notify: false,
+            busy: false,
         }]
     );
 
@@ -309,10 +313,12 @@ test_both_dbs!(test_add_contacts_postgres, test_add_contacts_sqlite, db, {
             Contact::Accepted {
                 user_id: user_2,
                 should_notify: false,
+                busy: false,
             },
             Contact::Accepted {
                 user_id: user_3,
-                should_notify: false
+                should_notify: false,
+                busy: false,
             }
         ]
     );
@@ -320,7 +326,8 @@ test_both_dbs!(test_add_contacts_postgres, test_add_contacts_sqlite, db, {
         db.get_contacts(user_3).await.unwrap(),
         &[Contact::Accepted {
             user_id: user_1,
-            should_notify: false
+            should_notify: false,
+            busy: false,
         }],
     );
 
@@ -335,14 +342,16 @@ test_both_dbs!(test_add_contacts_postgres, test_add_contacts_sqlite, db, {
         db.get_contacts(user_2).await.unwrap(),
         &[Contact::Accepted {
             user_id: user_1,
-            should_notify: false
+            should_notify: false,
+            busy: false,
         }]
     );
     assert_eq!(
         db.get_contacts(user_3).await.unwrap(),
         &[Contact::Accepted {
             user_id: user_1,
-            should_notify: false
+            should_notify: false,
+            busy: false,
         }],
     );
 });
@@ -504,14 +513,16 @@ async fn test_invite_codes() {
         db.get_contacts(user1).await.unwrap(),
         [Contact::Accepted {
             user_id: user2,
-            should_notify: true
+            should_notify: true,
+            busy: false,
         }]
     );
     assert_eq!(
         db.get_contacts(user2).await.unwrap(),
         [Contact::Accepted {
             user_id: user1,
-            should_notify: false
+            should_notify: false,
+            busy: false,
         }]
     );
     assert_eq!(
@@ -550,11 +561,13 @@ async fn test_invite_codes() {
         [
             Contact::Accepted {
                 user_id: user2,
-                should_notify: true
+                should_notify: true,
+                busy: false,
             },
             Contact::Accepted {
                 user_id: user3,
-                should_notify: true
+                should_notify: true,
+                busy: false,
             }
         ]
     );
@@ -562,7 +575,8 @@ async fn test_invite_codes() {
         db.get_contacts(user3).await.unwrap(),
         [Contact::Accepted {
             user_id: user1,
-            should_notify: false
+            should_notify: false,
+            busy: false,
         }]
     );
     assert_eq!(
@@ -607,15 +621,18 @@ async fn test_invite_codes() {
         [
             Contact::Accepted {
                 user_id: user2,
-                should_notify: true
+                should_notify: true,
+                busy: false,
             },
             Contact::Accepted {
                 user_id: user3,
-                should_notify: true
+                should_notify: true,
+                busy: false,
             },
             Contact::Accepted {
                 user_id: user4,
-                should_notify: true
+                should_notify: true,
+                busy: false,
             }
         ]
     );
@@ -623,7 +640,8 @@ async fn test_invite_codes() {
         db.get_contacts(user4).await.unwrap(),
         [Contact::Accepted {
             user_id: user1,
-            should_notify: false
+            should_notify: false,
+            busy: false,
         }]
     );
     assert_eq!(

crates/collab/src/rpc.rs 🔗

@@ -465,7 +465,7 @@ impl Server {
         if let Some(user) = self.app_state.db.get_user_by_id(inviter_id).await? {
             if let Some(code) = &user.invite_code {
                 let store = self.store().await;
-                let invitee_contact = store.contact_for_user(invitee_id, true);
+                let invitee_contact = store.contact_for_user(invitee_id, true, false);
                 for connection_id in store.connection_ids_for_user(inviter_id) {
                     self.peer.send(
                         connection_id,
@@ -895,8 +895,9 @@ impl Server {
 
     async fn update_user_contacts(self: &Arc<Server>, user_id: UserId) -> Result<()> {
         let contacts = self.app_state.db.get_contacts(user_id).await?;
+        let busy = self.app_state.db.is_user_busy(user_id).await?;
         let store = self.store().await;
-        let updated_contact = store.contact_for_user(user_id, false);
+        let updated_contact = store.contact_for_user(user_id, false, busy);
         for contact in contacts {
             if let db::Contact::Accepted {
                 user_id: contact_user_id,
@@ -1575,6 +1576,7 @@ impl Server {
                 .db
                 .respond_to_contact_request(responder_id, requester_id, accept)
                 .await?;
+            let busy = self.app_state.db.is_user_busy(requester_id).await?;
 
             let store = self.store().await;
             // Update responder with new contact
@@ -1582,7 +1584,7 @@ impl Server {
             if accept {
                 update
                     .contacts
-                    .push(store.contact_for_user(requester_id, false));
+                    .push(store.contact_for_user(requester_id, false, busy));
             }
             update
                 .remove_incoming_requests
@@ -1596,7 +1598,7 @@ impl Server {
             if accept {
                 update
                     .contacts
-                    .push(store.contact_for_user(responder_id, true));
+                    .push(store.contact_for_user(responder_id, true, busy));
             }
             update
                 .remove_outgoing_requests

crates/collab/src/rpc/store.rs 🔗

@@ -3,7 +3,7 @@ use anyhow::{anyhow, Result};
 use collections::{btree_map, BTreeMap, BTreeSet, HashMap, HashSet};
 use rpc::{proto, ConnectionId};
 use serde::Serialize;
-use std::{mem, path::PathBuf, str};
+use std::{path::PathBuf, str};
 use tracing::instrument;
 
 pub type RoomId = u64;
@@ -156,14 +156,6 @@ impl Store {
             .is_empty()
     }
 
-    fn is_user_busy(&self, user_id: UserId) -> bool {
-        self.connected_users
-            .get(&user_id)
-            .unwrap_or(&Default::default())
-            .active_call
-            .is_some()
-    }
-
     pub fn build_initial_contacts_update(
         &self,
         contacts: Vec<db::Contact>,
@@ -175,10 +167,11 @@ impl Store {
                 db::Contact::Accepted {
                     user_id,
                     should_notify,
+                    busy,
                 } => {
                     update
                         .contacts
-                        .push(self.contact_for_user(user_id, should_notify));
+                        .push(self.contact_for_user(user_id, should_notify, busy));
                 }
                 db::Contact::Outgoing { user_id } => {
                     update.outgoing_requests.push(user_id.to_proto())
@@ -198,11 +191,16 @@ impl Store {
         update
     }
 
-    pub fn contact_for_user(&self, user_id: UserId, should_notify: bool) -> proto::Contact {
+    pub fn contact_for_user(
+        &self,
+        user_id: UserId,
+        should_notify: bool,
+        busy: bool,
+    ) -> proto::Contact {
         proto::Contact {
             user_id: user_id.to_proto(),
             online: self.is_user_online(user_id),
-            busy: self.is_user_busy(user_id),
+            busy,
             should_notify,
         }
     }