Cancel calls automatically when caller hangs up or disconnects

Antonio Scandurra created

Change summary

crates/collab/src/integration_tests.rs | 34 ++++++++++++
crates/collab/src/rpc.rs               | 40 +++++++++-----
crates/collab/src/rpc/store.rs         | 76 ++++++++++++++--------------
3 files changed, 97 insertions(+), 53 deletions(-)

Detailed changes

crates/collab/src/integration_tests.rs 🔗

@@ -492,6 +492,40 @@ async fn test_calls_on_multiple_connections(
     deterministic.run_until_parked();
     assert!(incoming_call_b1.next().await.unwrap().is_none());
     assert!(incoming_call_b2.next().await.unwrap().is_none());
+
+    // User A calls user B again.
+    active_call_a
+        .update(cx_a, |call, cx| {
+            call.invite(client_b1.user_id().unwrap(), None, cx)
+        })
+        .await
+        .unwrap();
+    deterministic.run_until_parked();
+    assert!(incoming_call_b1.next().await.unwrap().is_some());
+    assert!(incoming_call_b2.next().await.unwrap().is_some());
+
+    // User A hangs up, causing both connections to stop ringing.
+    active_call_a.update(cx_a, |call, cx| call.hang_up(cx).unwrap());
+    deterministic.run_until_parked();
+    assert!(incoming_call_b1.next().await.unwrap().is_none());
+    assert!(incoming_call_b2.next().await.unwrap().is_none());
+
+    // User A calls user B again.
+    active_call_a
+        .update(cx_a, |call, cx| {
+            call.invite(client_b1.user_id().unwrap(), None, cx)
+        })
+        .await
+        .unwrap();
+    deterministic.run_until_parked();
+    assert!(incoming_call_b1.next().await.unwrap().is_some());
+    assert!(incoming_call_b2.next().await.unwrap().is_some());
+
+    // User A disconnects up, causing both connections to stop ringing.
+    server.disconnect_client(client_a.current_user_id(cx_a));
+    cx_a.foreground().advance_clock(rpc::RECEIVE_TIMEOUT);
+    assert!(incoming_call_b1.next().await.unwrap().is_none());
+    assert!(incoming_call_b2.next().await.unwrap().is_none());
 }
 
 #[gpui::test(iterations = 10)]

crates/collab/src/rpc.rs 🔗

