Implement live kit promotion/demotion

Conrad Irwin created

Change summary

crates/call/src/room.rs                        | 30 ++++++++
crates/collab/src/db/ids.rs                    |  2 
crates/collab/src/db/queries/projects.rs       |  2 
crates/collab/src/rpc.rs                       | 45 ++++++++++---
crates/collab/src/tests/channel_guest_tests.rs | 27 +++++++
crates/collab_ui/src/collab_panel.rs           | 67 ++++++++++++++-----
crates/live_kit_client/src/test.rs             | 61 +++++++++++++++--
crates/live_kit_server/src/api.rs              | 29 ++++++++
crates/live_kit_server/src/live_kit_server.rs  |  2 
9 files changed, 223 insertions(+), 42 deletions(-)

Detailed changes

crates/call/src/room.rs 🔗

@@ -754,6 +754,18 @@ impl Room {
                     if this.local_participant.role != role {
                         this.local_participant.role = role;
 
+                        if role == proto::ChannelRole::Guest {
+                            for project in mem::take(&mut this.shared_projects) {
+                                if let Some(project) = project.upgrade() {
+                                    this.unshare_project(project, cx).log_err();
+                                }
+                            }
+                            this.local_participant.projects.clear();
+                            if let Some(live_kit_room) = &mut this.live_kit {
+                                live_kit_room.stop_publishing(cx);
+                            }
+                        }
+
                         this.joined_projects.retain(|project| {
                             if let Some(project) = project.upgrade() {
                                 project.update(cx, |project, cx| project.set_role(role, cx));
@@ -1632,6 +1644,24 @@ impl LiveKitRoom {
 
         Ok((result, old_muted))
     }
+
+    fn stop_publishing(&mut self, cx: &mut ModelContext<Room>) {
+        if let LocalTrack::Published {
+            track_publication, ..
+        } = mem::replace(&mut self.microphone_track, LocalTrack::None)
+        {
+            self.room.unpublish_track(track_publication);
+            cx.notify();
+        }
+
+        if let LocalTrack::Published {
+            track_publication, ..
+        } = mem::replace(&mut self.screen_track, LocalTrack::None)
+        {
+            self.room.unpublish_track(track_publication);
+            cx.notify();
+        }
+    }
 }
 
 enum LocalTrack {

crates/collab/src/db/ids.rs 🔗

@@ -133,7 +133,7 @@ impl ChannelRole {
         }
     }
 
-    pub fn can_share_projects(&self) -> bool {
+    pub fn can_publish_to_rooms(&self) -> bool {
         use ChannelRole::*;
         match self {
             Admin | Member => true,

crates/collab/src/db/queries/projects.rs 🔗

@@ -49,7 +49,7 @@ impl Database {
             if !participant
                 .role
                 .unwrap_or(ChannelRole::Member)
-                .can_share_projects()
+                .can_publish_to_rooms()
             {
                 return Err(anyhow!("guests cannot share projects"))?;
             }

crates/collab/src/rpc.rs 🔗

@@ -1264,18 +1264,41 @@ async fn set_room_participant_role(
     response: Response<proto::SetRoomParticipantRole>,
     session: Session,
 ) -> Result<()> {
-    let room = session
-        .db()
-        .await
-        .set_room_participant_role(
-            session.user_id,
-            RoomId::from_proto(request.room_id),
-            UserId::from_proto(request.user_id),
-            ChannelRole::from(request.role()),
-        )
-        .await?;
+    let (live_kit_room, can_publish) = {
+        let room = session
+            .db()
+            .await
+            .set_room_participant_role(
+                session.user_id,
+                RoomId::from_proto(request.room_id),
+                UserId::from_proto(request.user_id),
+                ChannelRole::from(request.role()),
+            )
+            .await?;
+
+        let live_kit_room = room.live_kit_room.clone();
+        let can_publish = ChannelRole::from(request.role()).can_publish_to_rooms();
+        room_updated(&room, &session.peer);
+        (live_kit_room, can_publish)
+    };
+
+    if let Some(live_kit) = session.live_kit_client.as_ref() {
+        live_kit
+            .update_participant(
+                live_kit_room.clone(),
+                request.user_id.to_string(),
+                live_kit_server::proto::ParticipantPermission {
+                    can_subscribe: true,
+                    can_publish,
+                    can_publish_data: can_publish,
+                    hidden: false,
+                    recorder: false,
+                },
+            )
+            .await
+            .trace_err();
+    }
 
-    room_updated(&room, &session.peer);
     response.send(proto::Ack {})?;
     Ok(())
 }

crates/collab/src/tests/channel_guest_tests.rs 🔗

@@ -1,7 +1,7 @@
 use crate::tests::TestServer;
 use call::ActiveCall;
 use editor::Editor;
-use gpui::{BackgroundExecutor, TestAppContext, VisualTestContext};
+use gpui::{BackgroundExecutor, TestAppContext};
 use rpc::proto;
 
 #[gpui::test]
@@ -132,5 +132,28 @@ async fn test_channel_guest_promotion(cx_a: &mut TestAppContext, cx_b: &mut Test
     room_b
         .update(cx_b, |room, cx| room.share_microphone(cx))
         .await
-        .unwrap()
+        .unwrap();
+
+    // B is demoted
+    active_call_a
+        .update(cx_a, |call, cx| {
+            call.room().unwrap().update(cx, |room, cx| {
+                room.set_participant_role(
+                    client_b.user_id().unwrap(),
+                    proto::ChannelRole::Guest,
+                    cx,
+                )
+            })
+        })
+        .await
+        .unwrap();
+    cx_a.run_until_parked();
+
+    // project and buffers are no longer editable
+    assert!(project_b.read_with(cx_b, |project, _| project.is_read_only()));
+    assert!(editor_b.update(cx_b, |editor, cx| editor.read_only(cx)));
+    assert!(room_b
+        .update(cx_b, |room, cx| room.share_microphone(cx))
+        .await
+        .is_err());
 }

crates/collab_ui/src/collab_panel.rs 🔗

@@ -865,9 +865,9 @@ impl CollabPanel {
                             .ok();
                     }))
             })
-            .when(is_call_admin && role == proto::ChannelRole::Guest, |el| {
+            .when(is_call_admin, |el| {
                 el.on_secondary_mouse_down(cx.listener(move |this, event: &MouseDownEvent, cx| {
-                    this.deploy_participant_context_menu(event.position, user_id, cx)
+                    this.deploy_participant_context_menu(event.position, user_id, role, cx)
                 }))
             })
     }
@@ -1006,27 +1006,60 @@ impl CollabPanel {
         &mut self,
         position: Point<Pixels>,
         user_id: u64,
+        role: proto::ChannelRole,
         cx: &mut ViewContext<Self>,
     ) {
         let this = cx.view().clone();
+        if !(role == proto::ChannelRole::Guest || role == proto::ChannelRole::Member) {
+            return;
+        }
 
         let context_menu = ContextMenu::build(cx, |context_menu, cx| {
-            context_menu.entry(
-                "Allow Write Access",
-                None,
-                cx.handler_for(&this, move |_, cx| {
-                    ActiveCall::global(cx)
-                        .update(cx, |call, cx| {
-                            let Some(room) = call.room() else {
-                                return Task::ready(Ok(()));
-                            };
-                            room.update(cx, |room, cx| {
-                                room.set_participant_role(user_id, proto::ChannelRole::Member, cx)
+            if role == proto::ChannelRole::Guest {
+                context_menu.entry(
+                    "Grant Write Access",
+                    None,
+                    cx.handler_for(&this, move |_, cx| {
+                        ActiveCall::global(cx)
+                            .update(cx, |call, cx| {
+                                let Some(room) = call.room() else {
+                                    return Task::ready(Ok(()));
+                                };
+                                room.update(cx, |room, cx| {
+                                    room.set_participant_role(
+                                        user_id,
+                                        proto::ChannelRole::Member,
+                                        cx,
+                                    )
+                                })
                             })
-                        })
-                        .detach_and_notify_err(cx)
-                }),
-            )
+                            .detach_and_notify_err(cx)
+                    }),
+                )
+            } else if role == proto::ChannelRole::Member {
+                context_menu.entry(
+                    "Revoke Write Access",
+                    None,
+                    cx.handler_for(&this, move |_, cx| {
+                        ActiveCall::global(cx)
+                            .update(cx, |call, cx| {
+                                let Some(room) = call.room() else {
+                                    return Task::ready(Ok(()));
+                                };
+                                room.update(cx, |room, cx| {
+                                    room.set_participant_role(
+                                        user_id,
+                                        proto::ChannelRole::Guest,
+                                        cx,
+                                    )
+                                })
+                            })
+                            .detach_and_notify_err(cx)
+                    }),
+                )
+            } else {
+                unreachable!()
+            }
         });
 
         cx.focus_view(&context_menu);

crates/live_kit_client/src/test.rs 🔗

@@ -3,7 +3,7 @@ use async_trait::async_trait;
 use collections::{BTreeMap, HashMap};
 use futures::Stream;
 use gpui::BackgroundExecutor;
-use live_kit_server::token;
+use live_kit_server::{proto, token};
 use media::core_video::CVImageBuffer;
 use parking_lot::Mutex;
 use postage::watch;
@@ -151,6 +151,21 @@ impl TestServer {
         Ok(())
     }
 
+    async fn update_participant(
+        &self,
+        room_name: String,
+        identity: String,
+        permission: proto::ParticipantPermission,
+    ) -> Result<()> {
+        self.executor.simulate_random_delay().await;
+        let mut server_rooms = self.rooms.lock();
+        let room = server_rooms
+            .get_mut(&room_name)
+            .ok_or_else(|| anyhow!("room {} does not exist", room_name))?;
+        room.participant_permissions.insert(identity, permission);
+        Ok(())
+    }
+
     pub async fn disconnect_client(&self, client_identity: String) {
         self.executor.simulate_random_delay().await;
         let mut server_rooms = self.rooms.lock();
@@ -167,15 +182,22 @@ impl TestServer {
         let identity = claims.sub.unwrap().to_string();
         let room_name = claims.video.room.unwrap();
 
-        if claims.video.can_publish == Some(false) {
-            return Err(anyhow!("user is not allowed to publish"));
-        }
-
         let mut server_rooms = self.rooms.lock();
         let room = server_rooms
             .get_mut(&*room_name)
             .ok_or_else(|| anyhow!("room {} does not exist", room_name))?;
 
+        let can_publish = room
+            .participant_permissions
+            .get(&identity)
+            .map(|permission| permission.can_publish)
+            .or(claims.video.can_publish)
+            .unwrap_or(true);
+
+        if !can_publish {
+            return Err(anyhow!("user is not allowed to publish"));
+        }
+
         let track = Arc::new(RemoteVideoTrack {
             sid: nanoid::nanoid!(17),
             publisher_id: identity.clone(),
@@ -209,15 +231,22 @@ impl TestServer {
         let identity = claims.sub.unwrap().to_string();
         let room_name = claims.video.room.unwrap();
 
-        if claims.video.can_publish == Some(false) {
-            return Err(anyhow!("user is not allowed to publish"));
-        }
-
         let mut server_rooms = self.rooms.lock();
         let room = server_rooms
             .get_mut(&*room_name)
             .ok_or_else(|| anyhow!("room {} does not exist", room_name))?;
 
+        let can_publish = room
+            .participant_permissions
+            .get(&identity)
+            .map(|permission| permission.can_publish)
+            .or(claims.video.can_publish)
+            .unwrap_or(true);
+
+        if !can_publish {
+            return Err(anyhow!("user is not allowed to publish"));
+        }
+
         let track = Arc::new(RemoteAudioTrack {
             sid: nanoid::nanoid!(17),
             publisher_id: identity.clone(),
@@ -273,6 +302,7 @@ struct TestServerRoom {
     client_rooms: HashMap<Sid, Arc<Room>>,
     video_tracks: Vec<Arc<RemoteVideoTrack>>,
     audio_tracks: Vec<Arc<RemoteAudioTrack>>,
+    participant_permissions: HashMap<Sid, proto::ParticipantPermission>,
 }
 
 impl TestServerRoom {}
@@ -305,6 +335,19 @@ impl live_kit_server::api::Client for TestApiClient {
         Ok(())
     }
 
+    async fn update_participant(
+        &self,
+        room: String,
+        identity: String,
+        permission: live_kit_server::proto::ParticipantPermission,
+    ) -> Result<()> {
+        let server = TestServer::get(&self.url)?;
+        server
+            .update_participant(room, identity, permission)
+            .await?;
+        Ok(())
+    }
+
     fn room_token(&self, room: &str, identity: &str) -> Result<String> {
         let server = TestServer::get(&self.url)?;
         token::create(

crates/live_kit_server/src/api.rs 🔗

@@ -11,10 +11,18 @@ pub trait Client: Send + Sync {
     async fn create_room(&self, name: String) -> Result<()>;
     async fn delete_room(&self, name: String) -> Result<()>;
     async fn remove_participant(&self, room: String, identity: String) -> Result<()>;
+    async fn update_participant(
+        &self,
+        room: String,
+        identity: String,
+        permission: proto::ParticipantPermission,
+    ) -> Result<()>;
     fn room_token(&self, room: &str, identity: &str) -> Result<String>;
     fn guest_token(&self, room: &str, identity: &str) -> Result<String>;
 }
 
+pub struct LiveKitParticipantUpdate {}
+
 #[derive(Clone)]
 pub struct LiveKitClient {
     http: reqwest::Client,
@@ -131,6 +139,27 @@ impl Client for LiveKitClient {
         Ok(())
     }
 
+    async fn update_participant(
+        &self,
+        room: String,
+        identity: String,
+        permission: proto::ParticipantPermission,
+    ) -> Result<()> {
+        let _: proto::ParticipantInfo = self
+            .request(
+                "twirp/livekit.RoomService/UpdateParticipant",
+                token::VideoGrant::to_admin(&room),
+                proto::UpdateParticipantRequest {
+                    room: room.clone(),
+                    identity,
+                    metadata: "".to_string(),
+                    permission: Some(permission),
+                },
+            )
+            .await?;
+        Ok(())
+    }
+
     fn room_token(&self, room: &str, identity: &str) -> Result<String> {
         token::create(
             &self.key,