Start on implementing a fake live-kit server

Antonio Scandurra created

Change summary

Cargo.lock                                    |  2 
crates/live_kit_client/Cargo.toml             | 10 +
crates/live_kit_client/src/live_kit_client.rs |  4 
crates/live_kit_client/src/test.rs            | 91 ++++++++++++++++++++
crates/live_kit_server/src/api.rs             | 14 --
crates/live_kit_server/src/token.rs           | 48 +++++++---
6 files changed, 134 insertions(+), 35 deletions(-)

Detailed changes

Cargo.lock 🔗

@@ -3167,6 +3167,7 @@ dependencies = [
  "byteorder",
  "bytes 1.2.1",
  "cocoa",
+ "collections",
  "core-foundation",
  "core-graphics",
  "foreign-types",
@@ -3174,6 +3175,7 @@ dependencies = [
  "gpui",
  "hmac 0.12.1",
  "jwt",
+ "lazy_static",
  "live_kit_server",
  "log",
  "media",

crates/live_kit_client/Cargo.toml 🔗

@@ -12,19 +12,25 @@ doctest = false
 name = "test_app"
 
 [features]
-test-support = []
+test-support = ["collections/test-support", "gpui/test-support", "lazy_static", "live_kit_server"]
 
 [dependencies]
+collections = { path = "../collections", optional = true }
+gpui = { path = "../gpui", optional = true }
+live_kit_server = { path = "../live_kit_server", optional = true }
 media = { path = "../media" }
 
 anyhow = "1.0.38"
 core-foundation = "0.9.3"
 core-graphics = "0.22.3"
 futures = "0.3"
+lazy_static = { version = "1.4", optional = true }
 parking_lot = "0.11.1"
 
 [dev-dependencies]
-gpui = { path = "../gpui" }
+collections = { path = "../collections", features = ["test-support"] }
+gpui = { path = "../gpui", features = ["test-support"] }
+lazy_static = "1.4"
 live_kit_server = { path = "../live_kit_server" }
 media = { path = "../media" }
 

crates/live_kit_client/src/live_kit_client.rs 🔗

@@ -1,8 +1,10 @@
 pub mod prod;
-pub mod test;
 
 #[cfg(not(any(test, feature = "test-support")))]
 pub use prod::*;
 
+#[cfg(any(test, feature = "test-support"))]
+mod test;
+
 #[cfg(any(test, feature = "test-support"))]
 pub use test::*;

crates/live_kit_client/src/test.rs 🔗

@@ -1,8 +1,80 @@
-use anyhow::Result;
+use anyhow::{anyhow, Result};
+use collections::HashMap;
 use futures::{channel::mpsc, future};
+use gpui::executor::Background;
+use lazy_static::lazy_static;
 use media::core_video::CVImageBuffer;
+use parking_lot::Mutex;
 use std::{future::Future, sync::Arc};
 
+lazy_static! {
+    static ref SERVERS: Mutex<HashMap<String, Arc<FakeServer>>> = Default::default();
+}
+
+pub struct FakeServer {
+    url: String,
+    secret_key: String,
+    rooms: Mutex<HashMap<String, FakeServerRoom>>,
+    background: Arc<Background>,
+}
+
+impl FakeServer {
+    pub fn create(
+        url: String,
+        secret_key: String,
+        background: Arc<Background>,
+    ) -> Result<Arc<FakeServer>> {
+        let mut servers = SERVERS.lock();
+        if servers.contains_key(&url) {
+            Err(anyhow!("a server with url {:?} already exists", url))
+        } else {
+            let server = Arc::new(FakeServer {
+                url: url.clone(),
+                secret_key,
+                rooms: Default::default(),
+                background,
+            });
+            servers.insert(url, server.clone());
+            Ok(server)
+        }
+    }
+
+    fn get(url: &str) -> Result<Arc<FakeServer>> {
+        Ok(SERVERS
+            .lock()
+            .get(url)
+            .ok_or_else(|| anyhow!("no server found for url"))?
+            .clone())
+    }
+
+    pub fn teardown(&self) -> Result<()> {
+        SERVERS
+            .lock()
+            .remove(&self.url)
+            .ok_or_else(|| anyhow!("server with url {:?} does not exist", self.url))?;
+        Ok(())
+    }
+
+    async fn join_room(&self, token: String, client_room: Arc<Room>) -> Result<()> {
+        self.background.simulate_random_delay().await;
+        let claims = live_kit_server::token::validate(&token, &self.secret_key)?;
+        let identity = claims.sub.unwrap().to_string();
+        let room = claims.video.room.unwrap();
+        let mut server_rooms = self.rooms.lock();
+        let room = server_rooms
+            .get_mut(&*room)
+            .ok_or_else(|| anyhow!("room {} does not exist", room))?;
+        room.clients.insert(identity, client_room);
+        Ok(())
+    }
+}
+
+struct FakeServerRoom {
+    clients: HashMap<Sid, Arc<Room>>,
+}
+
+impl FakeServerRoom {}
+
 pub type Sid = String;
 
 pub struct Room;
@@ -12,8 +84,15 @@ impl Room {
         Arc::new(Self)
     }
 
-    pub fn connect(&self, url: &str, token: &str) -> impl Future<Output = Result<()>> {
-        future::pending()
+    pub fn connect(self: &Arc<Self>, url: &str, token: &str) -> impl Future<Output = Result<()>> {
+        let this = self.clone();
+        let url = url.to_string();
+        let token = token.to_string();
+        async move {
+            let server = FakeServer::get(&url)?;
+            server.join_room(token, this).await?;
+            Ok(())
+        }
     }
 
     pub fn publish_video_track(
@@ -34,6 +113,12 @@ impl Room {
     }
 }
 
+impl Drop for Room {
+    fn drop(&mut self) {
+        todo!()
+    }
+}
+
 pub struct LocalTrackPublication;
 
 pub struct LocalVideoTrack;

crates/live_kit_server/src/api.rs 🔗

@@ -66,11 +66,7 @@ impl Client {
     ) -> impl Future<Output = Result<()>> {
         let response = self.request(
             "twirp/livekit.RoomService/RemoveParticipant",
-            token::VideoGrant {
-                room_admin: Some(true),
-                room: Some(&room),
-                ..Default::default()
-            },
+            token::VideoGrant::to_admin(&room),
             proto::RoomParticipantIdentity {
                 room: room.clone(),
                 identity,
@@ -87,13 +83,7 @@ impl Client {
             &self.key,
             &self.secret,
             Some(identity),
-            token::VideoGrant {
-                room: Some(room),
-                room_join: Some(true),
-                can_publish: Some(true),
-                can_subscribe: Some(true),
-                ..Default::default()
-            },
+            token::VideoGrant::to_join(room),
         )
     }
 

crates/live_kit_server/src/token.rs 🔗

@@ -1,28 +1,29 @@
 use anyhow::{anyhow, Result};
 use hmac::{Hmac, Mac};
-use jwt::SignWithKey;
-use serde::Serialize;
+use jwt::{SignWithKey, VerifyWithKey};
+use serde::{Deserialize, Serialize};
 use sha2::Sha256;
 use std::{
+    borrow::Cow,
     ops::Add,
     time::{Duration, SystemTime, UNIX_EPOCH},
 };
 
 static DEFAULT_TTL: Duration = Duration::from_secs(6 * 60 * 60); // 6 hours
 
-#[derive(Default, Serialize)]
+#[derive(Default, Serialize, Deserialize)]
 #[serde(rename_all = "camelCase")]
-struct ClaimGrants<'a> {
-    iss: &'a str,
-    sub: Option<&'a str>,
-    iat: u64,
-    exp: u64,
-    nbf: u64,
-    jwtid: Option<&'a str>,
-    video: VideoGrant<'a>,
+pub struct ClaimGrants<'a> {
+    pub iss: Cow<'a, str>,
+    pub sub: Option<Cow<'a, str>>,
+    pub iat: u64,
+    pub exp: u64,
+    pub nbf: u64,
+    pub jwtid: Option<Cow<'a, str>>,
+    pub video: VideoGrant<'a>,
 }
 
-#[derive(Default, Serialize)]
+#[derive(Default, Serialize, Deserialize)]
 #[serde(rename_all = "camelCase")]
 pub struct VideoGrant<'a> {
     pub room_create: Option<bool>,
@@ -30,7 +31,7 @@ pub struct VideoGrant<'a> {
     pub room_list: Option<bool>,
     pub room_record: Option<bool>,
     pub room_admin: Option<bool>,
-    pub room: Option<&'a str>,
+    pub room: Option<Cow<'a, str>>,
     pub can_publish: Option<bool>,
     pub can_subscribe: Option<bool>,
     pub can_publish_data: Option<bool>,
@@ -39,9 +40,17 @@ pub struct VideoGrant<'a> {
 }
 
 impl<'a> VideoGrant<'a> {
+    pub fn to_admin(room: &'a str) -> Self {
+        Self {
+            room_admin: Some(true),
+            room: Some(Cow::Borrowed(room)),
+            ..Default::default()
+        }
+    }
+
     pub fn to_join(room: &'a str) -> Self {
         Self {
-            room: Some(room),
+            room: Some(Cow::Borrowed(room)),
             room_join: Some(true),
             can_publish: Some(true),
             can_subscribe: Some(true),
@@ -67,8 +76,8 @@ pub fn create(
     let now = SystemTime::now();
 
     let claims = ClaimGrants {
-        iss: api_key,
-        sub: identity,
+        iss: Cow::Borrowed(api_key),
+        sub: identity.map(Cow::Borrowed),
         iat: now.duration_since(UNIX_EPOCH).unwrap().as_secs(),
         exp: now
             .add(DEFAULT_TTL)
@@ -76,8 +85,13 @@ pub fn create(
             .unwrap()
             .as_secs(),
         nbf: 0,
-        jwtid: identity,
+        jwtid: identity.map(Cow::Borrowed),
         video: video_grant,
     };
     Ok(claims.sign_with_key(&secret_key)?)
 }
+
+pub fn validate<'a>(token: &'a str, secret_key: &str) -> Result<ClaimGrants<'a>> {
+    let secret_key: Hmac<Sha256> = Hmac::new_from_slice(secret_key.as_bytes())?;
+    Ok(token.verify_with_key(&secret_key)?)
+}