test.rs

  1use anyhow::{anyhow, Result};
  2use async_trait::async_trait;
  3use collections::{BTreeMap, HashMap};
  4use futures::Stream;
  5use gpui::executor::Background;
  6use live_kit_server::token;
  7use media::core_video::CVImageBuffer;
  8use parking_lot::Mutex;
  9use postage::watch;
 10use std::{future::Future, mem, sync::Arc};
 11
/// Process-wide registry of the currently registered test servers, keyed by server URL.
///
/// `TestServer::create` inserts into it, `TestServer::get` reads from it and
/// `TestServer::teardown` removes entries from it.
static SERVERS: Mutex<BTreeMap<String, Arc<TestServer>>> = Mutex::new(BTreeMap::new());
 13
/// In-process stand-in for a LiveKit server, intended for tests.
///
/// Instances are built by `TestServer::create`, which also registers them in `SERVERS`
/// under their `url`.
pub struct TestServer {
    /// URL the server is registered under; this is the lookup key in `SERVERS`.
    pub url: String,
    /// API key handed to `token::create` when room tokens are minted.
    pub api_key: String,
    /// Secret used both to create room tokens and to validate incoming ones.
    pub secret_key: String,
    /// Rooms that currently exist on this server, keyed by room name.
    rooms: Mutex<HashMap<String, TestServerRoom>>,
    /// Executor used to simulate random delays and to spawn background work.
    background: Arc<Background>,
}
 21
 22impl TestServer {
 23    pub fn create(
 24        url: String,
 25        api_key: String,
 26        secret_key: String,
 27        background: Arc<Background>,
 28    ) -> Result<Arc<TestServer>> {
 29        let mut servers = SERVERS.lock();
 30        if servers.contains_key(&url) {
 31            Err(anyhow!("a server with url {:?} already exists", url))
 32        } else {
 33            let server = Arc::new(TestServer {
 34                url: url.clone(),
 35                api_key,
 36                secret_key,
 37                rooms: Default::default(),
 38                background,
 39            });
 40            servers.insert(url, server.clone());
 41            Ok(server)
 42        }
 43    }
 44
 45    fn get(url: &str) -> Result<Arc<TestServer>> {
 46        Ok(SERVERS
 47            .lock()
 48            .get(url)
 49            .ok_or_else(|| anyhow!("no server found for url"))?
 50            .clone())
 51    }
 52
 53    pub fn teardown(&self) -> Result<()> {
 54        SERVERS
 55            .lock()
 56            .remove(&self.url)
 57            .ok_or_else(|| anyhow!("server with url {:?} does not exist", self.url))?;
 58        Ok(())
 59    }
 60
 61    pub fn create_api_client(&self) -> TestApiClient {
 62        TestApiClient {
 63            url: self.url.clone(),
 64        }
 65    }
 66
 67    pub async fn create_room(&self, room: String) -> Result<()> {
 68        self.background.simulate_random_delay().await;
 69        let mut server_rooms = self.rooms.lock();
 70        if server_rooms.contains_key(&room) {
 71            Err(anyhow!("room {:?} already exists", room))
 72        } else {
 73            server_rooms.insert(room, Default::default());
 74            Ok(())
 75        }
 76    }
 77
 78    async fn delete_room(&self, room: String) -> Result<()> {
 79        // TODO: clear state associated with all `Room`s.
 80        self.background.simulate_random_delay().await;
 81        let mut server_rooms = self.rooms.lock();
 82        server_rooms
 83            .remove(&room)
 84            .ok_or_else(|| anyhow!("room {:?} does not exist", room))?;
 85        Ok(())
 86    }
 87
 88    async fn join_room(&self, token: String, client_room: Arc<Room>) -> Result<()> {
 89        self.background.simulate_random_delay().await;
 90        let claims = live_kit_server::token::validate(&token, &self.secret_key)?;
 91        let identity = claims.sub.unwrap().to_string();
 92        let room_name = claims.video.room.unwrap();
 93        let mut server_rooms = self.rooms.lock();
 94        let room = server_rooms
 95            .get_mut(&*room_name)
 96            .ok_or_else(|| anyhow!("room {:?} does not exist", room_name))?;
 97        if room.client_rooms.contains_key(&identity) {
 98            Err(anyhow!(
 99                "{:?} attempted to join room {:?} twice",
100                identity,
101                room_name
102            ))
103        } else {
104            for track in &room.video_tracks {
105                client_room
106                    .0
107                    .lock()
108                    .video_track_updates
109                    .0
110                    .try_broadcast(RemoteVideoTrackUpdate::Subscribed(track.clone()))
111                    .unwrap();
112            }
113            room.client_rooms.insert(identity, client_room);
114            Ok(())
115        }
116    }
117
118    async fn leave_room(&self, token: String) -> Result<()> {
119        self.background.simulate_random_delay().await;
120        let claims = live_kit_server::token::validate(&token, &self.secret_key)?;
121        let identity = claims.sub.unwrap().to_string();
122        let room_name = claims.video.room.unwrap();
123        let mut server_rooms = self.rooms.lock();
124        let room = server_rooms
125            .get_mut(&*room_name)
126            .ok_or_else(|| anyhow!("room {} does not exist", room_name))?;
127        room.client_rooms.remove(&identity).ok_or_else(|| {
128            anyhow!(
129                "{:?} attempted to leave room {:?} before joining it",
130                identity,
131                room_name
132            )
133        })?;
134        Ok(())
135    }
136
137    async fn remove_participant(&self, room_name: String, identity: String) -> Result<()> {
138        // TODO: clear state associated with the `Room`.
139
140        self.background.simulate_random_delay().await;
141        let mut server_rooms = self.rooms.lock();
142        let room = server_rooms
143            .get_mut(&room_name)
144            .ok_or_else(|| anyhow!("room {} does not exist", room_name))?;
145        room.client_rooms.remove(&identity).ok_or_else(|| {
146            anyhow!(
147                "participant {:?} did not join room {:?}",
148                identity,
149                room_name
150            )
151        })?;
152        Ok(())
153    }
154
155    pub async fn disconnect_client(&self, client_identity: String) {
156        self.background.simulate_random_delay().await;
157        let mut server_rooms = self.rooms.lock();
158        for room in server_rooms.values_mut() {
159            if let Some(room) = room.client_rooms.remove(&client_identity) {
160                *room.0.lock().connection.0.borrow_mut() = ConnectionState::Disconnected;
161            }
162        }
163    }
164
165    async fn publish_video_track(&self, token: String, local_track: LocalVideoTrack) -> Result<()> {
166        self.background.simulate_random_delay().await;
167        let claims = live_kit_server::token::validate(&token, &self.secret_key)?;
168        let identity = claims.sub.unwrap().to_string();
169        let room_name = claims.video.room.unwrap();
170
171        let mut server_rooms = self.rooms.lock();
172        let room = server_rooms
173            .get_mut(&*room_name)
174            .ok_or_else(|| anyhow!("room {} does not exist", room_name))?;
175
176        let track = Arc::new(RemoteVideoTrack {
177            sid: nanoid::nanoid!(17),
178            publisher_id: identity.clone(),
179            frames_rx: local_track.frames_rx.clone(),
180        });
181
182        room.video_tracks.push(track.clone());
183
184        for (id, client_room) in &room.client_rooms {
185            if *id != identity {
186                let _ = client_room
187                    .0
188                    .lock()
189                    .video_track_updates
190                    .0
191                    .try_broadcast(RemoteVideoTrackUpdate::Subscribed(track.clone()))
192                    .unwrap();
193            }
194        }
195
196        Ok(())
197    }
198
199    async fn publish_audio_track(
200        &self,
201        token: String,
202        _local_track: &LocalAudioTrack,
203    ) -> Result<()> {
204        self.background.simulate_random_delay().await;
205        let claims = live_kit_server::token::validate(&token, &self.secret_key)?;
206        let identity = claims.sub.unwrap().to_string();
207        let room_name = claims.video.room.unwrap();
208
209        let mut server_rooms = self.rooms.lock();
210        let room = server_rooms
211            .get_mut(&*room_name)
212            .ok_or_else(|| anyhow!("room {} does not exist", room_name))?;
213
214        let track = Arc::new(RemoteAudioTrack {
215            sid: nanoid::nanoid!(17),
216            publisher_id: identity.clone(),
217        });
218
219        let publication = Arc::new(RemoteTrackPublication);
220
221        room.audio_tracks.push(track.clone());
222
223        for (id, client_room) in &room.client_rooms {
224            if *id != identity {
225                let _ = client_room
226                    .0
227                    .lock()
228                    .audio_track_updates
229                    .0
230                    .try_broadcast(RemoteAudioTrackUpdate::Subscribed(
231                        track.clone(),
232                        publication.clone(),
233                    ))
234                    .unwrap();
235            }
236        }
237
238        Ok(())
239    }
240
241    fn video_tracks(&self, token: String) -> Result<Vec<Arc<RemoteVideoTrack>>> {
242        let claims = live_kit_server::token::validate(&token, &self.secret_key)?;
243        let room_name = claims.video.room.unwrap();
244
245        let mut server_rooms = self.rooms.lock();
246        let room = server_rooms
247            .get_mut(&*room_name)
248            .ok_or_else(|| anyhow!("room {} does not exist", room_name))?;
249        Ok(room.video_tracks.clone())
250    }
251
252    fn audio_tracks(&self, token: String) -> Result<Vec<Arc<RemoteAudioTrack>>> {
253        let claims = live_kit_server::token::validate(&token, &self.secret_key)?;
254        let room_name = claims.video.room.unwrap();
255
256        let mut server_rooms = self.rooms.lock();
257        let room = server_rooms
258            .get_mut(&*room_name)
259            .ok_or_else(|| anyhow!("room {} does not exist", room_name))?;
260        Ok(room.audio_tracks.clone())
261    }
262}
263
/// Server-side bookkeeping for a single room.
#[derive(Default)]
struct TestServerRoom {
    /// Client rooms that have joined, keyed by participant identity.
    client_rooms: HashMap<Sid, Arc<Room>>,
    /// Video tracks published in this room so far.
    video_tracks: Vec<Arc<RemoteVideoTrack>>,
    /// Audio tracks published in this room so far.
    audio_tracks: Vec<Arc<RemoteAudioTrack>>,
}

