Include a `busy` field in `proto::Contact`

Antonio Scandurra created

Change summary

crates/client/src/user.rs              |   2 
crates/collab/src/integration_tests.rs | 200 ++++++++++++++++++++++++++-
crates/collab/src/rpc.rs               | 146 ++++++++++++--------
crates/collab/src/rpc/store.rs         |   9 +
crates/rpc/proto/zed.proto             |   3 
5 files changed, 290 insertions(+), 70 deletions(-)

Detailed changes

crates/client/src/user.rs 🔗

@@ -39,6 +39,7 @@ impl Eq for User {}
 pub struct Contact {
     pub user: Arc<User>,
     pub online: bool,
+    pub busy: bool,
 }
 
 #[derive(Debug, Clone, Copy, PartialEq, Eq)]
@@ -625,6 +626,7 @@ impl Contact {
         Ok(Self {
             user,
             online: contact.online,
+            busy: contact.busy,
         })
     }
 }

crates/collab/src/integration_tests.rs 🔗

@@ -435,12 +435,14 @@ async fn test_calls_on_multiple_connections(
         })
         .await
         .unwrap();
+    deterministic.run_until_parked();
     assert!(incoming_call_b1.next().await.unwrap().is_some());
     assert!(incoming_call_b2.next().await.unwrap().is_some());
 
     // User B declines the call on one of the two connections, causing both connections
     // to stop ringing.
     active_call_b2.update(cx_b2, |call, _| call.decline_incoming().unwrap());
+    deterministic.run_until_parked();
     assert!(incoming_call_b1.next().await.unwrap().is_none());
     assert!(incoming_call_b2.next().await.unwrap().is_none());
 
@@ -451,6 +453,7 @@ async fn test_calls_on_multiple_connections(
         })
         .await
         .unwrap();
+    deterministic.run_until_parked();
     assert!(incoming_call_b1.next().await.unwrap().is_some());
     assert!(incoming_call_b2.next().await.unwrap().is_some());
 
@@ -460,6 +463,7 @@ async fn test_calls_on_multiple_connections(
         .update(cx_b2, |call, cx| call.accept_incoming(cx))
         .await
         .unwrap();
+    deterministic.run_until_parked();
     assert!(incoming_call_b1.next().await.unwrap().is_none());
     assert!(incoming_call_b2.next().await.unwrap().is_none());
 
@@ -472,6 +476,7 @@ async fn test_calls_on_multiple_connections(
         })
         .await
         .unwrap();
+    deterministic.run_until_parked();
     assert!(incoming_call_b1.next().await.unwrap().is_some());
     assert!(incoming_call_b2.next().await.unwrap().is_some());
 
@@ -482,6 +487,7 @@ async fn test_calls_on_multiple_connections(
         })
         .await
         .unwrap();
+    deterministic.run_until_parked();
     assert!(incoming_call_b1.next().await.unwrap().is_none());
     assert!(incoming_call_b2.next().await.unwrap().is_none());
 }
