Ring users upon connection if somebody was calling them before connecting

Antonio Scandurra and Nathan Sobo created

Co-Authored-By: Nathan Sobo <nathan@zed.dev>

Change summary

crates/client/src/user.rs              |  2 
crates/collab/src/integration_tests.rs | 13 ++++
crates/collab/src/rpc.rs               | 30 ++++------
crates/collab/src/rpc/store.rs         | 80 +++++++++++++++++++--------
crates/room/src/room.rs                |  8 ++
crates/rpc/proto/zed.proto             |  4 
6 files changed, 90 insertions(+), 47 deletions(-)

Detailed changes

crates/client/src/user.rs 🔗

@@ -214,7 +214,7 @@ impl UserStore {
                 .await?,
             from: this
                 .update(&mut cx, |this, cx| {
-                    this.get_user(envelope.payload.from_user_id, cx)
+                    this.get_user(envelope.payload.caller_user_id, cx)
                 })
                 .await?,
         };

crates/collab/src/integration_tests.rs 🔗

@@ -66,6 +66,7 @@ async fn test_basic_calls(
     deterministic: Arc<Deterministic>,
     cx_a: &mut TestAppContext,
     cx_b: &mut TestAppContext,
+    cx_b2: &mut TestAppContext,
     cx_c: &mut TestAppContext,
 ) {
     deterministic.forbid_parking();
@@ -111,8 +112,18 @@ async fn test_basic_calls(
         }
     );
 
-    // User B receives the call and joins the room.
+    // User B receives the call.
     let call_b = incoming_call_b.next().await.unwrap().unwrap();
+
+    // User B connects via another client and also receives a ring on the newly-connected client.
+    let client_b2 = server.create_client(cx_b2, "user_b").await;
+    let mut incoming_call_b2 = client_b2
+        .user_store
+        .update(cx_b2, |user, _| user.incoming_call());
+    deterministic.run_until_parked();
+    let _call_b2 = incoming_call_b2.next().await.unwrap().unwrap();
+
+    // User B joins the room using the first client.
     let room_b = cx_b
         .update(|cx| Room::join(&call_b, client_b.clone(), cx))
         .await

crates/collab/src/rpc.rs 🔗

