diff --git a/crates/collab/migrations.sqlite/20221109000000_test_schema.sql b/crates/collab/migrations.sqlite/20221109000000_test_schema.sql index d262d6a8bd414a40cc71cc56690b3232e8eaaa81..d6759fb5246cfe6653db215cfc5ffe7f733f5d8b 100644 --- a/crates/collab/migrations.sqlite/20221109000000_test_schema.sql +++ b/crates/collab/migrations.sqlite/20221109000000_test_schema.sql @@ -56,7 +56,7 @@ CREATE TABLE "project_collaborators" ( "is_host" BOOLEAN NOT NULL ); CREATE INDEX "index_project_collaborators_on_project_id" ON "project_collaborators" ("project_id"); -CREATE UNIQUE INDEX "index_project_collaborators_on_project_id" ON "project_collaborators" ("project_id", "replica_id"); +CREATE UNIQUE INDEX "index_project_collaborators_on_project_id_and_replica_id" ON "project_collaborators" ("project_id", "replica_id"); CREATE TABLE "worktrees" ( "id" INTEGER NOT NULL, diff --git a/crates/collab/migrations/20221111092550_reconnection_support.sql b/crates/collab/migrations/20221111092550_reconnection_support.sql index 7b82ce9ce7f49ec953a2c8ef54e2cdbfe07d3274..617e282a0a479ecefc4d9a7339397c7a2b3c32d0 100644 --- a/crates/collab/migrations/20221111092550_reconnection_support.sql +++ b/crates/collab/migrations/20221111092550_reconnection_support.sql @@ -18,6 +18,7 @@ CREATE TABLE "project_collaborators" ( "is_host" BOOLEAN NOT NULL ); CREATE INDEX "index_project_collaborators_on_project_id" ON "project_collaborators" ("project_id"); +CREATE UNIQUE INDEX "index_project_collaborators_on_project_id_and_replica_id" ON "project_collaborators" ("project_id", "replica_id"); CREATE TABLE IF NOT EXISTS "worktrees" ( "id" INTEGER NOT NULL, diff --git a/crates/collab/src/db.rs b/crates/collab/src/db.rs index ba014624af4845a950cb5a94f14b579fe022ad87..1df96870d6bc0b3fb1b69cc08fcde073fcf34e36 100644 --- a/crates/collab/src/db.rs +++ b/crates/collab/src/db.rs @@ -1558,24 +1558,25 @@ where pub async fn get_contacts(&self, user_id: UserId) -> Result> { self.transact(|mut tx| async move { let query = " - SELECT user_id_a, user_id_b, a_to_b, accepted, should_notify + SELECT user_id_a, user_id_b, a_to_b, accepted, should_notify, (room_participants.id IS NOT NULL) as busy FROM contacts + LEFT JOIN room_participants ON room_participants.user_id = $1 WHERE user_id_a = $1 OR user_id_b = $1; "; - let mut rows = sqlx::query_as::<_, (UserId, UserId, bool, bool, bool)>(query) + let mut rows = sqlx::query_as::<_, (UserId, UserId, bool, bool, bool, bool)>(query) .bind(user_id) .fetch(&mut tx); let mut contacts = Vec::new(); while let Some(row) = rows.next().await { - let (user_id_a, user_id_b, a_to_b, accepted, should_notify) = row?; - + let (user_id_a, user_id_b, a_to_b, accepted, should_notify, busy) = row?; if user_id_a == user_id { if accepted { contacts.push(Contact::Accepted { user_id: user_id_b, should_notify: should_notify && a_to_b, + busy }); } else if a_to_b { contacts.push(Contact::Outgoing { user_id: user_id_b }) @@ -1589,6 +1590,7 @@ where contacts.push(Contact::Accepted { user_id: user_id_a, should_notify: should_notify && !a_to_b, + busy }); } else if a_to_b { contacts.push(Contact::Incoming { @@ -1607,6 +1609,23 @@ where .await } + pub async fn is_user_busy(&self, user_id: UserId) -> Result { + self.transact(|mut tx| async move { + Ok(sqlx::query_scalar::<_, i32>( + " + SELECT 1 + FROM room_participants + WHERE room_participants.user_id = $1 + ", + ) + .bind(user_id) + .fetch_optional(&mut tx) + .await? + .is_some()) + }) + .await + } + pub async fn has_contact(&self, user_id_1: UserId, user_id_2: UserId) -> Result { self.transact(|mut tx| async move { let (id_a, id_b) = if user_id_1 < user_id_2 { @@ -1657,6 +1676,7 @@ where .await?; if result.rows_affected() == 1 { + tx.commit().await?; Ok(()) } else { Err(anyhow!("contact already requested"))? @@ -1682,6 +1702,7 @@ where .await?; if result.rows_affected() == 1 { + tx.commit().await?; Ok(()) } else { Err(anyhow!("no such contact"))? @@ -1721,10 +1742,11 @@ where .await?; if result.rows_affected() == 0 { - Err(anyhow!("no such contact request"))?; + Err(anyhow!("no such contact request"))? + } else { + tx.commit().await?; + Ok(()) } - - Ok(()) }) .await } @@ -1766,6 +1788,7 @@ where .await? }; if result.rows_affected() == 1 { + tx.commit().await?; Ok(()) } else { Err(anyhow!("no such contact request"))? @@ -1977,6 +2000,7 @@ pub enum Contact { Accepted { user_id: UserId, should_notify: bool, + busy: bool, }, Outgoing { user_id: UserId, diff --git a/crates/collab/src/db_tests.rs b/crates/collab/src/db_tests.rs index 8eda7d34e298c975e53140c9ce3a7aed1551b706..444e60ddeb0c5e03df39e132189eac9ecca46033 100644 --- a/crates/collab/src/db_tests.rs +++ b/crates/collab/src/db_tests.rs @@ -258,7 +258,8 @@ test_both_dbs!(test_add_contacts_postgres, test_add_contacts_sqlite, db, { db.get_contacts(user_1).await.unwrap(), &[Contact::Accepted { user_id: user_2, - should_notify: true + should_notify: true, + busy: false, }], ); assert!(db.has_contact(user_1, user_2).await.unwrap()); @@ -268,6 +269,7 @@ test_both_dbs!(test_add_contacts_postgres, test_add_contacts_sqlite, db, { &[Contact::Accepted { user_id: user_1, should_notify: false, + busy: false, }] ); @@ -284,6 +286,7 @@ test_both_dbs!(test_add_contacts_postgres, test_add_contacts_sqlite, db, { &[Contact::Accepted { user_id: user_2, should_notify: true, + busy: false, }] ); @@ -296,6 +299,7 @@ test_both_dbs!(test_add_contacts_postgres, test_add_contacts_sqlite, db, { &[Contact::Accepted { user_id: user_2, should_notify: false, + busy: false, }] ); @@ -309,10 +313,12 @@ test_both_dbs!(test_add_contacts_postgres, test_add_contacts_sqlite, db, { Contact::Accepted { user_id: user_2, should_notify: false, + busy: false, }, Contact::Accepted { user_id: user_3, - should_notify: false + should_notify: false, + busy: false, } ] ); @@ -320,7 +326,8 @@ test_both_dbs!(test_add_contacts_postgres, test_add_contacts_sqlite, db, { db.get_contacts(user_3).await.unwrap(), &[Contact::Accepted { user_id: user_1, - should_notify: false + should_notify: false, + busy: false, }], ); @@ -335,14 +342,16 @@ test_both_dbs!(test_add_contacts_postgres, test_add_contacts_sqlite, db, { db.get_contacts(user_2).await.unwrap(), &[Contact::Accepted { user_id: user_1, - should_notify: false + should_notify: false, + busy: false, }] ); assert_eq!( db.get_contacts(user_3).await.unwrap(), &[Contact::Accepted { user_id: user_1, - should_notify: false + should_notify: false, + busy: false, }], ); }); @@ -504,14 +513,16 @@ async fn test_invite_codes() { db.get_contacts(user1).await.unwrap(), [Contact::Accepted { user_id: user2, - should_notify: true + should_notify: true, + busy: false, }] ); assert_eq!( db.get_contacts(user2).await.unwrap(), [Contact::Accepted { user_id: user1, - should_notify: false + should_notify: false, + busy: false, }] ); assert_eq!( @@ -550,11 +561,13 @@ async fn test_invite_codes() { [ Contact::Accepted { user_id: user2, - should_notify: true + should_notify: true, + busy: false, }, Contact::Accepted { user_id: user3, - should_notify: true + should_notify: true, + busy: false, } ] ); @@ -562,7 +575,8 @@ async fn test_invite_codes() { db.get_contacts(user3).await.unwrap(), [Contact::Accepted { user_id: user1, - should_notify: false + should_notify: false, + busy: false, }] ); assert_eq!( @@ -607,15 +621,18 @@ async fn test_invite_codes() { [ Contact::Accepted { user_id: user2, - should_notify: true + should_notify: true, + busy: false, }, Contact::Accepted { user_id: user3, - should_notify: true + should_notify: true, + busy: false, }, Contact::Accepted { user_id: user4, - should_notify: true + should_notify: true, + busy: false, } ] ); @@ -623,7 +640,8 @@ async fn test_invite_codes() { db.get_contacts(user4).await.unwrap(), [Contact::Accepted { user_id: user1, - should_notify: false + should_notify: false, + busy: false, }] ); assert_eq!( diff --git a/crates/collab/src/rpc.rs b/crates/collab/src/rpc.rs index daf898ddf6263c51c15a2ad6345afa0f6fe4f96a..627a22426a76f30964d61d364a14529154498606 100644 --- a/crates/collab/src/rpc.rs +++ b/crates/collab/src/rpc.rs @@ -465,7 +465,7 @@ impl Server { if let Some(user) = self.app_state.db.get_user_by_id(inviter_id).await? { if let Some(code) = &user.invite_code { let store = self.store().await; - let invitee_contact = store.contact_for_user(invitee_id, true); + let invitee_contact = store.contact_for_user(invitee_id, true, false); for connection_id in store.connection_ids_for_user(inviter_id) { self.peer.send( connection_id, @@ -895,8 +895,9 @@ impl Server { async fn update_user_contacts(self: &Arc, user_id: UserId) -> Result<()> { let contacts = self.app_state.db.get_contacts(user_id).await?; + let busy = self.app_state.db.is_user_busy(user_id).await?; let store = self.store().await; - let updated_contact = store.contact_for_user(user_id, false); + let updated_contact = store.contact_for_user(user_id, false, busy); for contact in contacts { if let db::Contact::Accepted { user_id: contact_user_id, @@ -1575,6 +1576,7 @@ impl Server { .db .respond_to_contact_request(responder_id, requester_id, accept) .await?; + let busy = self.app_state.db.is_user_busy(requester_id).await?; let store = self.store().await; // Update responder with new contact @@ -1582,7 +1584,7 @@ impl Server { if accept { update .contacts - .push(store.contact_for_user(requester_id, false)); + .push(store.contact_for_user(requester_id, false, busy)); } update .remove_incoming_requests @@ -1596,7 +1598,7 @@ impl Server { if accept { update .contacts - .push(store.contact_for_user(responder_id, true)); + .push(store.contact_for_user(responder_id, true, busy)); } update .remove_outgoing_requests diff --git a/crates/collab/src/rpc/store.rs b/crates/collab/src/rpc/store.rs index a9a15e7b2aa775b4dba3de5a7c64d6623e4b9489..4be93547889683d75a7439fb98673ad4532e308a 100644 --- a/crates/collab/src/rpc/store.rs +++ b/crates/collab/src/rpc/store.rs @@ -3,7 +3,7 @@ use anyhow::{anyhow, Result}; use collections::{btree_map, BTreeMap, BTreeSet, HashMap, HashSet}; use rpc::{proto, ConnectionId}; use serde::Serialize; -use std::{mem, path::PathBuf, str}; +use std::{path::PathBuf, str}; use tracing::instrument; pub type RoomId = u64; @@ -156,14 +156,6 @@ impl Store { .is_empty() } - fn is_user_busy(&self, user_id: UserId) -> bool { - self.connected_users - .get(&user_id) - .unwrap_or(&Default::default()) - .active_call - .is_some() - } - pub fn build_initial_contacts_update( &self, contacts: Vec, @@ -175,10 +167,11 @@ impl Store { db::Contact::Accepted { user_id, should_notify, + busy, } => { update .contacts - .push(self.contact_for_user(user_id, should_notify)); + .push(self.contact_for_user(user_id, should_notify, busy)); } db::Contact::Outgoing { user_id } => { update.outgoing_requests.push(user_id.to_proto()) @@ -198,11 +191,16 @@ impl Store { update } - pub fn contact_for_user(&self, user_id: UserId, should_notify: bool) -> proto::Contact { + pub fn contact_for_user( + &self, + user_id: UserId, + should_notify: bool, + busy: bool, + ) -> proto::Contact { proto::Contact { user_id: user_id.to_proto(), online: self.is_user_online(user_id), - busy: self.is_user_busy(user_id), + busy, should_notify, } }