test.rs

  1use crate::{ConnectionState, RoomUpdate, Sid};
  2use anyhow::{anyhow, Context, Result};
  3use async_trait::async_trait;
  4use collections::{BTreeMap, HashMap};
  5use futures::Stream;
  6use gpui::BackgroundExecutor;
  7use live_kit_server::{proto, token};
  8use media::core_video::CVImageBuffer;
  9use parking_lot::Mutex;
 10use postage::watch;
 11use std::{
 12    future::Future,
 13    mem,
 14    sync::{
 15        atomic::{AtomicBool, Ordering::SeqCst},
 16        Arc,
 17    },
 18};
 19
/// Process-wide registry of live test servers, keyed by server URL.
///
/// `TestServer::create` inserts into it, `TestServer::get` looks servers up
/// and `TestServer::teardown` removes them again.
static SERVERS: Mutex<BTreeMap<String, Arc<TestServer>>> = Mutex::new(BTreeMap::new());

/// In-memory stand-in for a `live_kit_server` backend, used by tests.
pub struct TestServer {
    /// URL under which this server is registered in `SERVERS`.
    pub url: String,
    /// API key passed to `token::create` when tokens are issued for this server.
    pub api_key: String,
    /// Secret used to sign tokens and to validate the tokens clients present.
    pub secret_key: String,
    /// Rooms currently known to the server, keyed by room name.
    rooms: Mutex<HashMap<String, TestServerRoom>>,
    /// Executor used to simulate random delays and to spawn background work.
    executor: BackgroundExecutor,
}
 29
 30impl TestServer {
 31    pub fn create(
 32        url: String,
 33        api_key: String,
 34        secret_key: String,
 35        executor: BackgroundExecutor,
 36    ) -> Result<Arc<TestServer>> {
 37        let mut servers = SERVERS.lock();
 38        if servers.contains_key(&url) {
 39            Err(anyhow!("a server with url {:?} already exists", url))
 40        } else {
 41            let server = Arc::new(TestServer {
 42                url: url.clone(),
 43                api_key,
 44                secret_key,
 45                rooms: Default::default(),
 46                executor,
 47            });
 48            servers.insert(url, server.clone());
 49            Ok(server)
 50        }
 51    }
 52
 53    fn get(url: &str) -> Result<Arc<TestServer>> {
 54        Ok(SERVERS
 55            .lock()
 56            .get(url)
 57            .ok_or_else(|| anyhow!("no server found for url"))?
 58            .clone())
 59    }
 60
 61    pub fn teardown(&self) -> Result<()> {
 62        SERVERS
 63            .lock()
 64            .remove(&self.url)
 65            .ok_or_else(|| anyhow!("server with url {:?} does not exist", self.url))?;
 66        Ok(())
 67    }
 68
 69    pub fn create_api_client(&self) -> TestApiClient {
 70        TestApiClient {
 71            url: self.url.clone(),
 72        }
 73    }
 74
 75    pub async fn create_room(&self, room: String) -> Result<()> {
 76        self.executor.simulate_random_delay().await;
 77        let mut server_rooms = self.rooms.lock();
 78        if server_rooms.contains_key(&room) {
 79            Err(anyhow!("room {:?} already exists", room))
 80        } else {
 81            server_rooms.insert(room, Default::default());
 82            Ok(())
 83        }
 84    }
 85
 86    async fn delete_room(&self, room: String) -> Result<()> {
 87        // TODO: clear state associated with all `Room`s.
 88        self.executor.simulate_random_delay().await;
 89        let mut server_rooms = self.rooms.lock();
 90        server_rooms
 91            .remove(&room)
 92            .ok_or_else(|| anyhow!("room {:?} does not exist", room))?;
 93        Ok(())
 94    }
 95
 96    async fn join_room(&self, token: String, client_room: Arc<Room>) -> Result<()> {
 97        self.executor.simulate_random_delay().await;
 98        let claims = live_kit_server::token::validate(&token, &self.secret_key)?;
 99        let identity = claims.sub.unwrap().to_string();
100        let room_name = claims.video.room.unwrap();
101        let mut server_rooms = self.rooms.lock();
102        let room = (*server_rooms).entry(room_name.to_string()).or_default();
103
104        if room.client_rooms.contains_key(&identity) {
105            Err(anyhow!(
106                "{:?} attempted to join room {:?} twice",
107                identity,
108                room_name
109            ))
110        } else {
111            for track in &room.video_tracks {
112                client_room
113                    .0
114                    .lock()
115                    .updates_tx
116                    .try_broadcast(RoomUpdate::SubscribedToRemoteVideoTrack(track.clone()))
117                    .unwrap();
118            }
119            room.client_rooms.insert(identity, client_room);
120            Ok(())
121        }
122    }
123
124    async fn leave_room(&self, token: String) -> Result<()> {
125        self.executor.simulate_random_delay().await;
126        let claims = live_kit_server::token::validate(&token, &self.secret_key)?;
127        let identity = claims.sub.unwrap().to_string();
128        let room_name = claims.video.room.unwrap();
129        let mut server_rooms = self.rooms.lock();
130        let room = server_rooms
131            .get_mut(&*room_name)
132            .ok_or_else(|| anyhow!("room {} does not exist", room_name))?;
133        room.client_rooms.remove(&identity).ok_or_else(|| {
134            anyhow!(
135                "{:?} attempted to leave room {:?} before joining it",
136                identity,
137                room_name
138            )
139        })?;
140        Ok(())
141    }
142
143    async fn remove_participant(&self, room_name: String, identity: String) -> Result<()> {
144        // TODO: clear state associated with the `Room`.
145
146        self.executor.simulate_random_delay().await;
147        let mut server_rooms = self.rooms.lock();
148        let room = server_rooms
149            .get_mut(&room_name)
150            .ok_or_else(|| anyhow!("room {} does not exist", room_name))?;
151        room.client_rooms.remove(&identity).ok_or_else(|| {
152            anyhow!(
153                "participant {:?} did not join room {:?}",
154                identity,
155                room_name
156            )
157        })?;
158        Ok(())
159    }
160
161    async fn update_participant(
162        &self,
163        room_name: String,
164        identity: String,
165        permission: proto::ParticipantPermission,
166    ) -> Result<()> {
167        self.executor.simulate_random_delay().await;
168        let mut server_rooms = self.rooms.lock();
169        let room = server_rooms
170            .get_mut(&room_name)
171            .ok_or_else(|| anyhow!("room {} does not exist", room_name))?;
172        room.participant_permissions.insert(identity, permission);
173        Ok(())
174    }
175
176    pub async fn disconnect_client(&self, client_identity: String) {
177        self.executor.simulate_random_delay().await;
178        let mut server_rooms = self.rooms.lock();
179        for room in server_rooms.values_mut() {
180            if let Some(room) = room.client_rooms.remove(&client_identity) {
181                *room.0.lock().connection.0.borrow_mut() = ConnectionState::Disconnected;
182            }
183        }
184    }
185
186    async fn publish_video_track(
187        &self,
188        token: String,
189        local_track: LocalVideoTrack,
190    ) -> Result<Sid> {
191        self.executor.simulate_random_delay().await;
192        let claims = live_kit_server::token::validate(&token, &self.secret_key)?;
193        let identity = claims.sub.unwrap().to_string();
194        let room_name = claims.video.room.unwrap();
195
196        let mut server_rooms = self.rooms.lock();
197        let room = server_rooms
198            .get_mut(&*room_name)
199            .ok_or_else(|| anyhow!("room {} does not exist", room_name))?;
200
201        let can_publish = room
202            .participant_permissions
203            .get(&identity)
204            .map(|permission| permission.can_publish)
205            .or(claims.video.can_publish)
206            .unwrap_or(true);
207
208        if !can_publish {
209            return Err(anyhow!("user is not allowed to publish"));
210        }
211
212        let sid = nanoid::nanoid!(17);
213        let track = Arc::new(RemoteVideoTrack {
214            sid: sid.clone(),
215            publisher_id: identity.clone(),
216            frames_rx: local_track.frames_rx.clone(),
217        });
218
219        room.video_tracks.push(track.clone());
220
221        for (id, client_room) in &room.client_rooms {
222            if *id != identity {
223                let _ = client_room
224                    .0
225                    .lock()
226                    .updates_tx
227                    .try_broadcast(RoomUpdate::SubscribedToRemoteVideoTrack(track.clone()))
228                    .unwrap();
229            }
230        }
231
232        Ok(sid)
233    }
234
235    async fn publish_audio_track(
236        &self,
237        token: String,
238        _local_track: &LocalAudioTrack,
239    ) -> Result<Sid> {
240        self.executor.simulate_random_delay().await;
241        let claims = live_kit_server::token::validate(&token, &self.secret_key)?;
242        let identity = claims.sub.unwrap().to_string();
243        let room_name = claims.video.room.unwrap();
244
245        let mut server_rooms = self.rooms.lock();
246        let room = server_rooms
247            .get_mut(&*room_name)
248            .ok_or_else(|| anyhow!("room {} does not exist", room_name))?;
249
250        let can_publish = room
251            .participant_permissions
252            .get(&identity)
253            .map(|permission| permission.can_publish)
254            .or(claims.video.can_publish)
255            .unwrap_or(true);
256
257        if !can_publish {
258            return Err(anyhow!("user is not allowed to publish"));
259        }
260
261        let sid = nanoid::nanoid!(17);
262        let track = Arc::new(RemoteAudioTrack {
263            sid: sid.clone(),
264            publisher_id: identity.clone(),
265        });
266
267        let publication = Arc::new(RemoteTrackPublication);
268
269        room.audio_tracks.push(track.clone());
270
271        for (id, client_room) in &room.client_rooms {
272            if *id != identity {
273                let _ = client_room
274                    .0
275                    .lock()
276                    .updates_tx
277                    .try_broadcast(RoomUpdate::SubscribedToRemoteAudioTrack(
278                        track.clone(),
279                        publication.clone(),
280                    ))
281                    .unwrap();
282            }
283        }
284
285        Ok(sid)
286    }
287
288    fn video_tracks(&self, token: String) -> Result<Vec<Arc<RemoteVideoTrack>>> {
289        let claims = live_kit_server::token::validate(&token, &self.secret_key)?;
290        let room_name = claims.video.room.unwrap();
291
292        let mut server_rooms = self.rooms.lock();
293        let room = server_rooms
294            .get_mut(&*room_name)
295            .ok_or_else(|| anyhow!("room {} does not exist", room_name))?;
296        Ok(room.video_tracks.clone())
297    }
298
299    fn audio_tracks(&self, token: String) -> Result<Vec<Arc<RemoteAudioTrack>>> {
300        let claims = live_kit_server::token::validate(&token, &self.secret_key)?;
301        let room_name = claims.video.room.unwrap();
302
303        let mut server_rooms = self.rooms.lock();
304        let room = server_rooms
305            .get_mut(&*room_name)
306            .ok_or_else(|| anyhow!("room {} does not exist", room_name))?;
307        Ok(room.audio_tracks.clone())
308    }
309}
310
/// Server-side bookkeeping for a single room.
#[derive(Default)]
struct TestServerRoom {
    /// Client rooms currently joined, keyed by participant identity.
    client_rooms: HashMap<Sid, Arc<Room>>,
    /// Video tracks published in this room.
    video_tracks: Vec<Arc<RemoteVideoTrack>>,
    /// Audio tracks published in this room.
    audio_tracks: Vec<Arc<RemoteAudioTrack>>,
    /// Permissions stored through `update_participant`, keyed by participant identity.
    participant_permissions: HashMap<Sid, proto::ParticipantPermission>,
}