@@ -4015,19 +4021,31 @@ async fn test_contacts(
     server
         .make_contacts(&mut [(&client_a, cx_a), (&client_b, cx_b), (&client_c, cx_c)])
         .await;
+    let active_call_a = cx_a.read(ActiveCall::global);
+    let active_call_b = cx_b.read(ActiveCall::global);
+    let active_call_c = cx_c.read(ActiveCall::global);
 
     deterministic.run_until_parked();
     assert_eq!(
         contacts(&client_a, cx_a),
-        [("user_b".to_string(), true), ("user_c".to_string(), true)]
+        [
+            ("user_b".to_string(), "online", "free"),
+            ("user_c".to_string(), "online", "free")
+        ]
     );
     assert_eq!(
         contacts(&client_b, cx_b),
-        [("user_a".to_string(), true), ("user_c".to_string(), true)]
+        [
+            ("user_a".to_string(), "online", "free"),
+            ("user_c".to_string(), "online", "free")
+        ]
     );
     assert_eq!(
         contacts(&client_c, cx_c),
-        [("user_a".to_string(), true), ("user_b".to_string(), true)]
+        [
+            ("user_a".to_string(), "online", "free"),
+            ("user_b".to_string(), "online", "free")
+        ]
     );
 
     server.disconnect_client(client_c.current_user_id(cx_c));
@@ -4035,11 +4053,17 @@ async fn test_contacts(
     deterministic.advance_clock(rpc::RECEIVE_TIMEOUT);
     assert_eq!(
         contacts(&client_a, cx_a),
-        [("user_b".to_string(), true), ("user_c".to_string(), false)]
+        [
+            ("user_b".to_string(), "online", "free"),
+            ("user_c".to_string(), "offline", "free")
+        ]
     );
     assert_eq!(
         contacts(&client_b, cx_b),
-        [("user_a".to_string(), true), ("user_c".to_string(), false)]
+        [
+            ("user_a".to_string(), "online", "free"),
+            ("user_c".to_string(), "offline", "free")
+        ]
     );
     assert_eq!(contacts(&client_c, cx_c), []);
 
@@ -4052,24 +4076,180 @@ async fn test_contacts(
     deterministic.run_until_parked();
     assert_eq!(
         contacts(&client_a, cx_a),
-        [("user_b".to_string(), true), ("user_c".to_string(), true)]
+        [
+            ("user_b".to_string(), "online", "free"),
+            ("user_c".to_string(), "online", "free")
+        ]
+    );
+    assert_eq!(
+        contacts(&client_b, cx_b),
+        [
+            ("user_a".to_string(), "online", "free"),
+            ("user_c".to_string(), "online", "free")
+        ]
+    );
+    assert_eq!(
+        contacts(&client_c, cx_c),
+        [
+            ("user_a".to_string(), "online", "free"),
+            ("user_b".to_string(), "online", "free")
+        ]
+    );
+
+    active_call_a
+        .update(cx_a, |call, cx| {
+            call.invite(client_b.user_id().unwrap(), None, cx)
+        })
+        .await
+        .unwrap();
+    deterministic.run_until_parked();
+    assert_eq!(
+        contacts(&client_a, cx_a),
+        [
+            ("user_b".to_string(), "online", "busy"),
+            ("user_c".to_string(), "online", "free")
+        ]
+    );
+    assert_eq!(
+        contacts(&client_b, cx_b),
+        [
+            ("user_a".to_string(), "online", "busy"),
+            ("user_c".to_string(), "online", "free")
+        ]
+    );
+    assert_eq!(
+        contacts(&client_c, cx_c),
+        [
+            ("user_a".to_string(), "online", "busy"),
+            ("user_b".to_string(), "online", "busy")
+        ]
+    );
+
+    active_call_b.update(cx_b, |call, _| call.decline_incoming().unwrap());
+    deterministic.run_until_parked();
+    assert_eq!(
+        contacts(&client_a, cx_a),
+        [
+            ("user_b".to_string(), "online", "free"),
+            ("user_c".to_string(), "online", "free")
+        ]
+    );
+    assert_eq!(
+        contacts(&client_b, cx_b),
+        [
+            ("user_a".to_string(), "online", "free"),
+            ("user_c".to_string(), "online", "free")
+        ]
+    );
+    assert_eq!(
+        contacts(&client_c, cx_c),
+        [
+            ("user_a".to_string(), "online", "free"),
+            ("user_b".to_string(), "online", "free")
+        ]
+    );
+
+    active_call_c
+        .update(cx_c, |call, cx| {
+            call.invite(client_a.user_id().unwrap(), None, cx)
+        })
+        .await
+        .unwrap();
+    deterministic.run_until_parked();
+    assert_eq!(
+        contacts(&client_a, cx_a),
+        [
+            ("user_b".to_string(), "online", "free"),
+            ("user_c".to_string(), "online", "busy")
+        ]
+    );
+    assert_eq!(
+        contacts(&client_b, cx_b),
+        [
+            ("user_a".to_string(), "online", "busy"),
+            ("user_c".to_string(), "online", "busy")
+        ]
+    );
+    assert_eq!(
+        contacts(&client_c, cx_c),
+        [
+            ("user_a".to_string(), "online", "busy"),
+            ("user_b".to_string(), "online", "free")
+        ]
+    );
+
+    active_call_a
+        .update(cx_a, |call, cx| call.accept_incoming(cx))
+        .await
+        .unwrap();
+    deterministic.run_until_parked();
+    assert_eq!(
+        contacts(&client_a, cx_a),
+        [
+            ("user_b".to_string(), "online", "free"),
+            ("user_c".to_string(), "online", "busy")
+        ]
+    );
+    assert_eq!(
+        contacts(&client_b, cx_b),
+        [
+            ("user_a".to_string(), "online", "busy"),
+            ("user_c".to_string(), "online", "busy")
+        ]
+    );
+    assert_eq!(
+        contacts(&client_c, cx_c),
+        [
+            ("user_a".to_string(), "online", "busy"),
+            ("user_b".to_string(), "online", "free")
+        ]
+    );
+
+    active_call_a
+        .update(cx_a, |call, cx| {
+            call.invite(client_b.user_id().unwrap(), None, cx)
+        })
+        .await
+        .unwrap();
+    deterministic.run_until_parked();
+    assert_eq!(
+        contacts(&client_a, cx_a),
+        [
+            ("user_b".to_string(), "online", "busy"),
+            ("user_c".to_string(), "online", "busy")
+        ]
     );
     assert_eq!(
         contacts(&client_b, cx_b),
-        [("user_a".to_string(), true), ("user_c".to_string(), true)]
+        [
+            ("user_a".to_string(), "online", "busy"),
+            ("user_c".to_string(), "online", "busy")
+        ]
     );
     assert_eq!(
         contacts(&client_c, cx_c),
-        [("user_a".to_string(), true), ("user_b".to_string(), true)]
+        [
+            ("user_a".to_string(), "online", "busy"),
+            ("user_b".to_string(), "online", "busy")
+        ]
     );
 
     #[allow(clippy::type_complexity)]
-    fn contacts(client: &TestClient, cx: &TestAppContext) -> Vec<(String, bool)> {
+    fn contacts(
+        client: &TestClient,
+        cx: &TestAppContext,
+    ) -> Vec<(String, &'static str, &'static str)> {
         client.user_store.read_with(cx, |store, _| {
             store
                 .contacts()
                 .iter()
-                .map(|contact| (contact.user.github_login.clone(), contact.online))
+                .map(|contact| {
+                    (
+                        contact.user.github_login.clone(),
+                        if contact.online { "online" } else { "offline" },
+                        if contact.busy { "busy" } else { "free" },
+                    )
+                })
                 .collect()
         })
     }

crates/collab/src/rpc.rs 🔗

@@ -585,8 +585,15 @@ impl Server {
         request: TypedEnvelope<proto::CreateRoom>,
         response: Response<proto::CreateRoom>,
     ) -> Result<()> {
-        let room_id = self.store().await.create_room(request.sender_id)?;
+        let user_id;
+        let room_id;
+        {
+            let mut store = self.store().await;
+            user_id = store.user_id_for_connection(request.sender_id)?;
+            room_id = store.create_room(request.sender_id)?;
+        }
         response.send(proto::CreateRoomResponse { id: room_id })?;
+        self.update_user_contacts(user_id).await?;
         Ok(())
     }
 
@@ -595,61 +602,71 @@ impl Server {
         request: TypedEnvelope<proto::JoinRoom>,
         response: Response<proto::JoinRoom>,
     ) -> Result<()> {
-        let room_id = request.payload.id;
-        let mut store = self.store().await;
-        let (room, recipient_connection_ids) = store.join_room(room_id, request.sender_id)?;
-        for recipient_id in recipient_connection_ids {
-            self.peer
-                .send(recipient_id, proto::CallCanceled {})
-                .trace_err();
+        let user_id;
+        {
+            let mut store = self.store().await;
+            user_id = store.user_id_for_connection(request.sender_id)?;
+            let (room, recipient_connection_ids) =
+                store.join_room(request.payload.id, request.sender_id)?;
+            for recipient_id in recipient_connection_ids {
+                self.peer
+                    .send(recipient_id, proto::CallCanceled {})
+                    .trace_err();
+            }
+            response.send(proto::JoinRoomResponse {
+                room: Some(room.clone()),
+            })?;
+            self.room_updated(room);
         }
-        response.send(proto::JoinRoomResponse {
-            room: Some(room.clone()),
-        })?;
-        self.room_updated(room);
+        self.update_user_contacts(user_id).await?;
         Ok(())
     }
 
     async fn leave_room(self: Arc<Server>, message: TypedEnvelope<proto::LeaveRoom>) -> Result<()> {
-        let room_id = message.payload.id;
-        let mut store = self.store().await;
-        let left_room = store.leave_room(room_id, message.sender_id)?;
+        let user_id;
+        {
+            let mut store = self.store().await;
+            user_id = store.user_id_for_connection(message.sender_id)?;
+            let left_room = store.leave_room(message.payload.id, message.sender_id)?;
 
-        for project in left_room.unshared_projects {
-            for connection_id in project.connection_ids() {
-                self.peer.send(
-                    connection_id,
-                    proto::UnshareProject {
-                        project_id: project.id.to_proto(),
-                    },
-                )?;
+            for project in left_room.unshared_projects {
+                for connection_id in project.connection_ids() {
+                    self.peer.send(
+                        connection_id,
+                        proto::UnshareProject {
+                            project_id: project.id.to_proto(),
+                        },
+                    )?;
+                }
             }
-        }
 
-        for project in left_room.left_projects {
-            if project.remove_collaborator {
-                for connection_id in project.connection_ids {
+            for project in left_room.left_projects {
+                if project.remove_collaborator {
+                    for connection_id in project.connection_ids {
+                        self.peer.send(
+                            connection_id,
+                            proto::RemoveProjectCollaborator {
+                                project_id: project.id.to_proto(),
+                                peer_id: message.sender_id.0,
+                            },
+                        )?;
+                    }
+
                     self.peer.send(
-                        connection_id,
-                        proto::RemoveProjectCollaborator {
+                        message.sender_id,
+                        proto::UnshareProject {
                             project_id: project.id.to_proto(),
-                            peer_id: message.sender_id.0,
                         },
                     )?;
                 }
+            }
 
-                self.peer.send(
-                    message.sender_id,
-                    proto::UnshareProject {
-                        project_id: project.id.to_proto(),
-                    },
-                )?;
+            if let Some(room) = left_room.room {
+                self.room_updated(room);
             }
         }
+        self.update_user_contacts(user_id).await?;
 
-        if let Some(room) = left_room.room {
-            self.room_updated(room);
-        }
         Ok(())
     }
 
@@ -694,6 +711,7 @@ impl Server {
                 })
                 .collect::<FuturesUnordered<_>>()
         };
+        self.update_user_contacts(recipient_user_id).await?;
 
         while let Some(call_response) = calls.next().await {
             match call_response.as_ref() {
@@ -712,6 +730,7 @@ impl Server {
             let room = store.call_failed(room_id, recipient_user_id)?;
             self.room_updated(&room);
         }
+        self.update_user_contacts(recipient_user_id).await?;
 
         Err(anyhow!("failed to ring call recipient"))?
     }
@@ -721,19 +740,23 @@ impl Server {
         request: TypedEnvelope<proto::CancelCall>,
         response: Response<proto::CancelCall>,
     ) -> Result<()> {
-        let mut store = self.store().await;
-        let (room, recipient_connection_ids) = store.cancel_call(
-            request.payload.room_id,
-            UserId::from_proto(request.payload.recipient_user_id),
-            request.sender_id,
-        )?;
-        for recipient_id in recipient_connection_ids {
-            self.peer
-                .send(recipient_id, proto::CallCanceled {})
-                .trace_err();
+        let recipient_user_id = UserId::from_proto(request.payload.recipient_user_id);
+        {
+            let mut store = self.store().await;
+            let (room, recipient_connection_ids) = store.cancel_call(
+                request.payload.room_id,
+                recipient_user_id,
+                request.sender_id,
+            )?;
+            for recipient_id in recipient_connection_ids {
+                self.peer
+                    .send(recipient_id, proto::CallCanceled {})
+                    .trace_err();
+            }
+            self.room_updated(room);
+            response.send(proto::Ack {})?;
         }
-        self.room_updated(room);
-        response.send(proto::Ack {})?;
+        self.update_user_contacts(recipient_user_id).await?;
         Ok(())
     }
 
@@ -741,15 +764,20 @@ impl Server {
         self: Arc<Server>,
         message: TypedEnvelope<proto::DeclineCall>,
     ) -> Result<()> {
-        let mut store = self.store().await;
-        let (room, recipient_connection_ids) =
-            store.decline_call(message.payload.room_id, message.sender_id)?;
-        for recipient_id in recipient_connection_ids {
-            self.peer
-                .send(recipient_id, proto::CallCanceled {})
-                .trace_err();
+        let recipient_user_id;
+        {
+            let mut store = self.store().await;
+            recipient_user_id = store.user_id_for_connection(message.sender_id)?;
+            let (room, recipient_connection_ids) =
+                store.decline_call(message.payload.room_id, message.sender_id)?;
+            for recipient_id in recipient_connection_ids {
+                self.peer
+                    .send(recipient_id, proto::CallCanceled {})
+                    .trace_err();
+            }
+            self.room_updated(room);
         }
-        self.room_updated(room);
+        self.update_user_contacts(recipient_user_id).await?;
         Ok(())
     }
 

crates/collab/src/rpc/store.rs 🔗

@@ -314,6 +314,14 @@ impl Store {
             .is_empty()
     }
 
+    fn is_user_busy(&self, user_id: UserId) -> bool {
+        self.connected_users
+            .get(&user_id)
+            .unwrap_or(&Default::default())
+            .active_call
+            .is_some()
+    }
+
     pub fn build_initial_contacts_update(
         &self,
         contacts: Vec<db::Contact>,
@@ -352,6 +360,7 @@ impl Store {
         proto::Contact {
             user_id: user_id.to_proto(),
             online: self.is_user_online(user_id),
+            busy: self.is_user_busy(user_id),
             should_notify,
         }
     }

crates/rpc/proto/zed.proto 🔗

@@ -1023,7 +1023,8 @@ message ChannelMessage {
 message Contact {
     uint64 user_id = 1;
     bool online = 2;
-    bool should_notify = 3;
+    bool busy = 3;
+    bool should_notify = 4;
 }
 
 message WorktreeMetadata {