test.rs

  1use anyhow::{anyhow, Result};
  2use async_trait::async_trait;
  3use collections::HashMap;
  4use futures::{Stream, StreamExt};
  5use gpui::executor::{self, Background};
  6use lazy_static::lazy_static;
  7use live_kit_server::token;
  8use media::core_video::CVImageBuffer;
  9use parking_lot::Mutex;
 10use std::{future::Future, sync::Arc};
 11
 12lazy_static! {
 13    static ref SERVERS: Mutex<HashMap<String, Arc<TestServer>>> = Default::default();
 14}
 15
 16pub struct TestServer {
 17    pub url: String,
 18    pub api_key: String,
 19    pub secret_key: String,
 20    rooms: Mutex<HashMap<String, TestServerRoom>>,
 21    background: Arc<Background>,
 22}
 23
 24impl TestServer {
 25    pub fn create(
 26        url: String,
 27        api_key: String,
 28        secret_key: String,
 29        background: Arc<Background>,
 30    ) -> Result<Arc<TestServer>> {
 31        let mut servers = SERVERS.lock();
 32        if servers.contains_key(&url) {
 33            Err(anyhow!("a server with url {:?} already exists", url))
 34        } else {
 35            let server = Arc::new(TestServer {
 36                url: url.clone(),
 37                api_key,
 38                secret_key,
 39                rooms: Default::default(),
 40                background,
 41            });
 42            servers.insert(url, server.clone());
 43            Ok(server)
 44        }
 45    }
 46
 47    fn get(url: &str) -> Result<Arc<TestServer>> {
 48        Ok(SERVERS
 49            .lock()
 50            .get(url)
 51            .ok_or_else(|| anyhow!("no server found for url"))?
 52            .clone())
 53    }
 54
 55    pub fn teardown(&self) -> Result<()> {
 56        SERVERS
 57            .lock()
 58            .remove(&self.url)
 59            .ok_or_else(|| anyhow!("server with url {:?} does not exist", self.url))?;
 60        Ok(())
 61    }
 62
 63    pub fn create_api_client(&self) -> TestApiClient {
 64        TestApiClient {
 65            url: self.url.clone(),
 66        }
 67    }
 68
 69    async fn create_room(&self, room: String) -> Result<()> {
 70        self.background.simulate_random_delay().await;
 71        let mut server_rooms = self.rooms.lock();
 72        if server_rooms.contains_key(&room) {
 73            Err(anyhow!("room {:?} already exists", room))
 74        } else {
 75            server_rooms.insert(room, Default::default());
 76            Ok(())
 77        }
 78    }
 79
 80    async fn delete_room(&self, room: String) -> Result<()> {
 81        // TODO: clear state associated with all `Room`s.
 82        self.background.simulate_random_delay().await;
 83        let mut server_rooms = self.rooms.lock();
 84        server_rooms
 85            .remove(&room)
 86            .ok_or_else(|| anyhow!("room {:?} does not exist", room))?;
 87        Ok(())
 88    }
 89
 90    async fn join_room(&self, token: String, client_room: Arc<Room>) -> Result<()> {
 91        self.background.simulate_random_delay().await;
 92        let claims = live_kit_server::token::validate(&token, &self.secret_key)?;
 93        let identity = claims.sub.unwrap().to_string();
 94        let room_name = claims.video.room.unwrap();
 95        let mut server_rooms = self.rooms.lock();
 96        let room = server_rooms
 97            .get_mut(&*room_name)
 98            .ok_or_else(|| anyhow!("room {:?} does not exist", room_name))?;
 99        if room.client_rooms.contains_key(&identity) {
100            Err(anyhow!(
101                "{:?} attempted to join room {:?} twice",
102                identity,
103                room_name
104            ))
105        } else {
106            room.client_rooms.insert(identity, client_room);
107            Ok(())
108        }
109    }
110
111    async fn leave_room(&self, token: String) -> Result<()> {
112        self.background.simulate_random_delay().await;
113        let claims = live_kit_server::token::validate(&token, &self.secret_key)?;
114        let identity = claims.sub.unwrap().to_string();
115        let room_name = claims.video.room.unwrap();
116        let mut server_rooms = self.rooms.lock();
117        let room = server_rooms
118            .get_mut(&*room_name)
119            .ok_or_else(|| anyhow!("room {} does not exist", room_name))?;
120        room.client_rooms.remove(&identity).ok_or_else(|| {
121            anyhow!(
122                "{:?} attempted to leave room {:?} before joining it",
123                identity,
124                room_name
125            )
126        })?;
127        Ok(())
128    }
129
130    async fn remove_participant(&self, room_name: String, identity: String) -> Result<()> {
131        // TODO: clear state associated with the `Room`.
132
133        self.background.simulate_random_delay().await;
134        let mut server_rooms = self.rooms.lock();
135        let room = server_rooms
136            .get_mut(&room_name)
137            .ok_or_else(|| anyhow!("room {} does not exist", room_name))?;
138        room.client_rooms.remove(&identity).ok_or_else(|| {
139            anyhow!(
140                "participant {:?} did not join room {:?}",
141                identity,
142                room_name
143            )
144        })?;
145        Ok(())
146    }
147
148    async fn publish_video_track(&self, token: String, local_track: LocalVideoTrack) -> Result<()> {
149        self.background.simulate_random_delay().await;
150        let claims = live_kit_server::token::validate(&token, &self.secret_key)?;
151        let identity = claims.sub.unwrap().to_string();
152        let room_name = claims.video.room.unwrap();
153
154        let mut server_rooms = self.rooms.lock();
155        let room = server_rooms
156            .get_mut(&*room_name)
157            .ok_or_else(|| anyhow!("room {} does not exist", room_name))?;
158
159        let update = RemoteVideoTrackUpdate::Subscribed(Arc::new(RemoteVideoTrack {
160            sid: nanoid::nanoid!(17),
161            publisher_id: identity.clone(),
162            frames_rx: local_track.frames_rx.clone(),
163            background: self.background.clone(),
164        }));
165
166        for (id, client_room) in &room.client_rooms {
167            if *id != identity {
168                let _ = client_room
169                    .0
170                    .lock()
171                    .video_track_updates
172                    .0
173                    .try_broadcast(update.clone())
174                    .unwrap();
175            }
176        }
177
178        Ok(())
179    }
180}
181
182#[derive(Default)]
183struct TestServerRoom {
184    client_rooms: HashMap<Sid, Arc<Room>>,
185}
186
187impl TestServerRoom {}
188
189pub struct TestApiClient {
190    url: String,
191}
192
193#[async_trait]
194impl live_kit_server::api::Client for TestApiClient {
195    fn url(&self) -> &str {
196        &self.url
197    }
198
199    async fn create_room(&self, name: String) -> Result<()> {
200        let server = TestServer::get(&self.url)?;
201        server.create_room(name).await?;
202        Ok(())
203    }
204
205    async fn delete_room(&self, name: String) -> Result<()> {
206        let server = TestServer::get(&self.url)?;
207        server.delete_room(name).await?;
208        Ok(())
209    }
210
211    async fn remove_participant(&self, room: String, identity: String) -> Result<()> {
212        let server = TestServer::get(&self.url)?;
213        server.remove_participant(room, identity).await?;
214        Ok(())
215    }
216
217    fn room_token(&self, room: &str, identity: &str) -> Result<String> {
218        let server = TestServer::get(&self.url)?;
219        token::create(
220            &server.api_key,
221            &server.secret_key,
222            Some(identity),
223            token::VideoGrant::to_join(room),
224        )
225    }
226}
227
228pub type Sid = String;
229
230struct RoomState {
231    connection: Option<ConnectionState>,
232    display_sources: Vec<MacOSDisplay>,
233    video_track_updates: (
234        async_broadcast::Sender<RemoteVideoTrackUpdate>,
235        async_broadcast::Receiver<RemoteVideoTrackUpdate>,
236    ),
237}
238
239struct ConnectionState {
240    url: String,
241    token: String,
242}
243
244pub struct Room(Mutex<RoomState>);
245
246impl Room {
247    pub fn new() -> Arc<Self> {
248        Arc::new(Self(Mutex::new(RoomState {
249            connection: None,
250            display_sources: Default::default(),
251            video_track_updates: async_broadcast::broadcast(128),
252        })))
253    }
254
255    pub fn connect(self: &Arc<Self>, url: &str, token: &str) -> impl Future<Output = Result<()>> {
256        let this = self.clone();
257        let url = url.to_string();
258        let token = token.to_string();
259        async move {
260            let server = TestServer::get(&url)?;
261            server.join_room(token.clone(), this.clone()).await?;
262            this.0.lock().connection = Some(ConnectionState { url, token });
263            Ok(())
264        }
265    }
266
267    pub fn display_sources(self: &Arc<Self>) -> impl Future<Output = Result<Vec<MacOSDisplay>>> {
268        let this = self.clone();
269        async move {
270            let server = this.test_server();
271            server.background.simulate_random_delay().await;
272            Ok(this.0.lock().display_sources.clone())
273        }
274    }
275
276    pub fn publish_video_track(
277        self: &Arc<Self>,
278        track: &LocalVideoTrack,
279    ) -> impl Future<Output = Result<LocalTrackPublication>> {
280        let this = self.clone();
281        let track = track.clone();
282        async move {
283            this.test_server()
284                .publish_video_track(this.token(), track)
285                .await?;
286            Ok(LocalTrackPublication)
287        }
288    }
289
290    pub fn unpublish_track(&self, _: LocalTrackPublication) {}
291
292    pub fn remote_video_tracks(&self, _: &str) -> Vec<Arc<RemoteVideoTrack>> {
293        Default::default()
294    }
295
296    pub fn remote_video_track_updates(&self) -> impl Stream<Item = RemoteVideoTrackUpdate> {
297        self.0.lock().video_track_updates.1.clone()
298    }
299
300    pub fn set_display_sources(&self, sources: Vec<MacOSDisplay>) {
301        self.0.lock().display_sources = sources;
302    }
303
304    fn test_server(&self) -> Arc<TestServer> {
305        let this = self.0.lock();
306        let connection = this
307            .connection
308            .as_ref()
309            .expect("must be connected to call this method");
310        TestServer::get(&connection.url).unwrap()
311    }
312
313    fn token(&self) -> String {
314        self.0
315            .lock()
316            .connection
317            .as_ref()
318            .expect("must be connected to call this method")
319            .token
320            .clone()
321    }
322}
323
324impl Drop for Room {
325    fn drop(&mut self) {
326        if let Some(connection) = self.0.lock().connection.take() {
327            if let Ok(server) = TestServer::get(&connection.token) {
328                let background = server.background.clone();
329                background
330                    .spawn(async move { server.leave_room(connection.token).await.unwrap() })
331                    .detach();
332            }
333        }
334    }
335}
336
337pub struct LocalTrackPublication;
338
339#[derive(Clone)]
340pub struct LocalVideoTrack {
341    frames_rx: async_broadcast::Receiver<Frame>,
342}
343
344impl LocalVideoTrack {
345    pub fn screen_share_for_display(display: &MacOSDisplay) -> Self {
346        Self {
347            frames_rx: display.frames.1.clone(),
348        }
349    }
350}
351
352pub struct RemoteVideoTrack {
353    sid: Sid,
354    publisher_id: Sid,
355    frames_rx: async_broadcast::Receiver<Frame>,
356    background: Arc<executor::Background>,
357}
358
359impl RemoteVideoTrack {
360    pub fn sid(&self) -> &str {
361        &self.sid
362    }
363
364    pub fn publisher_id(&self) -> &str {
365        &self.publisher_id
366    }
367
368    pub fn add_renderer<F>(&self, mut callback: F)
369    where
370        F: 'static + Send + Sync + FnMut(Frame),
371    {
372        let mut frames_rx = self.frames_rx.clone();
373        self.background
374            .spawn(async move {
375                while let Some(frame) = frames_rx.next().await {
376                    callback(frame)
377                }
378            })
379            .detach();
380    }
381}
382
383#[derive(Clone)]
384pub enum RemoteVideoTrackUpdate {
385    Subscribed(Arc<RemoteVideoTrack>),
386    Unsubscribed { publisher_id: Sid, track_id: Sid },
387}
388
389#[derive(Clone)]
390pub struct MacOSDisplay {
391    frames: (
392        async_broadcast::Sender<Frame>,
393        async_broadcast::Receiver<Frame>,
394    ),
395}
396
397impl MacOSDisplay {
398    pub fn new() -> Self {
399        Self {
400            frames: async_broadcast::broadcast(128),
401        }
402    }
403
404    pub fn send_frame(&self, frame: Frame) {
405        self.frames.0.try_broadcast(frame).unwrap();
406    }
407}
408
/// A fake video frame: a label plus dimensions, with no pixel data.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct Frame {
    /// Free-form label that lets tests tell frames apart.
    pub label: String,
    /// Frame width.
    pub width: usize,
    /// Frame height.
    pub height: usize,
}
415
416impl Frame {
417    pub fn width(&self) -> usize {
418        self.width
419    }
420
421    pub fn height(&self) -> usize {
422        self.height
423    }
424
425    pub fn image(&self) -> CVImageBuffer {
426        unimplemented!("you can't call this in test mode")
427    }
428}