// No inherent methods are defined on `TestServerRoom`.
impl TestServerRoom {}
320
321pub struct TestApiClient {
322    url: String,
323}
324
325#[async_trait]
326impl live_kit_server::api::Client for TestApiClient {
327    fn url(&self) -> &str {
328        &self.url
329    }
330
331    async fn create_room(&self, name: String) -> Result<()> {
332        let server = TestServer::get(&self.url)?;
333        server.create_room(name).await?;
334        Ok(())
335    }
336
337    async fn delete_room(&self, name: String) -> Result<()> {
338        let server = TestServer::get(&self.url)?;
339        server.delete_room(name).await?;
340        Ok(())
341    }
342
343    async fn remove_participant(&self, room: String, identity: String) -> Result<()> {
344        let server = TestServer::get(&self.url)?;
345        server.remove_participant(room, identity).await?;
346        Ok(())
347    }
348
349    async fn update_participant(
350        &self,
351        room: String,
352        identity: String,
353        permission: live_kit_server::proto::ParticipantPermission,
354    ) -> Result<()> {
355        let server = TestServer::get(&self.url)?;
356        server
357            .update_participant(room, identity, permission)
358            .await?;
359        Ok(())
360    }
361
362    fn room_token(&self, room: &str, identity: &str) -> Result<String> {
363        let server = TestServer::get(&self.url)?;
364        token::create(
365            &server.api_key,
366            &server.secret_key,
367            Some(identity),
368            token::VideoGrant::to_join(room),
369        )
370    }
371
372    fn guest_token(&self, room: &str, identity: &str) -> Result<String> {
373        let server = TestServer::get(&self.url)?;
374        token::create(
375            &server.api_key,
376            &server.secret_key,
377            Some(identity),
378            token::VideoGrant::for_guest(room),
379        )
380    }
381}
382
/// Mutable state guarded by the mutex inside [`Room`].
struct RoomState {
    /// Sender and receiver halves of the watch channel that carries the
    /// current connection state.
    connection: (
        watch::Sender<ConnectionState>,
        watch::Receiver<ConnectionState>,
    ),
    /// Display sources reported by `Room::display_sources`; replaced by
    /// `Room::set_display_sources`.
    display_sources: Vec<MacOSDisplay>,
    /// Sending half of the room-update broadcast channel; the test server
    /// pushes `RoomUpdate`s through it.
    updates_tx: async_broadcast::Sender<RoomUpdate>,
    /// Receiving half of the room-update channel; cloned by `Room::updates`.
    updates_rx: async_broadcast::Receiver<RoomUpdate>,
}