@@ -479,30 +479,34 @@ impl Server {
             let mut store = self.store().await;
             let removed_connection = store.remove_connection(connection_id)?;
 
-            for (project_id, project) in removed_connection.hosted_projects {
-                projects_to_unshare.push(project_id);
+            for project in removed_connection.hosted_projects {
+                projects_to_unshare.push(project.id);
                 broadcast(connection_id, project.guests.keys().copied(), |conn_id| {
                     self.peer.send(
                         conn_id,
                         proto::UnshareProject {
-                            project_id: project_id.to_proto(),
+                            project_id: project.id.to_proto(),
                         },
                     )
                 });
             }
 
-            for project_id in removed_connection.guest_project_ids {
-                if let Some(project) = store.project(project_id).trace_err() {
-                    broadcast(connection_id, project.connection_ids(), |conn_id| {
-                        self.peer.send(
-                            conn_id,
-                            proto::RemoveProjectCollaborator {
-                                project_id: project_id.to_proto(),
-                                peer_id: connection_id.0,
-                            },
-                        )
-                    });
-                }
+            for project in removed_connection.guest_projects {
+                broadcast(connection_id, project.connection_ids, |conn_id| {
+                    self.peer.send(
+                        conn_id,
+                        proto::RemoveProjectCollaborator {
+                            project_id: project.id.to_proto(),
+                            peer_id: connection_id.0,
+                        },
+                    )
+                });
+            }
+
+            for connection_id in removed_connection.canceled_call_connection_ids {
+                self.peer
+                    .send(connection_id, proto::CallCanceled {})
+                    .trace_err();
             }
 
             if let Some(room) = removed_connection
@@ -666,6 +670,12 @@ impl Server {
                 }
             }
 
+            for connection_id in left_room.canceled_call_connection_ids {
+                self.peer
+                    .send(connection_id, proto::CallCanceled {})
+                    .trace_err();
+            }
+
             if let Some(room) = left_room.room {
                 self.room_updated(room);
             }

crates/collab/src/rpc/store.rs 🔗

@@ -86,10 +86,11 @@ pub type ReplicaId = u16;
 #[derive(Default)]
 pub struct RemovedConnectionState {
     pub user_id: UserId,
-    pub hosted_projects: HashMap<ProjectId, Project>,
-    pub guest_project_ids: HashSet<ProjectId>,
+    pub hosted_projects: Vec<Project>,
+    pub guest_projects: Vec<LeftProject>,
     pub contact_ids: HashSet<UserId>,
     pub room_id: Option<RoomId>,
+    pub canceled_call_connection_ids: Vec<ConnectionId>,
 }
 
 pub struct LeftProject {
@@ -104,6 +105,7 @@ pub struct LeftRoom<'a> {
     pub room: Option<&'a proto::Room>,
     pub unshared_projects: Vec<Project>,
     pub left_projects: Vec<LeftProject>,
+    pub canceled_call_connection_ids: Vec<ConnectionId>,
 }
 
 #[derive(Copy, Clone)]
@@ -197,7 +199,6 @@ impl Store {
             .ok_or_else(|| anyhow!("no such connection"))?;
 
         let user_id = connection.user_id;
-        let connection_projects = mem::take(&mut connection.projects);
         let connection_channels = mem::take(&mut connection.channels);
 
         let mut result = RemovedConnectionState {
@@ -210,48 +211,21 @@ impl Store {
             self.leave_channel(connection_id, channel_id);
         }
 
-        // Unshare and leave all projects.
-        for project_id in connection_projects {
-            if let Ok((_, project)) = self.unshare_project(project_id, connection_id) {
-                result.hosted_projects.insert(project_id, project);
-            } else if self.leave_project(project_id, connection_id).is_ok() {
-                result.guest_project_ids.insert(project_id);
-            }
-        }
-
-        let connected_user = self.connected_users.get_mut(&user_id).unwrap();
-        connected_user.connection_ids.remove(&connection_id);
+        let connected_user = self.connected_users.get(&user_id).unwrap();
         if let Some(active_call) = connected_user.active_call.as_ref() {
             let room_id = active_call.room_id;
-            if let Some(room) = self.rooms.get_mut(&room_id) {
-                let prev_participant_count = room.participants.len();
-                room.participants
-                    .retain(|participant| participant.peer_id != connection_id.0);
-                if prev_participant_count == room.participants.len() {
-                    if connected_user.connection_ids.is_empty() {
-                        room.pending_participant_user_ids
-                            .retain(|pending_user_id| *pending_user_id != user_id.to_proto());
-                        result.room_id = Some(room_id);
-                        connected_user.active_call = None;
-                    }
-                } else {
-                    result.room_id = Some(room_id);
-                    connected_user.active_call = None;
-                }
-
-                if room.participants.is_empty() && room.pending_participant_user_ids.is_empty() {
-                    self.rooms.remove(&room_id);
-                }
-            } else {
-                tracing::error!("disconnected user claims to be in a room that does not exist");
-                connected_user.active_call = None;
-            }
+            let left_room = self.leave_room(room_id, connection_id)?;
+            result.hosted_projects = left_room.unshared_projects;
+            result.guest_projects = left_room.left_projects;
+            result.room_id = Some(room_id);
+            result.canceled_call_connection_ids = left_room.canceled_call_connection_ids;
         }
 
+        let connected_user = self.connected_users.get_mut(&user_id).unwrap();
+        connected_user.connection_ids.remove(&connection_id);
         if connected_user.connection_ids.is_empty() {
             self.connected_users.remove(&user_id);
         }
-
         self.connections.remove(&connection_id).unwrap();
 
         Ok(result)
@@ -491,6 +465,31 @@ impl Store {
             .ok_or_else(|| anyhow!("no such room"))?;
         room.participants
             .retain(|participant| participant.peer_id != connection_id.0);
+
+        let mut canceled_call_connection_ids = Vec::new();
+        room.pending_participant_user_ids
+            .retain(|pending_participant_user_id| {
+                if let Some(connected_user) = self
+                    .connected_users
+                    .get_mut(&UserId::from_proto(*pending_participant_user_id))
+                {
+                    if let Some(call) = connected_user.active_call.as_ref() {
+                        if call.caller_user_id == user_id {
+                            connected_user.active_call.take();
+                            canceled_call_connection_ids
+                                .extend(connected_user.connection_ids.iter().copied());
+                            false
+                        } else {
+                            true
+                        }
+                    } else {
+                        true
+                    }
+                } else {
+                    true
+                }
+            });
+
         if room.participants.is_empty() && room.pending_participant_user_ids.is_empty() {
             self.rooms.remove(&room_id);
         }
@@ -499,6 +498,7 @@ impl Store {
             room: self.rooms.get(&room_id),
             unshared_projects,
             left_projects,
+            canceled_call_connection_ids,
         })
     }