From 4da9d61a4264604dd22427fe0e1a40050b85558b Mon Sep 17 00:00:00 2001 From: Conrad Irwin Date: Tue, 9 Jan 2024 16:10:12 -0700 Subject: [PATCH] Implement live kit promotion/demotion --- crates/call/src/room.rs | 30 +++++++++ crates/collab/src/db/ids.rs | 2 +- crates/collab/src/db/queries/projects.rs | 2 +- crates/collab/src/rpc.rs | 45 ++++++++++--- .../collab/src/tests/channel_guest_tests.rs | 27 +++++++- crates/collab_ui/src/collab_panel.rs | 67 ++++++++++++++----- crates/live_kit_client/src/test.rs | 61 ++++++++++++++--- crates/live_kit_server/src/api.rs | 29 ++++++++ crates/live_kit_server/src/live_kit_server.rs | 2 +- 9 files changed, 223 insertions(+), 42 deletions(-) diff --git a/crates/call/src/room.rs b/crates/call/src/room.rs index 0a599e85cf4148fd08d902d611d1f3d70b336d39..3d1f1e70c74ab481864884e37a33e6db03ba5b05 100644 --- a/crates/call/src/room.rs +++ b/crates/call/src/room.rs @@ -754,6 +754,18 @@ impl Room { if this.local_participant.role != role { this.local_participant.role = role; + if role == proto::ChannelRole::Guest { + for project in mem::take(&mut this.shared_projects) { + if let Some(project) = project.upgrade() { + this.unshare_project(project, cx).log_err(); + } + } + this.local_participant.projects.clear(); + if let Some(live_kit_room) = &mut this.live_kit { + live_kit_room.stop_publishing(cx); + } + } + this.joined_projects.retain(|project| { if let Some(project) = project.upgrade() { project.update(cx, |project, cx| project.set_role(role, cx)); @@ -1632,6 +1644,24 @@ impl LiveKitRoom { Ok((result, old_muted)) } + + fn stop_publishing(&mut self, cx: &mut ModelContext) { + if let LocalTrack::Published { + track_publication, .. + } = mem::replace(&mut self.microphone_track, LocalTrack::None) + { + self.room.unpublish_track(track_publication); + cx.notify(); + } + + if let LocalTrack::Published { + track_publication, .. + } = mem::replace(&mut self.screen_track, LocalTrack::None) + { + self.room.unpublish_track(track_publication); + cx.notify(); + } + } } enum LocalTrack { diff --git a/crates/collab/src/db/ids.rs b/crates/collab/src/db/ids.rs index 9f77225fb704c1d4d53051cb7ee29d77c3536f80..9dbbfaff8f8d0e94ea56b02eda37485ddc695fa8 100644 --- a/crates/collab/src/db/ids.rs +++ b/crates/collab/src/db/ids.rs @@ -133,7 +133,7 @@ impl ChannelRole { } } - pub fn can_share_projects(&self) -> bool { + pub fn can_publish_to_rooms(&self) -> bool { use ChannelRole::*; match self { Admin | Member => true, diff --git a/crates/collab/src/db/queries/projects.rs b/crates/collab/src/db/queries/projects.rs index 6e1bf16309bd69315903a37ce7b57d389e749161..a55345dc16b331a124736962c456b1114e089b8e 100644 --- a/crates/collab/src/db/queries/projects.rs +++ b/crates/collab/src/db/queries/projects.rs @@ -49,7 +49,7 @@ impl Database { if !participant .role .unwrap_or(ChannelRole::Member) - .can_share_projects() + .can_publish_to_rooms() { return Err(anyhow!("guests cannot share projects"))?; } diff --git a/crates/collab/src/rpc.rs b/crates/collab/src/rpc.rs index 766cf4bcc0d7344c0780c0a34b62ac4feeb5ea84..b933ce7c9a73a4e801acf349902125f150da2ebe 100644 --- a/crates/collab/src/rpc.rs +++ b/crates/collab/src/rpc.rs @@ -1264,18 +1264,41 @@ async fn set_room_participant_role( response: Response, session: Session, ) -> Result<()> { - let room = session - .db() - .await - .set_room_participant_role( - session.user_id, - RoomId::from_proto(request.room_id), - UserId::from_proto(request.user_id), - ChannelRole::from(request.role()), - ) - .await?; + let (live_kit_room, can_publish) = { + let room = session + .db() + .await + .set_room_participant_role( + session.user_id, + RoomId::from_proto(request.room_id), + UserId::from_proto(request.user_id), + ChannelRole::from(request.role()), + ) + .await?; + + let live_kit_room = room.live_kit_room.clone(); + let can_publish = ChannelRole::from(request.role()).can_publish_to_rooms(); + room_updated(&room, &session.peer); + (live_kit_room, can_publish) + }; + + if let Some(live_kit) = session.live_kit_client.as_ref() { + live_kit + .update_participant( + live_kit_room.clone(), + request.user_id.to_string(), + live_kit_server::proto::ParticipantPermission { + can_subscribe: true, + can_publish, + can_publish_data: can_publish, + hidden: false, + recorder: false, + }, + ) + .await + .trace_err(); + } - room_updated(&room, &session.peer); response.send(proto::Ack {})?; Ok(()) } diff --git a/crates/collab/src/tests/channel_guest_tests.rs b/crates/collab/src/tests/channel_guest_tests.rs index 8d21db678bed364cd8504c839bb3c15b339b1cd0..9b68ce3922ab24726130c356636dae7d6899ef35 100644 --- a/crates/collab/src/tests/channel_guest_tests.rs +++ b/crates/collab/src/tests/channel_guest_tests.rs @@ -1,7 +1,7 @@ use crate::tests::TestServer; use call::ActiveCall; use editor::Editor; -use gpui::{BackgroundExecutor, TestAppContext, VisualTestContext}; +use gpui::{BackgroundExecutor, TestAppContext}; use rpc::proto; #[gpui::test] @@ -132,5 +132,28 @@ async fn test_channel_guest_promotion(cx_a: &mut TestAppContext, cx_b: &mut Test room_b .update(cx_b, |room, cx| room.share_microphone(cx)) .await - .unwrap() + .unwrap(); + + // B is demoted + active_call_a + .update(cx_a, |call, cx| { + call.room().unwrap().update(cx, |room, cx| { + room.set_participant_role( + client_b.user_id().unwrap(), + proto::ChannelRole::Guest, + cx, + ) + }) + }) + .await + .unwrap(); + cx_a.run_until_parked(); + + // project and buffers are no longer editable + assert!(project_b.read_with(cx_b, |project, _| project.is_read_only())); + assert!(editor_b.update(cx_b, |editor, cx| editor.read_only(cx))); + assert!(room_b + .update(cx_b, |room, cx| room.share_microphone(cx)) + .await + .is_err()); } diff --git a/crates/collab_ui/src/collab_panel.rs b/crates/collab_ui/src/collab_panel.rs index e25ad0d225680e8454ae69ce063a1fb5d67d1b2a..687c9e45b6c4ce6d41a40725c52b5bc6c0f4efba 100644 --- a/crates/collab_ui/src/collab_panel.rs +++ b/crates/collab_ui/src/collab_panel.rs @@ -865,9 +865,9 @@ impl CollabPanel { .ok(); })) }) - .when(is_call_admin && role == proto::ChannelRole::Guest, |el| { + .when(is_call_admin, |el| { el.on_secondary_mouse_down(cx.listener(move |this, event: &MouseDownEvent, cx| { - this.deploy_participant_context_menu(event.position, user_id, cx) + this.deploy_participant_context_menu(event.position, user_id, role, cx) })) }) } @@ -1006,27 +1006,60 @@ impl CollabPanel { &mut self, position: Point, user_id: u64, + role: proto::ChannelRole, cx: &mut ViewContext, ) { let this = cx.view().clone(); + if !(role == proto::ChannelRole::Guest || role == proto::ChannelRole::Member) { + return; + } let context_menu = ContextMenu::build(cx, |context_menu, cx| { - context_menu.entry( - "Allow Write Access", - None, - cx.handler_for(&this, move |_, cx| { - ActiveCall::global(cx) - .update(cx, |call, cx| { - let Some(room) = call.room() else { - return Task::ready(Ok(())); - }; - room.update(cx, |room, cx| { - room.set_participant_role(user_id, proto::ChannelRole::Member, cx) + if role == proto::ChannelRole::Guest { + context_menu.entry( + "Grant Write Access", + None, + cx.handler_for(&this, move |_, cx| { + ActiveCall::global(cx) + .update(cx, |call, cx| { + let Some(room) = call.room() else { + return Task::ready(Ok(())); + }; + room.update(cx, |room, cx| { + room.set_participant_role( + user_id, + proto::ChannelRole::Member, + cx, + ) + }) }) - }) - .detach_and_notify_err(cx) - }), - ) + .detach_and_notify_err(cx) + }), + ) + } else if role == proto::ChannelRole::Member { + context_menu.entry( + "Revoke Write Access", + None, + cx.handler_for(&this, move |_, cx| { + ActiveCall::global(cx) + .update(cx, |call, cx| { + let Some(room) = call.room() else { + return Task::ready(Ok(())); + }; + room.update(cx, |room, cx| { + room.set_participant_role( + user_id, + proto::ChannelRole::Guest, + cx, + ) + }) + }) + .detach_and_notify_err(cx) + }), + ) + } else { + unreachable!() + } }); cx.focus_view(&context_menu); diff --git a/crates/live_kit_client/src/test.rs b/crates/live_kit_client/src/test.rs index 07a12a4ab6f5e39df0b672d8fb589497a765fd04..4575fdd2c1845c53b29876d83f1fb6a8498717dc 100644 --- a/crates/live_kit_client/src/test.rs +++ b/crates/live_kit_client/src/test.rs @@ -3,7 +3,7 @@ use async_trait::async_trait; use collections::{BTreeMap, HashMap}; use futures::Stream; use gpui::BackgroundExecutor; -use live_kit_server::token; +use live_kit_server::{proto, token}; use media::core_video::CVImageBuffer; use parking_lot::Mutex; use postage::watch; @@ -151,6 +151,21 @@ impl TestServer { Ok(()) } + async fn update_participant( + &self, + room_name: String, + identity: String, + permission: proto::ParticipantPermission, + ) -> Result<()> { + self.executor.simulate_random_delay().await; + let mut server_rooms = self.rooms.lock(); + let room = server_rooms + .get_mut(&room_name) + .ok_or_else(|| anyhow!("room {} does not exist", room_name))?; + room.participant_permissions.insert(identity, permission); + Ok(()) + } + pub async fn disconnect_client(&self, client_identity: String) { self.executor.simulate_random_delay().await; let mut server_rooms = self.rooms.lock(); @@ -167,15 +182,22 @@ impl TestServer { let identity = claims.sub.unwrap().to_string(); let room_name = claims.video.room.unwrap(); - if claims.video.can_publish == Some(false) { - return Err(anyhow!("user is not allowed to publish")); - } - let mut server_rooms = self.rooms.lock(); let room = server_rooms .get_mut(&*room_name) .ok_or_else(|| anyhow!("room {} does not exist", room_name))?; + let can_publish = room + .participant_permissions + .get(&identity) + .map(|permission| permission.can_publish) + .or(claims.video.can_publish) + .unwrap_or(true); + + if !can_publish { + return Err(anyhow!("user is not allowed to publish")); + } + let track = Arc::new(RemoteVideoTrack { sid: nanoid::nanoid!(17), publisher_id: identity.clone(), @@ -209,15 +231,22 @@ impl TestServer { let identity = claims.sub.unwrap().to_string(); let room_name = claims.video.room.unwrap(); - if claims.video.can_publish == Some(false) { - return Err(anyhow!("user is not allowed to publish")); - } - let mut server_rooms = self.rooms.lock(); let room = server_rooms .get_mut(&*room_name) .ok_or_else(|| anyhow!("room {} does not exist", room_name))?; + let can_publish = room + .participant_permissions + .get(&identity) + .map(|permission| permission.can_publish) + .or(claims.video.can_publish) + .unwrap_or(true); + + if !can_publish { + return Err(anyhow!("user is not allowed to publish")); + } + let track = Arc::new(RemoteAudioTrack { sid: nanoid::nanoid!(17), publisher_id: identity.clone(), @@ -273,6 +302,7 @@ struct TestServerRoom { client_rooms: HashMap>, video_tracks: Vec>, audio_tracks: Vec>, + participant_permissions: HashMap, } impl TestServerRoom {} @@ -305,6 +335,19 @@ impl live_kit_server::api::Client for TestApiClient { Ok(()) } + async fn update_participant( + &self, + room: String, + identity: String, + permission: live_kit_server::proto::ParticipantPermission, + ) -> Result<()> { + let server = TestServer::get(&self.url)?; + server + .update_participant(room, identity, permission) + .await?; + Ok(()) + } + fn room_token(&self, room: &str, identity: &str) -> Result { let server = TestServer::get(&self.url)?; token::create( diff --git a/crates/live_kit_server/src/api.rs b/crates/live_kit_server/src/api.rs index 2c1e174fb41551d22b498b75d9ce36011ca5a5a9..e7e933c9c39d51d9611da135edd3749030444263 100644 --- a/crates/live_kit_server/src/api.rs +++ b/crates/live_kit_server/src/api.rs @@ -11,10 +11,18 @@ pub trait Client: Send + Sync { async fn create_room(&self, name: String) -> Result<()>; async fn delete_room(&self, name: String) -> Result<()>; async fn remove_participant(&self, room: String, identity: String) -> Result<()>; + async fn update_participant( + &self, + room: String, + identity: String, + permission: proto::ParticipantPermission, + ) -> Result<()>; fn room_token(&self, room: &str, identity: &str) -> Result; fn guest_token(&self, room: &str, identity: &str) -> Result; } +pub struct LiveKitParticipantUpdate {} + #[derive(Clone)] pub struct LiveKitClient { http: reqwest::Client, @@ -131,6 +139,27 @@ impl Client for LiveKitClient { Ok(()) } + async fn update_participant( + &self, + room: String, + identity: String, + permission: proto::ParticipantPermission, + ) -> Result<()> { + let _: proto::ParticipantInfo = self + .request( + "twirp/livekit.RoomService/UpdateParticipant", + token::VideoGrant::to_admin(&room), + proto::UpdateParticipantRequest { + room: room.clone(), + identity, + metadata: "".to_string(), + permission: Some(permission), + }, + ) + .await?; + Ok(()) + } + fn room_token(&self, room: &str, identity: &str) -> Result { token::create( &self.key, diff --git a/crates/live_kit_server/src/live_kit_server.rs b/crates/live_kit_server/src/live_kit_server.rs index 7471a96ec418a5ddeeb25d527b98865c31e75686..aa7c1f2fd0179e9062a5165bf4e332e74bf30ea7 100644 --- a/crates/live_kit_server/src/live_kit_server.rs +++ b/crates/live_kit_server/src/live_kit_server.rs @@ -1,3 +1,3 @@ pub mod api; -mod proto; +pub mod proto; pub mod token;