Remove participants from live-kit rooms when they leave zed rooms

Antonio Scandurra created

Change summary

crates/collab/.env.toml           |  3 ++
crates/collab/src/rpc.rs          | 41 +++++++++++++++++++++++---------
crates/collab/src/rpc/store.rs    | 20 ++++++++-------
crates/live_kit_server/src/api.rs | 36 ++++++++++++++++++++++------
4 files changed, 71 insertions(+), 29 deletions(-)

Detailed changes

crates/collab/.env.toml 🔗

@@ -2,6 +2,9 @@ DATABASE_URL = "postgres://postgres@localhost/zed"
 HTTP_PORT = 8080
 API_TOKEN = "secret"
 INVITE_LINK_PREFIX = "http://localhost:3000/invites/"
+LIVE_KIT_SERVER = "http://localhost:7880"
+LIVE_KIT_KEY = "devkey"
+LIVE_KIT_SECRET = "secret"
 
 # HONEYCOMB_API_KEY=
 # HONEYCOMB_DATASET=

crates/collab/src/rpc.rs 🔗

@@ -473,6 +473,7 @@ impl Server {
 
         let mut projects_to_unshare = Vec::new();
         let mut contacts_to_update = HashSet::default();
+        let mut room_left = None;
         {
             let mut store = self.store().await;
             let removed_connection = store.remove_connection(connection_id)?;
@@ -501,23 +502,24 @@ impl Server {
                 });
             }
 
+            if let Some(room) = removed_connection.room {
+                self.room_updated(&room);
+                room_left = Some(self.room_left(&room, removed_connection.user_id));
+            }
+
+            contacts_to_update.insert(removed_connection.user_id);
             for connection_id in removed_connection.canceled_call_connection_ids {
                 self.peer
                     .send(connection_id, proto::CallCanceled {})
                     .trace_err();
                 contacts_to_update.extend(store.user_id_for_connection(connection_id).ok());
             }
-
-            if let Some(room) = removed_connection
-                .room_id
-                .and_then(|room_id| store.room(room_id))
-            {
-                self.room_updated(room);
-            }
-
-            contacts_to_update.insert(removed_connection.user_id);
         };
 
+        if let Some(room_left) = room_left {
+            room_left.await.trace_err();
+        }
+
         for user_id in contacts_to_update {
             self.update_user_contacts(user_id).await.trace_err();
         }
