test.rs

  1use anyhow::{anyhow, Result};
  2use async_trait::async_trait;
  3use collections::HashMap;
  4use futures::Stream;
  5use gpui::executor::Background;
  6use lazy_static::lazy_static;
  7use live_kit_server::token;
  8use media::core_video::CVImageBuffer;
  9use parking_lot::Mutex;
 10use postage::watch;
 11use std::{future::Future, mem, sync::Arc};
 12
 13lazy_static! {
 14    static ref SERVERS: Mutex<HashMap<String, Arc<TestServer>>> = Default::default();
 15}
 16
 17pub struct TestServer {
 18    pub url: String,
 19    pub api_key: String,
 20    pub secret_key: String,
 21    rooms: Mutex<HashMap<String, TestServerRoom>>,
 22    background: Arc<Background>,
 23}
 24
 25impl TestServer {
 26    pub fn create(
 27        url: String,
 28        api_key: String,
 29        secret_key: String,
 30        background: Arc<Background>,
 31    ) -> Result<Arc<TestServer>> {
 32        let mut servers = SERVERS.lock();
 33        if servers.contains_key(&url) {
 34            Err(anyhow!("a server with url {:?} already exists", url))
 35        } else {
 36            let server = Arc::new(TestServer {
 37                url: url.clone(),
 38                api_key,
 39                secret_key,
 40                rooms: Default::default(),
 41                background,
 42            });
 43            servers.insert(url, server.clone());
 44            Ok(server)
 45        }
 46    }
 47
 48    fn get(url: &str) -> Result<Arc<TestServer>> {
 49        Ok(SERVERS
 50            .lock()
 51            .get(url)
 52            .ok_or_else(|| anyhow!("no server found for url"))?
 53            .clone())
 54    }
 55
 56    pub fn teardown(&self) -> Result<()> {
 57        SERVERS
 58            .lock()
 59            .remove(&self.url)
 60            .ok_or_else(|| anyhow!("server with url {:?} does not exist", self.url))?;
 61        Ok(())
 62    }
 63
 64    pub fn create_api_client(&self) -> TestApiClient {
 65        TestApiClient {
 66            url: self.url.clone(),
 67        }
 68    }
 69
 70    pub async fn create_room(&self, room: String) -> Result<()> {
 71        self.background.simulate_random_delay().await;
 72        let mut server_rooms = self.rooms.lock();
 73        if server_rooms.contains_key(&room) {
 74            Err(anyhow!("room {:?} already exists", room))
 75        } else {
 76            server_rooms.insert(room, Default::default());
 77            Ok(())
 78        }
 79    }
 80
 81    async fn delete_room(&self, room: String) -> Result<()> {
 82        // TODO: clear state associated with all `Room`s.
 83        self.background.simulate_random_delay().await;
 84        let mut server_rooms = self.rooms.lock();
 85        server_rooms
 86            .remove(&room)
 87            .ok_or_else(|| anyhow!("room {:?} does not exist", room))?;
 88        Ok(())
 89    }
 90
 91    async fn join_room(&self, token: String, client_room: Arc<Room>) -> Result<()> {
 92        self.background.simulate_random_delay().await;
 93        let claims = live_kit_server::token::validate(&token, &self.secret_key)?;
 94        let identity = claims.sub.unwrap().to_string();
 95        let room_name = claims.video.room.unwrap();
 96        let mut server_rooms = self.rooms.lock();
 97        let room = server_rooms
 98            .get_mut(&*room_name)
 99            .ok_or_else(|| anyhow!("room {:?} does not exist", room_name))?;
100        if room.client_rooms.contains_key(&identity) {
101            Err(anyhow!(
102                "{:?} attempted to join room {:?} twice",
103                identity,
104                room_name
105            ))
106        } else {
107            for track in &room.video_tracks {
108                client_room
109                    .0
110                    .lock()
111                    .video_track_updates
112                    .0
113                    .try_broadcast(RemoteVideoTrackUpdate::Subscribed(track.clone()))
114                    .unwrap();
115            }
116            room.client_rooms.insert(identity, client_room);
117            Ok(())
118        }
119    }
120
121    async fn leave_room(&self, token: String) -> Result<()> {
122        self.background.simulate_random_delay().await;
123        let claims = live_kit_server::token::validate(&token, &self.secret_key)?;
124        let identity = claims.sub.unwrap().to_string();
125        let room_name = claims.video.room.unwrap();
126        let mut server_rooms = self.rooms.lock();
127        let room = server_rooms
128            .get_mut(&*room_name)
129            .ok_or_else(|| anyhow!("room {} does not exist", room_name))?;
130        room.client_rooms.remove(&identity).ok_or_else(|| {
131            anyhow!(
132                "{:?} attempted to leave room {:?} before joining it",
133                identity,
134                room_name
135            )
136        })?;
137        Ok(())
138    }
139
140    async fn remove_participant(&self, room_name: String, identity: String) -> Result<()> {
141        // TODO: clear state associated with the `Room`.
142
143        self.background.simulate_random_delay().await;
144        let mut server_rooms = self.rooms.lock();
145        let room = server_rooms
146            .get_mut(&room_name)
147            .ok_or_else(|| anyhow!("room {} does not exist", room_name))?;
148        room.client_rooms.remove(&identity).ok_or_else(|| {
149            anyhow!(
150                "participant {:?} did not join room {:?}",
151                identity,
152                room_name
153            )
154        })?;
155        Ok(())
156    }
157
158    pub async fn disconnect_client(&self, client_identity: String) {
159        self.background.simulate_random_delay().await;
160        let mut server_rooms = self.rooms.lock();
161        for room in server_rooms.values_mut() {
162            if let Some(room) = room.client_rooms.remove(&client_identity) {
163                *room.0.lock().connection.0.borrow_mut() = ConnectionState::Disconnected;
164            }
165        }
166    }
167
168    async fn publish_video_track(&self, token: String, local_track: LocalVideoTrack) -> Result<()> {
169        self.background.simulate_random_delay().await;
170        let claims = live_kit_server::token::validate(&token, &self.secret_key)?;
171        let identity = claims.sub.unwrap().to_string();
172        let room_name = claims.video.room.unwrap();
173
174        let mut server_rooms = self.rooms.lock();
175        let room = server_rooms
176            .get_mut(&*room_name)
177            .ok_or_else(|| anyhow!("room {} does not exist", room_name))?;
178
179        let track = Arc::new(RemoteVideoTrack {
180            sid: nanoid::nanoid!(17),
181            publisher_id: identity.clone(),
182            frames_rx: local_track.frames_rx.clone(),
183        });
184
185        room.video_tracks.push(track.clone());
186
187        for (id, client_room) in &room.client_rooms {
188            if *id != identity {
189                let _ = client_room
190                    .0
191                    .lock()
192                    .video_track_updates
193                    .0
194                    .try_broadcast(RemoteVideoTrackUpdate::Subscribed(track.clone()))
195                    .unwrap();
196            }
197        }
198
199        Ok(())
200    }
201
202    async fn publish_audio_track(
203        &self,
204        token: String,
205        _local_track: &LocalAudioTrack,
206    ) -> Result<()> {
207        self.background.simulate_random_delay().await;
208        let claims = live_kit_server::token::validate(&token, &self.secret_key)?;
209        let identity = claims.sub.unwrap().to_string();
210        let room_name = claims.video.room.unwrap();
211
212        let mut server_rooms = self.rooms.lock();
213        let room = server_rooms
214            .get_mut(&*room_name)
215            .ok_or_else(|| anyhow!("room {} does not exist", room_name))?;
216
217        let track = Arc::new(RemoteAudioTrack {
218            sid: nanoid::nanoid!(17),
219            publisher_id: identity.clone(),
220        });
221
222        room.audio_tracks.push(track.clone());
223
224        for (id, client_room) in &room.client_rooms {
225            if *id != identity {
226                let _ = client_room
227                    .0
228                    .lock()
229                    .audio_track_updates
230                    .0
231                    .try_broadcast(RemoteAudioTrackUpdate::Subscribed(track.clone()))
232                    .unwrap();
233            }
234        }
235
236        Ok(())
237    }
238
239    fn video_tracks(&self, token: String) -> Result<Vec<Arc<RemoteVideoTrack>>> {
240        let claims = live_kit_server::token::validate(&token, &self.secret_key)?;
241        let room_name = claims.video.room.unwrap();
242
243        let mut server_rooms = self.rooms.lock();
244        let room = server_rooms
245            .get_mut(&*room_name)
246            .ok_or_else(|| anyhow!("room {} does not exist", room_name))?;
247        Ok(room.video_tracks.clone())
248    }
249
250    fn audio_tracks(&self, token: String) -> Result<Vec<Arc<RemoteAudioTrack>>> {
251        let claims = live_kit_server::token::validate(&token, &self.secret_key)?;
252        let room_name = claims.video.room.unwrap();
253
254        let mut server_rooms = self.rooms.lock();
255        let room = server_rooms
256            .get_mut(&*room_name)
257            .ok_or_else(|| anyhow!("room {} does not exist", room_name))?;
258        Ok(room.audio_tracks.clone())
259    }
260}
261
262#[derive(Default)]
263struct TestServerRoom {
264    client_rooms: HashMap<Sid, Arc<Room>>,
265    video_tracks: Vec<Arc<RemoteVideoTrack>>,
266    audio_tracks: Vec<Arc<RemoteAudioTrack>>,
267}
268
269impl TestServerRoom {}
270
/// API client that forwards every request to the `TestServer` registered under `url`.
pub struct TestApiClient {
    /// URL used to look up the backing test server on each call.
    url: String,
}
274
275#[async_trait]
276impl live_kit_server::api::Client for TestApiClient {
277    fn url(&self) -> &str {
278        &self.url
279    }
280
281    async fn create_room(&self, name: String) -> Result<()> {
282        let server = TestServer::get(&self.url)?;
283        server.create_room(name).await?;
284        Ok(())
285    }
286
287    async fn delete_room(&self, name: String) -> Result<()> {
288        let server = TestServer::get(&self.url)?;
289        server.delete_room(name).await?;
290        Ok(())
291    }
292
293    async fn remove_participant(&self, room: String, identity: String) -> Result<()> {
294        let server = TestServer::get(&self.url)?;
295        server.remove_participant(room, identity).await?;
296        Ok(())
297    }
298
299    fn room_token(&self, room: &str, identity: &str) -> Result<String> {
300        let server = TestServer::get(&self.url)?;
301        token::create(
302            &server.api_key,
303            &server.secret_key,
304            Some(identity),
305            token::VideoGrant::to_join(room),
306        )
307    }
308}
309
/// String identifier used in this module for participants and tracks.
pub type Sid = String;
311
312struct RoomState {
313    connection: (
314        watch::Sender<ConnectionState>,
315        watch::Receiver<ConnectionState>,
316    ),
317    display_sources: Vec<MacOSDisplay>,
318    audio_track_updates: (
319        async_broadcast::Sender<RemoteAudioTrackUpdate>,
320        async_broadcast::Receiver<RemoteAudioTrackUpdate>,
321    ),
322    video_track_updates: (
323        async_broadcast::Sender<RemoteVideoTrackUpdate>,
324        async_broadcast::Receiver<RemoteVideoTrackUpdate>,
325    ),
326}
327
/// Connection status of a `Room`.
#[derive(Clone, Eq, PartialEq)]
pub enum ConnectionState {
    /// The room is not connected to any server.
    Disconnected,
    /// The room joined the server at `url` using `token`.
    Connected { url: String, token: String },
}
333
334pub struct Room(Mutex<RoomState>);
335
336impl Room {
337    pub fn new() -> Arc<Self> {
338        Arc::new(Self(Mutex::new(RoomState {
339            connection: watch::channel_with(ConnectionState::Disconnected),
340            display_sources: Default::default(),
341            video_track_updates: async_broadcast::broadcast(128),
342            audio_track_updates: async_broadcast::broadcast(128),
343        })))
344    }
345
346    pub fn status(&self) -> watch::Receiver<ConnectionState> {
347        self.0.lock().connection.1.clone()
348    }
349
350    pub fn connect(self: &Arc<Self>, url: &str, token: &str) -> impl Future<Output = Result<()>> {
351        let this = self.clone();
352        let url = url.to_string();
353        let token = token.to_string();
354        async move {
355            let server = TestServer::get(&url)?;
356            server.join_room(token.clone(), this.clone()).await?;
357            *this.0.lock().connection.0.borrow_mut() = ConnectionState::Connected { url, token };
358            Ok(())
359        }
360    }
361
362    pub fn display_sources(self: &Arc<Self>) -> impl Future<Output = Result<Vec<MacOSDisplay>>> {
363        let this = self.clone();
364        async move {
365            let server = this.test_server();
366            server.background.simulate_random_delay().await;
367            Ok(this.0.lock().display_sources.clone())
368        }
369    }
370
371    pub fn publish_video_track(
372        self: &Arc<Self>,
373        track: &LocalVideoTrack,
374    ) -> impl Future<Output = Result<LocalTrackPublication>> {
375        let this = self.clone();
376        let track = track.clone();
377        async move {
378            this.test_server()
379                .publish_video_track(this.token(), track)
380                .await?;
381            Ok(LocalTrackPublication)
382        }
383    }
384    pub fn publish_audio_track(
385        self: &Arc<Self>,
386        track: &LocalAudioTrack,
387    ) -> impl Future<Output = Result<LocalTrackPublication>> {
388        let this = self.clone();
389        let track = track.clone();
390        async move {
391            this.test_server()
392                .publish_audio_track(this.token(), &track)
393                .await?;
394            Ok(LocalTrackPublication)
395        }
396    }
397
398    pub fn unpublish_track(&self, _publication: LocalTrackPublication) {}
399
400    pub fn remote_audio_tracks(&self, publisher_id: &str) -> Vec<Arc<RemoteAudioTrack>> {
401        if !self.is_connected() {
402            return Vec::new();
403        }
404
405        self.test_server()
406            .audio_tracks(self.token())
407            .unwrap()
408            .into_iter()
409            .filter(|track| track.publisher_id() == publisher_id)
410            .collect()
411    }
412
413    pub fn remote_audio_track_publications(
414        &self,
415        publisher_id: &str,
416    ) -> Vec<Arc<RemoteTrackPublication>> {
417        if !self.is_connected() {
418            return Vec::new();
419        }
420
421        self.test_server()
422            .audio_tracks(self.token())
423            .unwrap()
424            .into_iter()
425            .filter(|track| track.publisher_id() == publisher_id)
426            .map(|_track| Arc::new(RemoteTrackPublication {}))
427            .collect()
428    }
429
430    pub fn remote_video_tracks(&self, publisher_id: &str) -> Vec<Arc<RemoteVideoTrack>> {
431        if !self.is_connected() {
432            return Vec::new();
433        }
434
435        self.test_server()
436            .video_tracks(self.token())
437            .unwrap()
438            .into_iter()
439            .filter(|track| track.publisher_id() == publisher_id)
440            .collect()
441    }
442
443    pub fn remote_audio_track_updates(&self) -> impl Stream<Item = RemoteAudioTrackUpdate> {
444        self.0.lock().audio_track_updates.1.clone()
445    }
446
447    pub fn remote_video_track_updates(&self) -> impl Stream<Item = RemoteVideoTrackUpdate> {
448        self.0.lock().video_track_updates.1.clone()
449    }
450
451    pub fn set_display_sources(&self, sources: Vec<MacOSDisplay>) {
452        self.0.lock().display_sources = sources;
453    }
454
455    fn test_server(&self) -> Arc<TestServer> {
456        match self.0.lock().connection.1.borrow().clone() {
457            ConnectionState::Disconnected => panic!("must be connected to call this method"),
458            ConnectionState::Connected { url, .. } => TestServer::get(&url).unwrap(),
459        }
460    }
461
462    fn token(&self) -> String {
463        match self.0.lock().connection.1.borrow().clone() {
464            ConnectionState::Disconnected => panic!("must be connected to call this method"),
465            ConnectionState::Connected { token, .. } => token,
466        }
467    }
468
469    fn is_connected(&self) -> bool {
470        match *self.0.lock().connection.1.borrow() {
471            ConnectionState::Disconnected => false,
472            ConnectionState::Connected { .. } => true,
473        }
474    }
475}
476
477impl Drop for Room {
478    fn drop(&mut self) {
479        if let ConnectionState::Connected { token, .. } = mem::replace(
480            &mut *self.0.lock().connection.0.borrow_mut(),
481            ConnectionState::Disconnected,
482        ) {
483            if let Ok(server) = TestServer::get(&token) {
484                let background = server.background.clone();
485                background
486                    .spawn(async move { server.leave_room(token).await.unwrap() })
487                    .detach();
488            }
489        }
490    }
491}
492
493pub struct LocalTrackPublication;
494
495impl LocalTrackPublication {
496    pub fn set_mute(&self, _mute: bool) -> impl Future<Output = Result<()>> {
497        async { Ok(()) }
498    }
499}
500
501pub struct RemoteTrackPublication;
502
503impl RemoteTrackPublication {
504    pub fn set_enabled(&self, _enabled: bool) -> impl Future<Output = Result<()>> {
505        async { Ok(()) }
506    }
507}
508
509#[derive(Clone)]
510pub struct LocalVideoTrack {
511    frames_rx: async_broadcast::Receiver<Frame>,
512}
513
514impl LocalVideoTrack {
515    pub fn screen_share_for_display(display: &MacOSDisplay) -> Self {
516        Self {
517            frames_rx: display.frames.1.clone(),
518        }
519    }
520}
521
/// A local audio track; carries no data in the test implementation.
#[derive(Clone)]
pub struct LocalAudioTrack;