// Intentionally empty: the room currently has no methods of its own.
impl TestServerRoom {}
272
273pub struct TestApiClient {
274    url: String,
275}
276
277#[async_trait]
278impl live_kit_server::api::Client for TestApiClient {
279    fn url(&self) -> &str {
280        &self.url
281    }
282
283    async fn create_room(&self, name: String) -> Result<()> {
284        let server = TestServer::get(&self.url)?;
285        server.create_room(name).await?;
286        Ok(())
287    }
288
289    async fn delete_room(&self, name: String) -> Result<()> {
290        let server = TestServer::get(&self.url)?;
291        server.delete_room(name).await?;
292        Ok(())
293    }
294
295    async fn remove_participant(&self, room: String, identity: String) -> Result<()> {
296        let server = TestServer::get(&self.url)?;
297        server.remove_participant(room, identity).await?;
298        Ok(())
299    }
300
301    fn room_token(&self, room: &str, identity: &str) -> Result<String> {
302        let server = TestServer::get(&self.url)?;
303        token::create(
304            &server.api_key,
305            &server.secret_key,
306            Some(identity),
307            token::VideoGrant::to_join(room),
308        )
309    }
310}
311
/// String identifier used in this file for track ids, publisher ids and participant
/// identities.
pub type Sid = String;
313
/// Mutable state guarded by the mutex inside [`Room`].
struct RoomState {
    /// Both halves of the watch channel that carries the current [`ConnectionState`].
    connection: (
        watch::Sender<ConnectionState>,
        watch::Receiver<ConnectionState>,
    ),
    /// Display sources returned by `Room::display_sources`; replaced through
    /// `Room::set_display_sources`.
    display_sources: Vec<MacOSDisplay>,
    /// Both halves of the broadcast channel on which audio track updates are delivered.
    audio_track_updates: (
        async_broadcast::Sender<RemoteAudioTrackUpdate>,
        async_broadcast::Receiver<RemoteAudioTrackUpdate>,
    ),
    /// Both halves of the broadcast channel on which video track updates are delivered.
    video_track_updates: (
        async_broadcast::Sender<RemoteVideoTrackUpdate>,
        async_broadcast::Receiver<RemoteVideoTrackUpdate>,
    ),
}
329
/// Connection status of a [`Room`], as published on its status watch channel.
#[derive(Clone, Eq, PartialEq)]
pub enum ConnectionState {
    /// The room is not connected to any server; this is the initial state.
    Disconnected,
    /// The room has joined a server; `url` and `token` are the values passed to
    /// `Room::connect`.
    Connected { url: String, token: String },
}
335
336pub struct Room(Mutex<RoomState>);
337
338impl Room {
339    pub fn new() -> Arc<Self> {
340        Arc::new(Self(Mutex::new(RoomState {
341            connection: watch::channel_with(ConnectionState::Disconnected),
342            display_sources: Default::default(),
343            video_track_updates: async_broadcast::broadcast(128),
344            audio_track_updates: async_broadcast::broadcast(128),
345        })))
346    }
347
348    pub fn status(&self) -> watch::Receiver<ConnectionState> {
349        self.0.lock().connection.1.clone()
350    }
351
352    pub fn connect(self: &Arc<Self>, url: &str, token: &str) -> impl Future<Output = Result<()>> {
353        let this = self.clone();
354        let url = url.to_string();
355        let token = token.to_string();
356        async move {
357            let server = TestServer::get(&url)?;
358            server.join_room(token.clone(), this.clone()).await?;
359            *this.0.lock().connection.0.borrow_mut() = ConnectionState::Connected { url, token };
360            Ok(())
361        }
362    }
363
364    pub fn display_sources(self: &Arc<Self>) -> impl Future<Output = Result<Vec<MacOSDisplay>>> {
365        let this = self.clone();
366        async move {
367            let server = this.test_server();
368            server.background.simulate_random_delay().await;
369            Ok(this.0.lock().display_sources.clone())
370        }
371    }
372
373    pub fn publish_video_track(
374        self: &Arc<Self>,
375        track: &LocalVideoTrack,
376    ) -> impl Future<Output = Result<LocalTrackPublication>> {
377        let this = self.clone();
378        let track = track.clone();
379        async move {
380            this.test_server()
381                .publish_video_track(this.token(), track)
382                .await?;
383            Ok(LocalTrackPublication)
384        }
385    }
386    pub fn publish_audio_track(
387        self: &Arc<Self>,
388        track: &LocalAudioTrack,
389    ) -> impl Future<Output = Result<LocalTrackPublication>> {
390        let this = self.clone();
391        let track = track.clone();
392        async move {
393            this.test_server()
394                .publish_audio_track(this.token(), &track)
395                .await?;
396            Ok(LocalTrackPublication)
397        }
398    }
399
400    pub fn unpublish_track(&self, _publication: LocalTrackPublication) {}
401
402    pub fn remote_audio_tracks(&self, publisher_id: &str) -> Vec<Arc<RemoteAudioTrack>> {
403        if !self.is_connected() {
404            return Vec::new();
405        }
406
407        self.test_server()
408            .audio_tracks(self.token())
409            .unwrap()
410            .into_iter()
411            .filter(|track| track.publisher_id() == publisher_id)
412            .collect()
413    }
414
415    pub fn remote_audio_track_publications(
416        &self,
417        publisher_id: &str,
418    ) -> Vec<Arc<RemoteTrackPublication>> {
419        if !self.is_connected() {
420            return Vec::new();
421        }
422
423        self.test_server()
424            .audio_tracks(self.token())
425            .unwrap()
426            .into_iter()
427            .filter(|track| track.publisher_id() == publisher_id)
428            .map(|_track| Arc::new(RemoteTrackPublication {}))
429            .collect()
430    }
431
432    pub fn remote_video_tracks(&self, publisher_id: &str) -> Vec<Arc<RemoteVideoTrack>> {
433        if !self.is_connected() {
434            return Vec::new();
435        }
436
437        self.test_server()
438            .video_tracks(self.token())
439            .unwrap()
440            .into_iter()
441            .filter(|track| track.publisher_id() == publisher_id)
442            .collect()
443    }
444
445    pub fn remote_audio_track_updates(&self) -> impl Stream<Item = RemoteAudioTrackUpdate> {
446        self.0.lock().audio_track_updates.1.clone()
447    }
448
449    pub fn remote_video_track_updates(&self) -> impl Stream<Item = RemoteVideoTrackUpdate> {
450        self.0.lock().video_track_updates.1.clone()
451    }
452
453    pub fn set_display_sources(&self, sources: Vec<MacOSDisplay>) {
454        self.0.lock().display_sources = sources;
455    }
456
457    fn test_server(&self) -> Arc<TestServer> {
458        match self.0.lock().connection.1.borrow().clone() {
459            ConnectionState::Disconnected => panic!("must be connected to call this method"),
460            ConnectionState::Connected { url, .. } => TestServer::get(&url).unwrap(),
461        }
462    }
463
464    fn token(&self) -> String {
465        match self.0.lock().connection.1.borrow().clone() {
466            ConnectionState::Disconnected => panic!("must be connected to call this method"),
467            ConnectionState::Connected { token, .. } => token,
468        }
469    }
470
471    fn is_connected(&self) -> bool {
472        match *self.0.lock().connection.1.borrow() {
473            ConnectionState::Disconnected => false,
474            ConnectionState::Connected { .. } => true,
475        }
476    }
477}
478
479impl Drop for Room {
480    fn drop(&mut self) {
481        if let ConnectionState::Connected { token, .. } = mem::replace(
482            &mut *self.0.lock().connection.0.borrow_mut(),
483            ConnectionState::Disconnected,
484        ) {
485            if let Ok(server) = TestServer::get(&token) {
486                let background = server.background.clone();
487                background
488                    .spawn(async move { server.leave_room(token).await.unwrap() })
489                    .detach();
490            }
491        }
492    }
493}
494
495pub struct LocalTrackPublication;
496
497impl LocalTrackPublication {
498    pub fn set_mute(&self, _mute: bool) -> impl Future<Output = Result<()>> {
499        async { Ok(()) }
500    }
501}
502
503pub struct RemoteTrackPublication;
504
505impl RemoteTrackPublication {
506    pub fn set_enabled(&self, _enabled: bool) -> impl Future<Output = Result<()>> {
507        async { Ok(()) }
508    }
509
510    pub fn is_muted(&self) -> bool {
511        false
512    }
513
514    pub fn sid(&self) -> String {
515        "".to_string()
516    }
517}
518
519#[derive(Clone)]
520pub struct LocalVideoTrack {
521    frames_rx: async_broadcast::Receiver<Frame>,
522}
523
524impl LocalVideoTrack {
525    pub fn screen_share_for_display(display: &MacOSDisplay) -> Self {
526        Self {
527            frames_rx: display.frames.1.clone(),
528        }
529    }
530}
531
/// Placeholder for a locally captured audio track; it carries no data.
#[derive(Clone)]
pub struct LocalAudioTrack;