/// Test double for a client-side room; all state lives behind one mutex.
pub struct Room(Mutex<RoomState>);
394
395impl Room {
396    pub fn new() -> Arc<Self> {
397        let (updates_tx, updates_rx) = async_broadcast::broadcast(128);
398        Arc::new(Self(Mutex::new(RoomState {
399            connection: watch::channel_with(ConnectionState::Disconnected),
400            display_sources: Default::default(),
401            updates_tx,
402            updates_rx,
403        })))
404    }
405
406    pub fn status(&self) -> watch::Receiver<ConnectionState> {
407        self.0.lock().connection.1.clone()
408    }
409
410    pub fn connect(self: &Arc<Self>, url: &str, token: &str) -> impl Future<Output = Result<()>> {
411        let this = self.clone();
412        let url = url.to_string();
413        let token = token.to_string();
414        async move {
415            let server = TestServer::get(&url)?;
416            server
417                .join_room(token.clone(), this.clone())
418                .await
419                .context("room join")?;
420            *this.0.lock().connection.0.borrow_mut() = ConnectionState::Connected { url, token };
421            Ok(())
422        }
423    }
424
425    pub fn display_sources(self: &Arc<Self>) -> impl Future<Output = Result<Vec<MacOSDisplay>>> {
426        let this = self.clone();
427        async move {
428            let server = this.test_server();
429            server.executor.simulate_random_delay().await;
430            Ok(this.0.lock().display_sources.clone())
431        }
432    }
433
434    pub fn publish_video_track(
435        self: &Arc<Self>,
436        track: LocalVideoTrack,
437    ) -> impl Future<Output = Result<LocalTrackPublication>> {
438        let this = self.clone();
439        let track = track.clone();
440        async move {
441            let sid = this
442                .test_server()
443                .publish_video_track(this.token(), track)
444                .await?;
445            Ok(LocalTrackPublication {
446                muted: Default::default(),
447                sid,
448            })
449        }
450    }
451    pub fn publish_audio_track(
452        self: &Arc<Self>,
453        track: LocalAudioTrack,
454    ) -> impl Future<Output = Result<LocalTrackPublication>> {
455        let this = self.clone();
456        let track = track.clone();
457        async move {
458            let sid = this
459                .test_server()
460                .publish_audio_track(this.token(), &track)
461                .await?;
462            Ok(LocalTrackPublication {
463                muted: Default::default(),
464                sid,
465            })
466        }
467    }
468
469    pub fn unpublish_track(&self, _publication: LocalTrackPublication) {}
470
471    pub fn remote_audio_tracks(&self, publisher_id: &str) -> Vec<Arc<RemoteAudioTrack>> {
472        if !self.is_connected() {
473            return Vec::new();
474        }
475
476        self.test_server()
477            .audio_tracks(self.token())
478            .unwrap()
479            .into_iter()
480            .filter(|track| track.publisher_id() == publisher_id)
481            .collect()
482    }
483
484    pub fn remote_audio_track_publications(
485        &self,
486        publisher_id: &str,
487    ) -> Vec<Arc<RemoteTrackPublication>> {
488        if !self.is_connected() {
489            return Vec::new();
490        }
491
492        self.test_server()
493            .audio_tracks(self.token())
494            .unwrap()
495            .into_iter()
496            .filter(|track| track.publisher_id() == publisher_id)
497            .map(|_track| Arc::new(RemoteTrackPublication {}))
498            .collect()
499    }
500
501    pub fn remote_video_tracks(&self, publisher_id: &str) -> Vec<Arc<RemoteVideoTrack>> {
502        if !self.is_connected() {
503            return Vec::new();
504        }
505
506        self.test_server()
507            .video_tracks(self.token())
508            .unwrap()
509            .into_iter()
510            .filter(|track| track.publisher_id() == publisher_id)
511            .collect()
512    }
513
514    pub fn updates(&self) -> impl Stream<Item = RoomUpdate> {
515        self.0.lock().updates_rx.clone()
516    }
517
518    pub fn set_display_sources(&self, sources: Vec<MacOSDisplay>) {
519        self.0.lock().display_sources = sources;
520    }
521
522    fn test_server(&self) -> Arc<TestServer> {
523        match self.0.lock().connection.1.borrow().clone() {
524            ConnectionState::Disconnected => panic!("must be connected to call this method"),
525            ConnectionState::Connected { url, .. } => TestServer::get(&url).unwrap(),
526        }
527    }
528
529    fn token(&self) -> String {
530        match self.0.lock().connection.1.borrow().clone() {
531            ConnectionState::Disconnected => panic!("must be connected to call this method"),
532            ConnectionState::Connected { token, .. } => token,
533        }
534    }
535
536    fn is_connected(&self) -> bool {
537        match *self.0.lock().connection.1.borrow() {
538            ConnectionState::Disconnected => false,
539            ConnectionState::Connected { .. } => true,
540        }
541    }
542}
543
544impl Drop for Room {
545    fn drop(&mut self) {
546        if let ConnectionState::Connected { token, .. } = mem::replace(
547            &mut *self.0.lock().connection.0.borrow_mut(),
548            ConnectionState::Disconnected,
549        ) {
550            if let Ok(server) = TestServer::get(&token) {
551                let executor = server.executor.clone();
552                executor
553                    .spawn(async move { server.leave_room(token).await.unwrap() })
554                    .detach();
555            }
556        }
557    }
558}
559
560#[derive(Clone)]
561pub struct LocalTrackPublication {
562    sid: String,
563    muted: Arc<AtomicBool>,
564}
565
566impl LocalTrackPublication {
567    pub fn set_mute(&self, mute: bool) -> impl Future<Output = Result<()>> {
568        let muted = self.muted.clone();
569        async move {
570            muted.store(mute, SeqCst);
571            Ok(())
572        }
573    }
574
575    pub fn is_muted(&self) -> bool {
576        self.muted.load(SeqCst)
577    }
578
579    pub fn sid(&self) -> String {
580        self.sid.clone()
581    }
582}
583
584pub struct RemoteTrackPublication;
585
586impl RemoteTrackPublication {
587    pub fn set_enabled(&self, _enabled: bool) -> impl Future<Output = Result<()>> {
588        async { Ok(()) }
589    }
590
591    pub fn is_muted(&self) -> bool {
592        false
593    }
594
595    pub fn sid(&self) -> String {
596        "".to_string()
597    }
598}
599
600#[derive(Clone)]
601pub struct LocalVideoTrack {
602    frames_rx: async_broadcast::Receiver<Frame>,
603}
604
605impl LocalVideoTrack {
606    pub fn screen_share_for_display(display: &MacOSDisplay) -> Self {
607        Self {
608            frames_rx: display.frames.1.clone(),
609        }
610    }
611}
612
/// Marker type standing in for a local audio track; it carries no data.
#[derive(Clone)]
pub struct LocalAudioTrack;