@@ -388,7 +388,11 @@ impl Server {
 
             {
                 let mut store = this.store().await;
-                store.add_connection(connection_id, user_id, user.admin);
+                let incoming_call = store.add_connection(connection_id, user_id, user.admin);
+                if let Some(incoming_call) = incoming_call {
+                    this.peer.send(connection_id, incoming_call)?;
+                }
+
                 this.peer.send(connection_id, store.build_initial_contacts_update(contacts))?;
 
                 if let Some((code, count)) = invite_code {
@@ -648,28 +652,18 @@ impl Server {
         request: TypedEnvelope<proto::Call>,
         response: Response<proto::Call>,
     ) -> Result<()> {
-        let to_user_id = UserId::from_proto(request.payload.to_user_id);
+        let recipient_user_id = UserId::from_proto(request.payload.recipient_user_id);
         let room_id = request.payload.room_id;
         let mut calls = {
             let mut store = self.store().await;
-            let (from_user_id, recipient_connection_ids, room) =
-                store.call(room_id, request.sender_id, to_user_id)?;
+            let (room, recipient_connection_ids, incoming_call) =
+                store.call(room_id, request.sender_id, recipient_user_id)?;
             self.room_updated(room);
             recipient_connection_ids
                 .into_iter()
-                .map(|recipient_id| {
-                    self.peer.request(
-                        recipient_id,
-                        proto::IncomingCall {
-                            room_id,
-                            from_user_id: from_user_id.to_proto(),
-                            participant_user_ids: room
-                                .participants
-                                .iter()
-                                .map(|p| p.user_id)
-                                .collect(),
-                        },
-                    )
+                .map(|recipient_connection_id| {
+                    self.peer
+                        .request(recipient_connection_id, incoming_call.clone())
                 })
                 .collect::<FuturesUnordered<_>>()
         };
@@ -688,7 +682,7 @@ impl Server {
 
         {
             let mut store = self.store().await;
-            let room = store.call_failed(room_id, to_user_id)?;
+            let room = store.call_failed(room_id, recipient_user_id)?;
             self.room_updated(&room);
         }
 

crates/collab/src/rpc/store.rs 🔗

@@ -24,7 +24,7 @@ pub struct Store {
 #[derive(Default, Serialize)]
 struct ConnectedUser {
     connection_ids: HashSet<ConnectionId>,
-    active_call: Option<CallState>,
+    active_call: Option<Call>,
 }
 
 #[derive(Serialize)]
@@ -37,9 +37,10 @@ struct ConnectionState {
 }
 
 #[derive(Copy, Clone, Eq, PartialEq, Serialize)]
-struct CallState {
-    room_id: RoomId,
-    joined: bool,
+pub struct Call {
+    pub caller_user_id: UserId,
+    pub room_id: RoomId,
+    pub joined: bool,
 }
 
 #[derive(Serialize)]
@@ -146,7 +147,12 @@ impl Store {
     }
 
     #[instrument(skip(self))]
-    pub fn add_connection(&mut self, connection_id: ConnectionId, user_id: UserId, admin: bool) {
+    pub fn add_connection(
+        &mut self,
+        connection_id: ConnectionId,
+        user_id: UserId,
+        admin: bool,
+    ) -> Option<proto::IncomingCall> {
         self.connections.insert(
             connection_id,
             ConnectionState {
@@ -157,11 +163,26 @@ impl Store {
                 channels: Default::default(),
             },
         );
-        self.connected_users
-            .entry(user_id)
-            .or_default()
-            .connection_ids
-            .insert(connection_id);
+        let connected_user = self.connected_users.entry(user_id).or_default();
+        connected_user.connection_ids.insert(connection_id);
+        if let Some(active_call) = connected_user.active_call {
+            if active_call.joined {
+                None
+            } else {
+                let room = self.room(active_call.room_id)?;
+                Some(proto::IncomingCall {
+                    room_id: active_call.room_id,
+                    caller_user_id: active_call.caller_user_id.to_proto(),
+                    participant_user_ids: room
+                        .participants
+                        .iter()
+                        .map(|participant| participant.user_id)
+                        .collect(),
+                })
+            }
+        } else {
+            None
+        }
     }
 
     #[instrument(skip(self))]
@@ -393,7 +414,8 @@ impl Store {
 
         let room_id = post_inc(&mut self.next_room_id);
         self.rooms.insert(room_id, room);
-        connected_user.active_call = Some(CallState {
+        connected_user.active_call = Some(Call {
+            caller_user_id: connection.user_id,
             room_id,
             joined: true,
         });
@@ -412,14 +434,16 @@ impl Store {
         let user_id = connection.user_id;
         let recipient_connection_ids = self.connection_ids_for_user(user_id).collect::<Vec<_>>();
 
-        let mut connected_user = self
+        let connected_user = self
             .connected_users
             .get_mut(&user_id)
             .ok_or_else(|| anyhow!("no such connection"))?;
+        let active_call = connected_user
+            .active_call
+            .as_mut()
+            .ok_or_else(|| anyhow!("not being called"))?;
         anyhow::ensure!(
-            connected_user
-                .active_call
-                .map_or(false, |call| call.room_id == room_id && !call.joined),
+            active_call.room_id == room_id && !active_call.joined,
             "not being called on this room"
         );
 
@@ -443,10 +467,7 @@ impl Store {
                 )),
             }),
         });
-        connected_user.active_call = Some(CallState {
-            room_id,
-            joined: true,
-        });
+        active_call.joined = true;
 
         Ok((room, recipient_connection_ids))
     }
@@ -493,8 +514,8 @@ impl Store {
         room_id: RoomId,
         from_connection_id: ConnectionId,
         recipient_id: UserId,
-    ) -> Result<(UserId, Vec<ConnectionId>, &proto::Room)> {
-        let from_user_id = self.user_id_for_connection(from_connection_id)?;
+    ) -> Result<(&proto::Room, Vec<ConnectionId>, proto::IncomingCall)> {
+        let caller_user_id = self.user_id_for_connection(from_connection_id)?;
 
         let recipient_connection_ids = self
             .connection_ids_for_user(recipient_id)
@@ -525,12 +546,25 @@ impl Store {
             "cannot call the same user more than once"
         );
         room.pending_user_ids.push(recipient_id.to_proto());
-        recipient.active_call = Some(CallState {
+        recipient.active_call = Some(Call {
+            caller_user_id,
             room_id,
             joined: false,
         });
 
-        Ok((from_user_id, recipient_connection_ids, room))
+        Ok((
+            room,
+            recipient_connection_ids,
+            proto::IncomingCall {
+                room_id,
+                caller_user_id: caller_user_id.to_proto(),
+                participant_user_ids: room
+                    .participants
+                    .iter()
+                    .map(|participant| participant.user_id)
+                    .collect(),
+            },
+        ))
     }
 
     pub fn call_failed(&mut self, room_id: RoomId, to_user_id: UserId) -> Result<&proto::Room> {

crates/room/src/room.rs 🔗

@@ -136,7 +136,11 @@ impl Room {
         Ok(())
     }
 
-    pub fn call(&mut self, to_user_id: u64, cx: &mut ModelContext<Self>) -> Task<Result<()>> {
+    pub fn call(
+        &mut self,
+        recipient_user_id: u64,
+        cx: &mut ModelContext<Self>,
+    ) -> Task<Result<()>> {
         if self.status.is_offline() {
             return Task::ready(Err(anyhow!("room is offline")));
         }
@@ -147,7 +151,7 @@ impl Room {
             client
                 .request(proto::Call {
                     room_id,
-                    to_user_id,
+                    recipient_user_id,
                 })
                 .await?;
             Ok(())

crates/rpc/proto/zed.proto 🔗

@@ -181,12 +181,12 @@ message ParticipantLocation {
 
 message Call {
     uint64 room_id = 1;
-    uint64 to_user_id = 2;
+    uint64 recipient_user_id = 2;
 }
 
 message IncomingCall {
     uint64 room_id = 1;
-    uint64 from_user_id = 2;
+    uint64 caller_user_id = 2;
     repeated uint64 participant_user_ids = 3;
 }