@@ -682,6 +684,7 @@ impl Server {
 
     async fn leave_room(self: Arc<Server>, message: TypedEnvelope<proto::LeaveRoom>) -> Result<()> {
         let mut contacts_to_update = HashSet::default();
+        let room_left;
         {
             let mut store = self.store().await;
             let user_id = store.user_id_for_connection(message.sender_id)?;
@@ -720,9 +723,8 @@ impl Server {
                 }
             }
 
-            if let Some(room) = left_room.room {
-                self.room_updated(room);
-            }
+            self.room_updated(&left_room.room);
+            room_left = self.room_left(&left_room.room, user_id);
 
             for connection_id in left_room.canceled_call_connection_ids {
                 self.peer
@@ -732,6 +734,7 @@ impl Server {
             }
         }
 
+        room_left.await.trace_err();
         for user_id in contacts_to_update {
             self.update_user_contacts(user_id).await?;
         }
@@ -880,6 +883,20 @@ impl Server {
         }
     }
 
+    fn room_left(&self, room: &proto::Room, user_id: UserId) -> impl Future<Output = Result<()>> {
+        let client = self.app_state.live_kit_client.clone();
+        let room_name = room.live_kit_room.clone();
+        async move {
+            if let Some(client) = client {
+                client
+                    .remove_participant(room_name, user_id.to_string())
+                    .await?;
+            }
+
+            Ok(())
+        }
+    }
+
     async fn share_project(
         self: Arc<Server>,
         request: TypedEnvelope<proto::ShareProject>,

crates/collab/src/rpc/store.rs 🔗

@@ -4,7 +4,7 @@ use collections::{btree_map, BTreeMap, BTreeSet, HashMap, HashSet};
 use nanoid::nanoid;
 use rpc::{proto, ConnectionId};
 use serde::Serialize;
-use std::{mem, path::PathBuf, str, time::Duration};
+use std::{borrow::Cow, mem, path::PathBuf, str, time::Duration};
 use time::OffsetDateTime;
 use tracing::instrument;
 use util::post_inc;
@@ -85,12 +85,12 @@ pub struct Channel {
 pub type ReplicaId = u16;
 
 #[derive(Default)]
-pub struct RemovedConnectionState {
+pub struct RemovedConnectionState<'a> {
     pub user_id: UserId,
     pub hosted_projects: Vec<Project>,
     pub guest_projects: Vec<LeftProject>,
     pub contact_ids: HashSet<UserId>,
-    pub room_id: Option<RoomId>,
+    pub room: Option<Cow<'a, proto::Room>>,
     pub canceled_call_connection_ids: Vec<ConnectionId>,
 }
 
@@ -103,7 +103,7 @@ pub struct LeftProject {
 }
 
 pub struct LeftRoom<'a> {
-    pub room: Option<&'a proto::Room>,
+    pub room: Cow<'a, proto::Room>,
     pub unshared_projects: Vec<Project>,
     pub left_projects: Vec<LeftProject>,
     pub canceled_call_connection_ids: Vec<ConnectionId>,
@@ -218,7 +218,7 @@ impl Store {
             let left_room = self.leave_room(room_id, connection_id)?;
             result.hosted_projects = left_room.unshared_projects;
             result.guest_projects = left_room.left_projects;
-            result.room_id = Some(room_id);
+            result.room = Some(Cow::Owned(left_room.room.into_owned()));
             result.canceled_call_connection_ids = left_room.canceled_call_connection_ids;
         }
 
@@ -495,12 +495,14 @@ impl Store {
                 }
             });
 
-        if room.participants.is_empty() && room.pending_participant_user_ids.is_empty() {
-            self.rooms.remove(&room_id);
-        }
+        let room = if room.participants.is_empty() && room.pending_participant_user_ids.is_empty() {
+            Cow::Owned(self.rooms.remove(&room_id).unwrap())
+        } else {
+            Cow::Borrowed(self.rooms.get(&room_id).unwrap())
+        };
 
         Ok(LeftRoom {
-            room: self.rooms.get(&room_id),
+            room,
             unshared_projects,
             left_projects,
             canceled_call_connection_ids,

crates/live_kit_server/src/api.rs 🔗

@@ -2,13 +2,14 @@ use crate::{proto, token};
 use anyhow::{anyhow, Result};
 use prost::Message;
 use reqwest::header::CONTENT_TYPE;
-use std::future::Future;
+use std::{future::Future, sync::Arc};
 
+#[derive(Clone)]
 pub struct Client {
     http: reqwest::Client,
-    uri: String,
-    key: String,
-    secret: String,
+    uri: Arc<str>,
+    key: Arc<str>,
+    secret: Arc<str>,
 }
 
 impl Client {
@@ -19,9 +20,9 @@ impl Client {
 
         Self {
             http: reqwest::Client::new(),
-            uri,
-            key,
-            secret,
+            uri: uri.into(),
+            key: key.into(),
+            secret: secret.into(),
         }
     }
 
@@ -49,7 +50,26 @@ impl Client {
             proto::DeleteRoomRequest { room: name },
         );
         async move {
-            response.await?;
+            let _: proto::DeleteRoomResponse = response.await?;
+            Ok(())
+        }
+    }
+
+    pub fn remove_participant(
+        &self,
+        room: String,
+        identity: String,
+    ) -> impl Future<Output = Result<()>> {
+        let response = self.request(
+            "twirp/livekit.RoomService/RemoveParticipant",
+            token::VideoGrant {
+                room_admin: Some(true),
+                ..Default::default()
+            },
+            proto::RoomParticipantIdentity { room, identity },
+        );
+        async move {
+            let _: proto::RemoveParticipantResponse = response.await?;
             Ok(())
         }
     }