impl LocalAudioTrack {
    /// Creates a new placeholder audio track.
    pub fn create() -> Self {
        LocalAudioTrack
    }
}
540
541pub struct RemoteVideoTrack {
542    sid: Sid,
543    publisher_id: Sid,
544    frames_rx: async_broadcast::Receiver<Frame>,
545}
546
547impl RemoteVideoTrack {
548    pub fn sid(&self) -> &str {
549        &self.sid
550    }
551
552    pub fn publisher_id(&self) -> &str {
553        &self.publisher_id
554    }
555
556    pub fn frames(&self) -> async_broadcast::Receiver<Frame> {
557        self.frames_rx.clone()
558    }
559}
560
561#[derive(Debug)]
562pub struct RemoteAudioTrack {
563    sid: Sid,
564    publisher_id: Sid,
565}
566
567impl RemoteAudioTrack {
568    pub fn sid(&self) -> &str {
569        &self.sid
570    }
571
572    pub fn publisher_id(&self) -> &str {
573        &self.publisher_id
574    }
575
576    pub fn enable(&self) -> impl Future<Output = Result<()>> {
577        async { Ok(()) }
578    }
579
580    pub fn disable(&self) -> impl Future<Output = Result<()>> {
581        async { Ok(()) }
582    }
583}
584
/// Update delivered on a room's video track update channel.
#[derive(Clone)]
pub enum RemoteVideoTrackUpdate {
    /// A remote video track became available; the test server sends this when a client
    /// joins a room with existing tracks and when another participant publishes a track.
    Subscribed(Arc<RemoteVideoTrack>),
    /// A remote video track went away. Nothing in this file constructs this variant.
    Unsubscribed { publisher_id: Sid, track_id: Sid },
}
590
/// Update delivered on a room's audio track update channel.
///
/// Only `Subscribed` is constructed in this file; the other variants are declared but never
/// sent by the test server here.
#[derive(Clone)]
pub enum RemoteAudioTrackUpdate {
    /// The set of active speakers changed.
    ActiveSpeakersChanged { speakers: Vec<Sid> },
    /// The mute state of the track `track_id` changed to `muted`.
    MuteChanged { track_id: Sid, muted: bool },
    /// A remote audio track and its publication became available; the test server sends
    /// this when another participant publishes an audio track.
    Subscribed(Arc<RemoteAudioTrack>, Arc<RemoteTrackPublication>),
    /// A remote audio track went away.
    Unsubscribed { publisher_id: Sid, track_id: Sid },
}
598
599#[derive(Clone)]
600pub struct MacOSDisplay {
601    frames: (
602        async_broadcast::Sender<Frame>,
603        async_broadcast::Receiver<Frame>,
604    ),
605}
606
607impl MacOSDisplay {
608    pub fn new() -> Self {
609        Self {
610            frames: async_broadcast::broadcast(128),
611        }
612    }
613
614    pub fn send_frame(&self, frame: Frame) {
615        self.frames.0.try_broadcast(frame).unwrap();
616    }
617}
618
619#[derive(Clone, Debug, PartialEq, Eq)]
620pub struct Frame {
621    pub label: String,
622    pub width: usize,
623    pub height: usize,
624}
625
626impl Frame {
627    pub fn width(&self) -> usize {
628        self.width
629    }
630
631    pub fn height(&self) -> usize {
632        self.height
633    }
634
635    pub fn image(&self) -> CVImageBuffer {
636        unimplemented!("you can't call this in test mode")
637    }
638}