test.rs

  1use anyhow::{anyhow, Result};
  2use async_trait::async_trait;
  3use collections::HashMap;
  4use futures::Stream;
  5use gpui::executor::Background;
  6use lazy_static::lazy_static;
  7use live_kit_server::token;
  8use media::core_video::CVImageBuffer;
  9use parking_lot::Mutex;
 10use postage::watch;
 11use std::{future::Future, mem, sync::Arc};
 12
 13lazy_static! {
 14    static ref SERVERS: Mutex<HashMap<String, Arc<TestServer>>> = Default::default();
 15}
 16
 17pub struct TestServer {
 18    pub url: String,
 19    pub api_key: String,
 20    pub secret_key: String,
 21    rooms: Mutex<HashMap<String, TestServerRoom>>,
 22    background: Arc<Background>,
 23}
 24
 25impl TestServer {
 26    pub fn create(
 27        url: String,
 28        api_key: String,
 29        secret_key: String,
 30        background: Arc<Background>,
 31    ) -> Result<Arc<TestServer>> {
 32        let mut servers = SERVERS.lock();
 33        if servers.contains_key(&url) {
 34            Err(anyhow!("a server with url {:?} already exists", url))
 35        } else {
 36            let server = Arc::new(TestServer {
 37                url: url.clone(),
 38                api_key,
 39                secret_key,
 40                rooms: Default::default(),
 41                background,
 42            });
 43            servers.insert(url, server.clone());
 44            Ok(server)
 45        }
 46    }
 47
 48    fn get(url: &str) -> Result<Arc<TestServer>> {
 49        Ok(SERVERS
 50            .lock()
 51            .get(url)
 52            .ok_or_else(|| anyhow!("no server found for url"))?
 53            .clone())
 54    }
 55
 56    pub fn teardown(&self) -> Result<()> {
 57        SERVERS
 58            .lock()
 59            .remove(&self.url)
 60            .ok_or_else(|| anyhow!("server with url {:?} does not exist", self.url))?;
 61        Ok(())
 62    }
 63
 64    pub fn create_api_client(&self) -> TestApiClient {
 65        TestApiClient {
 66            url: self.url.clone(),
 67        }
 68    }
 69
 70    async fn create_room(&self, room: String) -> Result<()> {
 71        self.background.simulate_random_delay().await;
 72        let mut server_rooms = self.rooms.lock();
 73        if server_rooms.contains_key(&room) {
 74            Err(anyhow!("room {:?} already exists", room))
 75        } else {
 76            server_rooms.insert(room, Default::default());
 77            Ok(())
 78        }
 79    }
 80
 81    async fn delete_room(&self, room: String) -> Result<()> {
 82        // TODO: clear state associated with all `Room`s.
 83        self.background.simulate_random_delay().await;
 84        let mut server_rooms = self.rooms.lock();
 85        server_rooms
 86            .remove(&room)
 87            .ok_or_else(|| anyhow!("room {:?} does not exist", room))?;
 88        Ok(())
 89    }
 90
 91    async fn join_room(&self, token: String, client_room: Arc<Room>) -> Result<()> {
 92        self.background.simulate_random_delay().await;
 93        let claims = live_kit_server::token::validate(&token, &self.secret_key)?;
 94        let identity = claims.sub.unwrap().to_string();
 95        let room_name = claims.video.room.unwrap();
 96        let mut server_rooms = self.rooms.lock();
 97        let room = server_rooms
 98            .get_mut(&*room_name)
 99            .ok_or_else(|| anyhow!("room {:?} does not exist", room_name))?;
100        if room.client_rooms.contains_key(&identity) {
101            Err(anyhow!(
102                "{:?} attempted to join room {:?} twice",
103                identity,
104                room_name
105            ))
106        } else {
107            room.client_rooms.insert(identity, client_room);
108            Ok(())
109        }
110    }
111
112    async fn leave_room(&self, token: String) -> Result<()> {
113        self.background.simulate_random_delay().await;
114        let claims = live_kit_server::token::validate(&token, &self.secret_key)?;
115        let identity = claims.sub.unwrap().to_string();
116        let room_name = claims.video.room.unwrap();
117        let mut server_rooms = self.rooms.lock();
118        let room = server_rooms
119            .get_mut(&*room_name)
120            .ok_or_else(|| anyhow!("room {} does not exist", room_name))?;
121        room.client_rooms.remove(&identity).ok_or_else(|| {
122            anyhow!(
123                "{:?} attempted to leave room {:?} before joining it",
124                identity,
125                room_name
126            )
127        })?;
128        Ok(())
129    }
130
131    async fn remove_participant(&self, room_name: String, identity: String) -> Result<()> {
132        // TODO: clear state associated with the `Room`.
133
134        self.background.simulate_random_delay().await;
135        let mut server_rooms = self.rooms.lock();
136        let room = server_rooms
137            .get_mut(&room_name)
138            .ok_or_else(|| anyhow!("room {} does not exist", room_name))?;
139        room.client_rooms.remove(&identity).ok_or_else(|| {
140            anyhow!(
141                "participant {:?} did not join room {:?}",
142                identity,
143                room_name
144            )
145        })?;
146        Ok(())
147    }
148
149    pub async fn disconnect_client(&self, client_identity: String) {
150        self.background.simulate_random_delay().await;
151        let mut server_rooms = self.rooms.lock();
152        for room in server_rooms.values_mut() {
153            if let Some(room) = room.client_rooms.remove(&client_identity) {
154                *room.0.lock().connection.0.borrow_mut() = ConnectionState::Disconnected;
155            }
156        }
157    }
158
159    async fn publish_video_track(&self, token: String, local_track: LocalVideoTrack) -> Result<()> {
160        self.background.simulate_random_delay().await;
161        let claims = live_kit_server::token::validate(&token, &self.secret_key)?;
162        let identity = claims.sub.unwrap().to_string();
163        let room_name = claims.video.room.unwrap();
164
165        let mut server_rooms = self.rooms.lock();
166        let room = server_rooms
167            .get_mut(&*room_name)
168            .ok_or_else(|| anyhow!("room {} does not exist", room_name))?;
169
170        let update = RemoteVideoTrackUpdate::Subscribed(Arc::new(RemoteVideoTrack {
171            sid: nanoid::nanoid!(17),
172            publisher_id: identity.clone(),
173            frames_rx: local_track.frames_rx.clone(),
174        }));
175
176        for (id, client_room) in &room.client_rooms {
177            if *id != identity {
178                let _ = client_room
179                    .0
180                    .lock()
181                    .video_track_updates
182                    .0
183                    .try_broadcast(update.clone())
184                    .unwrap();
185            }
186        }
187
188        Ok(())
189    }
190}
191
192#[derive(Default)]
193struct TestServerRoom {
194    client_rooms: HashMap<Sid, Arc<Room>>,
195}
196
197impl TestServerRoom {}
198
/// API client handed out by `TestServer::create_api_client`.
pub struct TestApiClient {
    /// URL used to look up the backing `TestServer` on every call.
    url: String,
}
202
203#[async_trait]
204impl live_kit_server::api::Client for TestApiClient {
205    fn url(&self) -> &str {
206        &self.url
207    }
208
209    async fn create_room(&self, name: String) -> Result<()> {
210        let server = TestServer::get(&self.url)?;
211        server.create_room(name).await?;
212        Ok(())
213    }
214
215    async fn delete_room(&self, name: String) -> Result<()> {
216        let server = TestServer::get(&self.url)?;
217        server.delete_room(name).await?;
218        Ok(())
219    }
220
221    async fn remove_participant(&self, room: String, identity: String) -> Result<()> {
222        let server = TestServer::get(&self.url)?;
223        server.remove_participant(room, identity).await?;
224        Ok(())
225    }
226
227    fn room_token(&self, room: &str, identity: &str) -> Result<String> {
228        let server = TestServer::get(&self.url)?;
229        token::create(
230            &server.api_key,
231            &server.secret_key,
232            Some(identity),
233            token::VideoGrant::to_join(room),
234        )
235    }
236}
237
/// String identifier; in this file it is used for participant identities and track ids.
pub type Sid = String;
239
240struct RoomState {
241    connection: (
242        watch::Sender<ConnectionState>,
243        watch::Receiver<ConnectionState>,
244    ),
245    display_sources: Vec<MacOSDisplay>,
246    video_track_updates: (
247        async_broadcast::Sender<RemoteVideoTrackUpdate>,
248        async_broadcast::Receiver<RemoteVideoTrackUpdate>,
249    ),
250}
251
/// Connection status of a [`Room`].
#[derive(Clone, PartialEq, Eq)]
pub enum ConnectionState {
    /// The room is not connected to any server.
    Disconnected,
    /// The room joined the server at `url` using `token`.
    Connected {
        /// URL of the server the room is connected to.
        url: String,
        /// Token that was used to join.
        token: String,
    },
}
257
258pub struct Room(Mutex<RoomState>);
259
260impl Room {
261    pub fn new() -> Arc<Self> {
262        Arc::new(Self(Mutex::new(RoomState {
263            connection: watch::channel_with(ConnectionState::Disconnected),
264            display_sources: Default::default(),
265            video_track_updates: async_broadcast::broadcast(128),
266        })))
267    }
268
269    pub fn status(&self) -> watch::Receiver<ConnectionState> {
270        self.0.lock().connection.1.clone()
271    }
272
273    pub fn connect(self: &Arc<Self>, url: &str, token: &str) -> impl Future<Output = Result<()>> {
274        let this = self.clone();
275        let url = url.to_string();
276        let token = token.to_string();
277        async move {
278            let server = TestServer::get(&url)?;
279            server.join_room(token.clone(), this.clone()).await?;
280            *this.0.lock().connection.0.borrow_mut() = ConnectionState::Connected { url, token };
281            Ok(())
282        }
283    }
284
285    pub fn display_sources(self: &Arc<Self>) -> impl Future<Output = Result<Vec<MacOSDisplay>>> {
286        let this = self.clone();
287        async move {
288            let server = this.test_server();
289            server.background.simulate_random_delay().await;
290            Ok(this.0.lock().display_sources.clone())
291        }
292    }
293
294    pub fn publish_video_track(
295        self: &Arc<Self>,
296        track: &LocalVideoTrack,
297    ) -> impl Future<Output = Result<LocalTrackPublication>> {
298        let this = self.clone();
299        let track = track.clone();
300        async move {
301            this.test_server()
302                .publish_video_track(this.token(), track)
303                .await?;
304            Ok(LocalTrackPublication)
305        }
306    }
307
308    pub fn unpublish_track(&self, _: LocalTrackPublication) {}
309
310    pub fn remote_video_tracks(&self, _: &str) -> Vec<Arc<RemoteVideoTrack>> {
311        Default::default()
312    }
313
314    pub fn remote_video_track_updates(&self) -> impl Stream<Item = RemoteVideoTrackUpdate> {
315        self.0.lock().video_track_updates.1.clone()
316    }
317
318    pub fn set_display_sources(&self, sources: Vec<MacOSDisplay>) {
319        self.0.lock().display_sources = sources;
320    }
321
322    fn test_server(&self) -> Arc<TestServer> {
323        match self.0.lock().connection.1.borrow().clone() {
324            ConnectionState::Disconnected => panic!("must be connected to call this method"),
325            ConnectionState::Connected { url, .. } => TestServer::get(&url).unwrap(),
326        }
327    }
328
329    fn token(&self) -> String {
330        match self.0.lock().connection.1.borrow().clone() {
331            ConnectionState::Disconnected => panic!("must be connected to call this method"),
332            ConnectionState::Connected { token, .. } => token,
333        }
334    }
335}
336
337impl Drop for Room {
338    fn drop(&mut self) {
339        if let ConnectionState::Connected { token, .. } = mem::replace(
340            &mut *self.0.lock().connection.0.borrow_mut(),
341            ConnectionState::Disconnected,
342        ) {
343            if let Ok(server) = TestServer::get(&token) {
344                let background = server.background.clone();
345                background
346                    .spawn(async move { server.leave_room(token).await.unwrap() })
347                    .detach();
348            }
349        }
350    }
351}
352
/// Value returned by `Room::publish_video_track`; it carries no data in the test
/// implementation.
pub struct LocalTrackPublication;
354
355#[derive(Clone)]
356pub struct LocalVideoTrack {
357    frames_rx: async_broadcast::Receiver<Frame>,
358}
359
360impl LocalVideoTrack {
361    pub fn screen_share_for_display(display: &MacOSDisplay) -> Self {
362        Self {
363            frames_rx: display.frames.1.clone(),
364        }
365    }
366}
367
368pub struct RemoteVideoTrack {
369    sid: Sid,
370    publisher_id: Sid,
371    frames_rx: async_broadcast::Receiver<Frame>,
372}
373
374impl RemoteVideoTrack {
375    pub fn sid(&self) -> &str {
376        &self.sid
377    }
378
379    pub fn publisher_id(&self) -> &str {
380        &self.publisher_id
381    }
382
383    pub fn frames(&self) -> async_broadcast::Receiver<Frame> {
384        self.frames_rx.clone()
385    }
386}
387
388#[derive(Clone)]
389pub enum RemoteVideoTrackUpdate {
390    Subscribed(Arc<RemoteVideoTrack>),
391    Unsubscribed { publisher_id: Sid, track_id: Sid },
392}
393
394#[derive(Clone)]
395pub struct MacOSDisplay {
396    frames: (
397        async_broadcast::Sender<Frame>,
398        async_broadcast::Receiver<Frame>,
399    ),
400}
401
402impl MacOSDisplay {
403    pub fn new() -> Self {
404        Self {
405            frames: async_broadcast::broadcast(128),
406        }
407    }
408
409    pub fn send_frame(&self, frame: Frame) {
410        self.frames.0.try_broadcast(frame).unwrap();
411    }
412}
413
/// Stand-in for a video frame: a label plus dimensions, with no pixel data.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Frame {
    /// Free-form label that tests can use to tell frames apart.
    pub label: String,
    /// Frame width.
    pub width: usize,
    /// Frame height.
    pub height: usize,
}
420
421impl Frame {
422    pub fn width(&self) -> usize {
423        self.width
424    }
425
426    pub fn height(&self) -> usize {
427        self.height
428    }
429
430    pub fn image(&self) -> CVImageBuffer {
431        unimplemented!("you can't call this in test mode")
432    }
433}