Use `live_kit_client::TestServer` in integration tests

Antonio Scandurra and Nathan Sobo created

Co-Authored-By: Nathan Sobo <nathan@zed.dev>

Change summary

Cargo.lock                                  |   3 
crates/collab/Cargo.toml                    |   4 
crates/collab/src/integration_tests.rs      |  24 ++
crates/collab/src/main.rs                   |   6 
crates/live_kit_client/Cargo.toml           |  12 +
crates/live_kit_client/examples/test_app.rs |   5 
crates/live_kit_client/src/test.rs          | 171 ++++++++++++++++++++--
crates/live_kit_server/Cargo.toml           |   1 
crates/live_kit_server/src/api.rs           | 137 +++++++++--------
9 files changed, 269 insertions(+), 94 deletions(-)

Detailed changes

Cargo.lock 🔗

@@ -1066,6 +1066,7 @@ dependencies = [
  "language",
  "lazy_static",
  "lipsum",
+ "live_kit_client",
  "live_kit_server",
  "log",
  "lsp",
@@ -3163,6 +3164,7 @@ name = "live_kit_client"
 version = "0.1.0"
 dependencies = [
  "anyhow",
+ "async-trait",
  "block",
  "byteorder",
  "bytes 1.2.1",
@@ -3193,6 +3195,7 @@ name = "live_kit_server"
 version = "0.1.0"
 dependencies = [
  "anyhow",
+ "async-trait",
  "futures 0.3.24",
  "hmac 0.12.1",
  "jwt",

crates/collab/Cargo.toml 🔗

@@ -62,15 +62,17 @@ editor = { path = "../editor", features = ["test-support"] }
 language = { path = "../language", features = ["test-support"] }
 fs = { path = "../fs", features = ["test-support"] }
 git = { path = "../git", features = ["test-support"] }
-log = { version = "0.4.16", features = ["kv_unstable_serde"] }
+live_kit_client = { path = "../live_kit_client", features = ["test-support"] }
 lsp = { path = "../lsp", features = ["test-support"] }
 project = { path = "../project", features = ["test-support"] }
 rpc = { path = "../rpc", features = ["test-support"] }
 settings = { path = "../settings", features = ["test-support"] }
 theme = { path = "../theme" }
 workspace = { path = "../workspace", features = ["test-support"] }
+
 ctor = "0.1"
 env_logger = "0.9"
+log = { version = "0.4.16", features = ["kv_unstable_serde"] }
 util = { path = "../util" }
 lazy_static = "1.4"
 serde_json = { version = "1.0", features = ["preserve_order"] }

crates/collab/src/integration_tests.rs 🔗

@@ -47,7 +47,7 @@ use std::{
     path::{Path, PathBuf},
     rc::Rc,
     sync::{
-        atomic::{AtomicBool, Ordering::SeqCst},
+        atomic::{AtomicBool, AtomicUsize, Ordering::SeqCst},
         Arc,
     },
     time::Duration,
@@ -6138,6 +6138,7 @@ struct TestServer {
     connection_killers: Arc<Mutex<HashMap<PeerId, Arc<AtomicBool>>>>,
     forbid_connections: Arc<AtomicBool>,
     _test_db: TestDb,
+    test_live_kit_server: Arc<live_kit_client::TestServer>,
 }
 
 impl TestServer {
@@ -6145,8 +6146,18 @@ impl TestServer {
         foreground: Rc<executor::Foreground>,
         background: Arc<executor::Background>,
     ) -> Self {
+        static NEXT_LIVE_KIT_SERVER_ID: AtomicUsize = AtomicUsize::new(0);
+
         let test_db = TestDb::fake(background.clone());
-        let app_state = Self::build_app_state(&test_db).await;
+        let live_kit_server_id = NEXT_LIVE_KIT_SERVER_ID.fetch_add(1, SeqCst);
+        let live_kit_server = live_kit_client::TestServer::create(
+            format!("http://livekit.{}.test", live_kit_server_id),
+            format!("devkey-{}", live_kit_server_id),
+            format!("secret-{}", live_kit_server_id),
+            background.clone(),
+        )
+        .unwrap();
+        let app_state = Self::build_app_state(&test_db, &live_kit_server).await;
         let peer = Peer::new();
         let notifications = mpsc::unbounded();
         let server = Server::new(app_state.clone(), Some(notifications.0));
@@ -6159,6 +6170,7 @@ impl TestServer {
             connection_killers: Default::default(),
             forbid_connections: Default::default(),
             _test_db: test_db,
+            test_live_kit_server: live_kit_server,
         }
     }
 
@@ -6354,10 +6366,13 @@ impl TestServer {
         }
     }
 
-    async fn build_app_state(test_db: &TestDb) -> Arc<AppState> {
+    async fn build_app_state(
+        test_db: &TestDb,
+        fake_server: &live_kit_client::TestServer,
+    ) -> Arc<AppState> {
         Arc::new(AppState {
             db: test_db.db().clone(),
-            live_kit_client: None,
+            live_kit_client: Some(Arc::new(fake_server.create_api_client())),
             api_token: Default::default(),
             invite_link_prefix: Default::default(),
         })
@@ -6390,6 +6405,7 @@ impl Deref for TestServer {
 impl Drop for TestServer {
     fn drop(&mut self) {
         self.peer.reset();
+        self.test_live_kit_server.teardown().unwrap();
     }
 }
 

crates/collab/src/main.rs 🔗

@@ -41,7 +41,7 @@ pub struct AppState {
     db: Arc<dyn Db>,
     api_token: String,
     invite_link_prefix: String,
-    live_kit_client: Option<live_kit_server::api::Client>,
+    live_kit_client: Option<Arc<dyn live_kit_server::api::Client>>,
 }
 
 impl AppState {
@@ -53,11 +53,11 @@ impl AppState {
             .zip(config.live_kit_key.as_ref())
             .zip(config.live_kit_secret.as_ref())
         {
-            Some(live_kit_server::api::Client::new(
+            Some(Arc::new(live_kit_server::api::LiveKitClient::new(
                 server.clone(),
                 key.clone(),
                 secret.clone(),
-            ))
+            )) as Arc<dyn live_kit_server::api::Client>)
         } else {
             None
         };

crates/live_kit_client/Cargo.toml 🔗

@@ -12,7 +12,13 @@ doctest = false
 name = "test_app"
 
 [features]
-test-support = ["collections/test-support", "gpui/test-support", "lazy_static", "live_kit_server"]
+test-support = [
+    "async-trait", 
+    "collections/test-support",
+    "gpui/test-support",
+    "lazy_static",
+    "live_kit_server"
+]
 
 [dependencies]
 collections = { path = "../collections", optional = true }
@@ -21,6 +27,7 @@ live_kit_server = { path = "../live_kit_server", optional = true }
 media = { path = "../media" }
 
 anyhow = "1.0.38"
+async-trait = { version = "0.1", optional = true }
 core-foundation = "0.9.3"
 core-graphics = "0.22.3"
 futures = "0.3"
@@ -30,11 +37,11 @@ parking_lot = "0.11.1"
 [dev-dependencies]
 collections = { path = "../collections", features = ["test-support"] }
 gpui = { path = "../gpui", features = ["test-support"] }
-lazy_static = "1.4"
 live_kit_server = { path = "../live_kit_server" }
 media = { path = "../media" }
 
 anyhow = "1.0.38"
+async-trait = "0.1"
 block = "0.1"
 bytes = "1.2"
 byteorder = "1.4"
@@ -45,6 +52,7 @@ foreign-types = "0.3"
 futures = "0.3"
 hmac = "0.12"
 jwt = "0.16"
+lazy_static = "1.4"
 log = { version = "0.4.16", features = ["kv_unstable_serde"] }
 objc = "0.2"
 parking_lot = "0.11.1"

crates/live_kit_client/examples/test_app.rs 🔗

@@ -7,7 +7,10 @@ use gpui::{
     Menu, MenuItem, ViewContext,
 };
 use live_kit_client::{LocalVideoTrack, RemoteVideoTrackUpdate, Room};
-use live_kit_server::token::{self, VideoGrant};
+use live_kit_server::{
+    api::Client,
+    token::{self, VideoGrant},
+};
 use log::LevelFilter;
 use media::core_video::CVImageBuffer;
 use postage::watch;

crates/live_kit_client/src/test.rs 🔗

@@ -1,35 +1,40 @@
 use anyhow::{anyhow, Result};
+use async_trait::async_trait;
 use collections::HashMap;
 use futures::{channel::mpsc, future};
 use gpui::executor::Background;
 use lazy_static::lazy_static;
+use live_kit_server::token;
 use media::core_video::CVImageBuffer;
 use parking_lot::Mutex;
 use std::{future::Future, sync::Arc};
 
 lazy_static! {
-    static ref SERVERS: Mutex<HashMap<String, Arc<FakeServer>>> = Default::default();
+    static ref SERVERS: Mutex<HashMap<String, Arc<TestServer>>> = Default::default();
 }
 
-pub struct FakeServer {
-    url: String,
-    secret_key: String,
-    rooms: Mutex<HashMap<String, FakeServerRoom>>,
+pub struct TestServer {
+    pub url: String,
+    pub api_key: String,
+    pub secret_key: String,
+    rooms: Mutex<HashMap<String, TestServerRoom>>,
     background: Arc<Background>,
 }
 
-impl FakeServer {
+impl TestServer {
     pub fn create(
         url: String,
+        api_key: String,
         secret_key: String,
         background: Arc<Background>,
-    ) -> Result<Arc<FakeServer>> {
+    ) -> Result<Arc<TestServer>> {
         let mut servers = SERVERS.lock();
         if servers.contains_key(&url) {
             Err(anyhow!("a server with url {:?} already exists", url))
         } else {
-            let server = Arc::new(FakeServer {
+            let server = Arc::new(TestServer {
                 url: url.clone(),
+                api_key,
                 secret_key,
                 rooms: Default::default(),
                 background,
@@ -39,7 +44,7 @@ impl FakeServer {
         }
     }
 
-    fn get(url: &str) -> Result<Arc<FakeServer>> {
+    fn get(url: &str) -> Result<Arc<TestServer>> {
         Ok(SERVERS
             .lock()
             .get(url)
@@ -55,33 +60,151 @@ impl FakeServer {
         Ok(())
     }
 
+    pub fn create_api_client(&self) -> TestApiClient {
+        TestApiClient {
+            url: self.url.clone(),
+        }
+    }
+
+    async fn create_room(&self, room: String) -> Result<()> {
+        self.background.simulate_random_delay().await;
+        let mut server_rooms = self.rooms.lock();
+        if server_rooms.contains_key(&room) {
+            Err(anyhow!("room {:?} already exists", room))
+        } else {
+            server_rooms.insert(room, Default::default());
+            Ok(())
+        }
+    }
+
+    async fn delete_room(&self, room: String) -> Result<()> {
+        // TODO: clear state associated with all `Room`s.
+        self.background.simulate_random_delay().await;
+        let mut server_rooms = self.rooms.lock();
+        server_rooms
+            .remove(&room)
+            .ok_or_else(|| anyhow!("room {:?} does not exist", room))?;
+        Ok(())
+    }
+
     async fn join_room(&self, token: String, client_room: Arc<Room>) -> Result<()> {
         self.background.simulate_random_delay().await;
         let claims = live_kit_server::token::validate(&token, &self.secret_key)?;
         let identity = claims.sub.unwrap().to_string();
-        let room = claims.video.room.unwrap();
+        let room_name = claims.video.room.unwrap();
+        let mut server_rooms = self.rooms.lock();
+        let room = server_rooms
+            .get_mut(&*room_name)
+            .ok_or_else(|| anyhow!("room {:?} does not exist", room_name))?;
+        if room.clients.contains_key(&identity) {
+            Err(anyhow!(
+                "{:?} attempted to join room {:?} twice",
+                identity,
+                room_name
+            ))
+        } else {
+            room.clients.insert(identity, client_room);
+            Ok(())
+        }
+    }
+
+    async fn leave_room(&self, token: String) -> Result<()> {
+        self.background.simulate_random_delay().await;
+        let claims = live_kit_server::token::validate(&token, &self.secret_key)?;
+        let identity = claims.sub.unwrap().to_string();
+        let room_name = claims.video.room.unwrap();
+        let mut server_rooms = self.rooms.lock();
+        let room = server_rooms
+            .get_mut(&*room_name)
+            .ok_or_else(|| anyhow!("room {} does not exist", room_name))?;
+        room.clients.remove(&identity).ok_or_else(|| {
+            anyhow!(
+                "{:?} attempted to leave room {:?} before joining it",
+                identity,
+                room_name
+            )
+        })?;
+        Ok(())
+    }
+
+    async fn remove_participant(&self, room_name: String, identity: String) -> Result<()> {
+        // TODO: clear state associated with the `Room`.
+
+        self.background.simulate_random_delay().await;
         let mut server_rooms = self.rooms.lock();
         let room = server_rooms
-            .get_mut(&*room)
-            .ok_or_else(|| anyhow!("room {} does not exist", room))?;
-        room.clients.insert(identity, client_room);
+            .get_mut(&room_name)
+            .ok_or_else(|| anyhow!("room {} does not exist", room_name))?;
+        room.clients.remove(&identity).ok_or_else(|| {
+            anyhow!(
+                "participant {:?} did not join room {:?}",
+                identity,
+                room_name
+            )
+        })?;
         Ok(())
     }
 }
 
-struct FakeServerRoom {
+#[derive(Default)]
+struct TestServerRoom {
     clients: HashMap<Sid, Arc<Room>>,
 }
 
-impl FakeServerRoom {}
+impl TestServerRoom {}
+
+pub struct TestApiClient {
+    url: String,
+}
+
+#[async_trait]
+impl live_kit_server::api::Client for TestApiClient {
+    fn url(&self) -> &str {
+        &self.url
+    }
+
+    async fn create_room(&self, name: String) -> Result<()> {
+        let server = TestServer::get(&self.url)?;
+        server.create_room(name).await?;
+        Ok(())
+    }
+
+    async fn delete_room(&self, name: String) -> Result<()> {
+        let server = TestServer::get(&self.url)?;
+        server.delete_room(name).await?;
+        Ok(())
+    }
+
+    async fn remove_participant(&self, room: String, identity: String) -> Result<()> {
+        let server = TestServer::get(&self.url)?;
+        server.remove_participant(room, identity).await?;
+        Ok(())
+    }
+
+    fn room_token(&self, room: &str, identity: &str) -> Result<String> {
+        let server = TestServer::get(&self.url)?;
+        token::create(
+            &server.api_key,
+            &server.secret_key,
+            Some(identity),
+            token::VideoGrant::to_join(room),
+        )
+    }
+}
 
 pub type Sid = String;
 
-pub struct Room;
+#[derive(Default)]
+struct RoomState {
+    token: Option<String>,
+}
+
+#[derive(Default)]
+pub struct Room(Mutex<RoomState>);
 
 impl Room {
     pub fn new() -> Arc<Self> {
-        Arc::new(Self)
+        Default::default()
     }
 
     pub fn connect(self: &Arc<Self>, url: &str, token: &str) -> impl Future<Output = Result<()>> {
@@ -89,8 +212,9 @@ impl Room {
         let url = url.to_string();
         let token = token.to_string();
         async move {
-            let server = FakeServer::get(&url)?;
-            server.join_room(token, this).await?;
+            let server = TestServer::get(&url)?;
+            server.join_room(token.clone(), this.clone()).await?;
+            this.0.lock().token = Some(token);
             Ok(())
         }
     }
@@ -115,7 +239,14 @@ impl Room {
 
 impl Drop for Room {
     fn drop(&mut self) {
-        todo!()
+        if let Some(token) = self.0.lock().token.take() {
+            if let Ok(server) = TestServer::get(&token) {
+                let background = server.background.clone();
+                background
+                    .spawn(async move { server.leave_room(token).await.unwrap() })
+                    .detach();
+            }
+        }
     }
 }
 

crates/live_kit_server/src/api.rs 🔗

@@ -1,18 +1,28 @@
 use crate::{proto, token};
 use anyhow::{anyhow, Result};
+use async_trait::async_trait;
 use prost::Message;
 use reqwest::header::CONTENT_TYPE;
 use std::{future::Future, sync::Arc};
 
+#[async_trait]
+pub trait Client: Send + Sync {
+    fn url(&self) -> &str;
+    async fn create_room(&self, name: String) -> Result<()>;
+    async fn delete_room(&self, name: String) -> Result<()>;
+    async fn remove_participant(&self, room: String, identity: String) -> Result<()>;
+    fn room_token(&self, room: &str, identity: &str) -> Result<String>;
+}
+
 #[derive(Clone)]
-pub struct Client {
+pub struct LiveKitClient {
     http: reqwest::Client,
     url: Arc<str>,
     key: Arc<str>,
     secret: Arc<str>,
 }
 
-impl Client {
+impl LiveKitClient {
     pub fn new(mut url: String, key: String, secret: String) -> Self {
         if url.ends_with('/') {
             url.pop();
@@ -26,67 +36,6 @@ impl Client {
         }
     }
 
-    pub fn url(&self) -> &str {
-        &self.url
-    }
-
-    pub fn create_room(&self, name: String) -> impl Future<Output = Result<proto::Room>> {
-        self.request(
-            "twirp/livekit.RoomService/CreateRoom",
-            token::VideoGrant {
-                room_create: Some(true),
-                ..Default::default()
-            },
-            proto::CreateRoomRequest {
-                name,
-                ..Default::default()
-            },
-        )
-    }
-
-    pub fn delete_room(&self, name: String) -> impl Future<Output = Result<()>> {
-        let response = self.request(
-            "twirp/livekit.RoomService/DeleteRoom",
-            token::VideoGrant {
-                room_create: Some(true),
-                ..Default::default()
-            },
-            proto::DeleteRoomRequest { room: name },
-        );
-        async move {
-            let _: proto::DeleteRoomResponse = response.await?;
-            Ok(())
-        }
-    }
-
-    pub fn remove_participant(
-        &self,
-        room: String,
-        identity: String,
-    ) -> impl Future<Output = Result<()>> {
-        let response = self.request(
-            "twirp/livekit.RoomService/RemoveParticipant",
-            token::VideoGrant::to_admin(&room),
-            proto::RoomParticipantIdentity {
-                room: room.clone(),
-                identity,
-            },
-        );
-        async move {
-            let _: proto::RemoveParticipantResponse = response.await?;
-            Ok(())
-        }
-    }
-
-    pub fn room_token(&self, room: &str, identity: &str) -> Result<String> {
-        token::create(
-            &self.key,
-            &self.secret,
-            Some(identity),
-            token::VideoGrant::to_join(room),
-        )
-    }
-
     fn request<Req, Res>(
         &self,
         path: &str,
@@ -126,3 +75,65 @@ impl Client {
         }
     }
 }
+
+#[async_trait]
+impl Client for LiveKitClient {
+    fn url(&self) -> &str {
+        &self.url
+    }
+
+    async fn create_room(&self, name: String) -> Result<()> {
+        let x: proto::Room = self
+            .request(
+                "twirp/livekit.RoomService/CreateRoom",
+                token::VideoGrant {
+                    room_create: Some(true),
+                    ..Default::default()
+                },
+                proto::CreateRoomRequest {
+                    name,
+                    ..Default::default()
+                },
+            )
+            .await?;
+        dbg!(x);
+        Ok(())
+    }
+
+    async fn delete_room(&self, name: String) -> Result<()> {
+        let _: proto::DeleteRoomResponse = self
+            .request(
+                "twirp/livekit.RoomService/DeleteRoom",
+                token::VideoGrant {
+                    room_create: Some(true),
+                    ..Default::default()
+                },
+                proto::DeleteRoomRequest { room: name },
+            )
+            .await?;
+        Ok(())
+    }
+
+    async fn remove_participant(&self, room: String, identity: String) -> Result<()> {
+        let _: proto::RemoveParticipantResponse = self
+            .request(
+                "twirp/livekit.RoomService/RemoveParticipant",
+                token::VideoGrant::to_admin(&room),
+                proto::RoomParticipantIdentity {
+                    room: room.clone(),
+                    identity,
+                },
+            )
+            .await?;
+        Ok(())
+    }
+
+    fn room_token(&self, room: &str, identity: &str) -> Result<String> {
+        token::create(
+            &self.key,
+            &self.secret,
+            Some(identity),
+            token::VideoGrant::to_join(room),
+        )
+    }
+}