impl LocalAudioTrack {
    /// Creates a new audio track marker.
    pub fn create() -> Self {
        LocalAudioTrack
    }
}
621
622#[derive(Debug)]
623pub struct RemoteVideoTrack {
624    sid: Sid,
625    publisher_id: Sid,
626    frames_rx: async_broadcast::Receiver<Frame>,
627}
628
629impl RemoteVideoTrack {
630    pub fn sid(&self) -> &str {
631        &self.sid
632    }
633
634    pub fn publisher_id(&self) -> &str {
635        &self.publisher_id
636    }
637
638    pub fn frames(&self) -> async_broadcast::Receiver<Frame> {
639        self.frames_rx.clone()
640    }
641}
642
643#[derive(Debug)]
644pub struct RemoteAudioTrack {
645    sid: Sid,
646    publisher_id: Sid,
647}
648
649impl RemoteAudioTrack {
650    pub fn sid(&self) -> &str {
651        &self.sid
652    }
653
654    pub fn publisher_id(&self) -> &str {
655        &self.publisher_id
656    }
657
658    pub fn enable(&self) -> impl Future<Output = Result<()>> {
659        async { Ok(()) }
660    }
661
662    pub fn disable(&self) -> impl Future<Output = Result<()>> {
663        async { Ok(()) }
664    }
665}
666
667#[derive(Clone)]
668pub struct MacOSDisplay {
669    frames: (
670        async_broadcast::Sender<Frame>,
671        async_broadcast::Receiver<Frame>,
672    ),
673}
674
675impl MacOSDisplay {
676    pub fn new() -> Self {
677        Self {
678            frames: async_broadcast::broadcast(128),
679        }
680    }
681
682    pub fn send_frame(&self, frame: Frame) {
683        self.frames.0.try_broadcast(frame).unwrap();
684    }
685}
686
687#[derive(Clone, Debug, PartialEq, Eq)]
688pub struct Frame {
689    pub label: String,
690    pub width: usize,
691    pub height: usize,
692}
693
694impl Frame {
695    pub fn width(&self) -> usize {
696        self.width
697    }
698
699    pub fn height(&self) -> usize {
700        self.height
701    }
702
703    pub fn image(&self) -> CVImageBuffer {
704        unimplemented!("you can't call this in test mode")
705    }
706}