impl LocalAudioTrack {
    /// Creates a new (stateless) audio track.
    pub fn create() -> Self {
        LocalAudioTrack
    }
}
530
531pub struct RemoteVideoTrack {
532    sid: Sid,
533    publisher_id: Sid,
534    frames_rx: async_broadcast::Receiver<Frame>,
535}
536
537impl RemoteVideoTrack {
538    pub fn sid(&self) -> &str {
539        &self.sid
540    }
541
542    pub fn publisher_id(&self) -> &str {
543        &self.publisher_id
544    }
545
546    pub fn frames(&self) -> async_broadcast::Receiver<Frame> {
547        self.frames_rx.clone()
548    }
549}
550
551#[derive(Debug)]
552pub struct RemoteAudioTrack {
553    sid: Sid,
554    publisher_id: Sid,
555}
556
557impl RemoteAudioTrack {
558    pub fn sid(&self) -> &str {
559        &self.sid
560    }
561
562    pub fn publisher_id(&self) -> &str {
563        &self.publisher_id
564    }
565
566    pub fn enable(&self) -> impl Future<Output = Result<()>> {
567        async { Ok(()) }
568    }
569
570    pub fn disable(&self) -> impl Future<Output = Result<()>> {
571        async { Ok(()) }
572    }
573}
574
575#[derive(Clone)]
576pub enum RemoteVideoTrackUpdate {
577    Subscribed(Arc<RemoteVideoTrack>),
578    Unsubscribed { publisher_id: Sid, track_id: Sid },
579}
580
581#[derive(Clone)]
582pub enum RemoteAudioTrackUpdate {
583    ActiveSpeakersChanged { speakers: Vec<Sid> },
584    MuteChanged { track_id: Sid, muted: bool },
585    Subscribed(Arc<RemoteAudioTrack>),
586    Unsubscribed { publisher_id: Sid, track_id: Sid },
587}
588
589#[derive(Clone)]
590pub struct MacOSDisplay {
591    frames: (
592        async_broadcast::Sender<Frame>,
593        async_broadcast::Receiver<Frame>,
594    ),
595}
596
597impl MacOSDisplay {
598    pub fn new() -> Self {
599        Self {
600            frames: async_broadcast::broadcast(128),
601        }
602    }
603
604    pub fn send_frame(&self, frame: Frame) {
605        self.frames.0.try_broadcast(frame).unwrap();
606    }
607}
608
609#[derive(Clone, Debug, PartialEq, Eq)]
610pub struct Frame {
611    pub label: String,
612    pub width: usize,
613    pub height: usize,
614}
615
616impl Frame {
617    pub fn width(&self) -> usize {
618        self.width
619    }
620
621    pub fn height(&self) -> usize {
622        self.height
623    }
624
625    pub fn image(&self) -> CVImageBuffer {
626        unimplemented!("you can't call this in test mode")
627    }
628}