test.rs

  1use crate::{ConnectionState, RoomUpdate, Sid};
  2use anyhow::{anyhow, Context, Result};
  3use async_trait::async_trait;
  4use collections::{BTreeMap, HashMap};
  5use futures::Stream;
  6use gpui::BackgroundExecutor;
  7use live_kit_server::{proto, token};
  8use media::core_video::CVImageBuffer;
  9use parking_lot::Mutex;
 10use postage::watch;
 11use std::{
 12    future::Future,
 13    mem,
 14    sync::{
 15        atomic::{AtomicBool, Ordering::SeqCst},
 16        Arc,
 17    },
 18};
 19
 20static SERVERS: Mutex<BTreeMap<String, Arc<TestServer>>> = Mutex::new(BTreeMap::new());
 21
 22pub struct TestServer {
 23    pub url: String,
 24    pub api_key: String,
 25    pub secret_key: String,
 26    rooms: Mutex<HashMap<String, TestServerRoom>>,
 27    executor: BackgroundExecutor,
 28}
 29
 30impl TestServer {
 31    pub fn create(
 32        url: String,
 33        api_key: String,
 34        secret_key: String,
 35        executor: BackgroundExecutor,
 36    ) -> Result<Arc<TestServer>> {
 37        let mut servers = SERVERS.lock();
 38        if servers.contains_key(&url) {
 39            Err(anyhow!("a server with url {:?} already exists", url))
 40        } else {
 41            let server = Arc::new(TestServer {
 42                url: url.clone(),
 43                api_key,
 44                secret_key,
 45                rooms: Default::default(),
 46                executor,
 47            });
 48            servers.insert(url, server.clone());
 49            Ok(server)
 50        }
 51    }
 52
 53    fn get(url: &str) -> Result<Arc<TestServer>> {
 54        Ok(SERVERS
 55            .lock()
 56            .get(url)
 57            .ok_or_else(|| anyhow!("no server found for url"))?
 58            .clone())
 59    }
 60
 61    pub fn teardown(&self) -> Result<()> {
 62        SERVERS
 63            .lock()
 64            .remove(&self.url)
 65            .ok_or_else(|| anyhow!("server with url {:?} does not exist", self.url))?;
 66        Ok(())
 67    }
 68
 69    pub fn create_api_client(&self) -> TestApiClient {
 70        TestApiClient {
 71            url: self.url.clone(),
 72        }
 73    }
 74
 75    pub async fn create_room(&self, room: String) -> Result<()> {
 76        self.executor.simulate_random_delay().await;
 77        let mut server_rooms = self.rooms.lock();
 78        if server_rooms.contains_key(&room) {
 79            Err(anyhow!("room {:?} already exists", room))
 80        } else {
 81            server_rooms.insert(room, Default::default());
 82            Ok(())
 83        }
 84    }
 85
 86    async fn delete_room(&self, room: String) -> Result<()> {
 87        // TODO: clear state associated with all `Room`s.
 88        self.executor.simulate_random_delay().await;
 89        let mut server_rooms = self.rooms.lock();
 90        server_rooms
 91            .remove(&room)
 92            .ok_or_else(|| anyhow!("room {:?} does not exist", room))?;
 93        Ok(())
 94    }
 95
 96    async fn join_room(&self, token: String, client_room: Arc<Room>) -> Result<()> {
 97        self.executor.simulate_random_delay().await;
 98        let claims = live_kit_server::token::validate(&token, &self.secret_key)?;
 99        let identity = claims.sub.unwrap().to_string();
100        let room_name = claims.video.room.unwrap();
101        let mut server_rooms = self.rooms.lock();
102        let room = (*server_rooms).entry(room_name.to_string()).or_default();
103
104        if room.client_rooms.contains_key(&identity) {
105            Err(anyhow!(
106                "{:?} attempted to join room {:?} twice",
107                identity,
108                room_name
109            ))
110        } else {
111            for track in &room.video_tracks {
112                client_room
113                    .0
114                    .lock()
115                    .updates_tx
116                    .try_broadcast(RoomUpdate::SubscribedToRemoteVideoTrack(track.clone()))
117                    .unwrap();
118            }
119            room.client_rooms.insert(identity, client_room);
120            Ok(())
121        }
122    }
123
124    async fn leave_room(&self, token: String) -> Result<()> {
125        self.executor.simulate_random_delay().await;
126        let claims = live_kit_server::token::validate(&token, &self.secret_key)?;
127        let identity = claims.sub.unwrap().to_string();
128        let room_name = claims.video.room.unwrap();
129        let mut server_rooms = self.rooms.lock();
130        let room = server_rooms
131            .get_mut(&*room_name)
132            .ok_or_else(|| anyhow!("room {} does not exist", room_name))?;
133        room.client_rooms.remove(&identity).ok_or_else(|| {
134            anyhow!(
135                "{:?} attempted to leave room {:?} before joining it",
136                identity,
137                room_name
138            )
139        })?;
140        Ok(())
141    }
142
143    async fn remove_participant(&self, room_name: String, identity: String) -> Result<()> {
144        // TODO: clear state associated with the `Room`.
145
146        self.executor.simulate_random_delay().await;
147        let mut server_rooms = self.rooms.lock();
148        let room = server_rooms
149            .get_mut(&room_name)
150            .ok_or_else(|| anyhow!("room {} does not exist", room_name))?;
151        room.client_rooms.remove(&identity).ok_or_else(|| {
152            anyhow!(
153                "participant {:?} did not join room {:?}",
154                identity,
155                room_name
156            )
157        })?;
158        Ok(())
159    }
160
161    async fn update_participant(
162        &self,
163        room_name: String,
164        identity: String,
165        permission: proto::ParticipantPermission,
166    ) -> Result<()> {
167        self.executor.simulate_random_delay().await;
168        let mut server_rooms = self.rooms.lock();
169        let room = server_rooms
170            .get_mut(&room_name)
171            .ok_or_else(|| anyhow!("room {} does not exist", room_name))?;
172        room.participant_permissions.insert(identity, permission);
173        Ok(())
174    }
175
176    pub async fn disconnect_client(&self, client_identity: String) {
177        self.executor.simulate_random_delay().await;
178        let mut server_rooms = self.rooms.lock();
179        for room in server_rooms.values_mut() {
180            if let Some(room) = room.client_rooms.remove(&client_identity) {
181                *room.0.lock().connection.0.borrow_mut() = ConnectionState::Disconnected;
182            }
183        }
184    }
185
186    async fn publish_video_track(
187        &self,
188        token: String,
189        local_track: LocalVideoTrack,
190    ) -> Result<Sid> {
191        self.executor.simulate_random_delay().await;
192        let claims = live_kit_server::token::validate(&token, &self.secret_key)?;
193        let identity = claims.sub.unwrap().to_string();
194        let room_name = claims.video.room.unwrap();
195
196        let mut server_rooms = self.rooms.lock();
197        let room = server_rooms
198            .get_mut(&*room_name)
199            .ok_or_else(|| anyhow!("room {} does not exist", room_name))?;
200
201        let can_publish = room
202            .participant_permissions
203            .get(&identity)
204            .map(|permission| permission.can_publish)
205            .or(claims.video.can_publish)
206            .unwrap_or(true);
207
208        if !can_publish {
209            return Err(anyhow!("user is not allowed to publish"));
210        }
211
212        let sid = nanoid::nanoid!(17);
213        let track = Arc::new(RemoteVideoTrack {
214            sid: sid.clone(),
215            publisher_id: identity.clone(),
216            frames_rx: local_track.frames_rx.clone(),
217        });
218
219        room.video_tracks.push(track.clone());
220
221        for (id, client_room) in &room.client_rooms {
222            if *id != identity {
223                let _ = client_room
224                    .0
225                    .lock()
226                    .updates_tx
227                    .try_broadcast(RoomUpdate::SubscribedToRemoteVideoTrack(track.clone()))
228                    .unwrap();
229            }
230        }
231
232        Ok(sid)
233    }
234
235    async fn publish_audio_track(
236        &self,
237        token: String,
238        _local_track: &LocalAudioTrack,
239    ) -> Result<Sid> {
240        self.executor.simulate_random_delay().await;
241        let claims = live_kit_server::token::validate(&token, &self.secret_key)?;
242        let identity = claims.sub.unwrap().to_string();
243        let room_name = claims.video.room.unwrap();
244
245        let mut server_rooms = self.rooms.lock();
246        let room = server_rooms
247            .get_mut(&*room_name)
248            .ok_or_else(|| anyhow!("room {} does not exist", room_name))?;
249
250        let can_publish = room
251            .participant_permissions
252            .get(&identity)
253            .map(|permission| permission.can_publish)
254            .or(claims.video.can_publish)
255            .unwrap_or(true);
256
257        if !can_publish {
258            return Err(anyhow!("user is not allowed to publish"));
259        }
260
261        let sid = nanoid::nanoid!(17);
262        let track = Arc::new(RemoteAudioTrack {
263            sid: sid.clone(),
264            publisher_id: identity.clone(),
265            running: AtomicBool::new(true),
266        });
267
268        let publication = Arc::new(RemoteTrackPublication);
269
270        room.audio_tracks.push(track.clone());
271
272        for (id, client_room) in &room.client_rooms {
273            if *id != identity {
274                let _ = client_room
275                    .0
276                    .lock()
277                    .updates_tx
278                    .try_broadcast(RoomUpdate::SubscribedToRemoteAudioTrack(
279                        track.clone(),
280                        publication.clone(),
281                    ))
282                    .unwrap();
283            }
284        }
285
286        Ok(sid)
287    }
288
289    fn video_tracks(&self, token: String) -> Result<Vec<Arc<RemoteVideoTrack>>> {
290        let claims = live_kit_server::token::validate(&token, &self.secret_key)?;
291        let room_name = claims.video.room.unwrap();
292
293        let mut server_rooms = self.rooms.lock();
294        let room = server_rooms
295            .get_mut(&*room_name)
296            .ok_or_else(|| anyhow!("room {} does not exist", room_name))?;
297        Ok(room.video_tracks.clone())
298    }
299
300    fn audio_tracks(&self, token: String) -> Result<Vec<Arc<RemoteAudioTrack>>> {
301        let claims = live_kit_server::token::validate(&token, &self.secret_key)?;
302        let room_name = claims.video.room.unwrap();
303
304        let mut server_rooms = self.rooms.lock();
305        let room = server_rooms
306            .get_mut(&*room_name)
307            .ok_or_else(|| anyhow!("room {} does not exist", room_name))?;
308        Ok(room.audio_tracks.clone())
309    }
310}
311
312#[derive(Default)]
313struct TestServerRoom {
314    client_rooms: HashMap<Sid, Arc<Room>>,
315    video_tracks: Vec<Arc<RemoteVideoTrack>>,
316    audio_tracks: Vec<Arc<RemoteAudioTrack>>,
317    participant_permissions: HashMap<Sid, proto::ParticipantPermission>,
318}
319
320impl TestServerRoom {}
321
322pub struct TestApiClient {
323    url: String,
324}
325
326#[async_trait]
327impl live_kit_server::api::Client for TestApiClient {
328    fn url(&self) -> &str {
329        &self.url
330    }
331
332    async fn create_room(&self, name: String) -> Result<()> {
333        let server = TestServer::get(&self.url)?;
334        server.create_room(name).await?;
335        Ok(())
336    }
337
338    async fn delete_room(&self, name: String) -> Result<()> {
339        let server = TestServer::get(&self.url)?;
340        server.delete_room(name).await?;
341        Ok(())
342    }
343
344    async fn remove_participant(&self, room: String, identity: String) -> Result<()> {
345        let server = TestServer::get(&self.url)?;
346        server.remove_participant(room, identity).await?;
347        Ok(())
348    }
349
350    async fn update_participant(
351        &self,
352        room: String,
353        identity: String,
354        permission: live_kit_server::proto::ParticipantPermission,
355    ) -> Result<()> {
356        let server = TestServer::get(&self.url)?;
357        server
358            .update_participant(room, identity, permission)
359            .await?;
360        Ok(())
361    }
362
363    fn room_token(&self, room: &str, identity: &str) -> Result<String> {
364        let server = TestServer::get(&self.url)?;
365        token::create(
366            &server.api_key,
367            &server.secret_key,
368            Some(identity),
369            token::VideoGrant::to_join(room),
370        )
371    }
372
373    fn guest_token(&self, room: &str, identity: &str) -> Result<String> {
374        let server = TestServer::get(&self.url)?;
375        token::create(
376            &server.api_key,
377            &server.secret_key,
378            Some(identity),
379            token::VideoGrant::for_guest(room),
380        )
381    }
382}
383
384struct RoomState {
385    connection: (
386        watch::Sender<ConnectionState>,
387        watch::Receiver<ConnectionState>,
388    ),
389    display_sources: Vec<MacOSDisplay>,
390    updates_tx: async_broadcast::Sender<RoomUpdate>,
391    updates_rx: async_broadcast::Receiver<RoomUpdate>,
392}
393
394pub struct Room(Mutex<RoomState>);
395
396impl Room {
397    pub fn new() -> Arc<Self> {
398        let (updates_tx, updates_rx) = async_broadcast::broadcast(128);
399        Arc::new(Self(Mutex::new(RoomState {
400            connection: watch::channel_with(ConnectionState::Disconnected),
401            display_sources: Default::default(),
402            updates_tx,
403            updates_rx,
404        })))
405    }
406
407    pub fn status(&self) -> watch::Receiver<ConnectionState> {
408        self.0.lock().connection.1.clone()
409    }
410
411    pub fn connect(self: &Arc<Self>, url: &str, token: &str) -> impl Future<Output = Result<()>> {
412        let this = self.clone();
413        let url = url.to_string();
414        let token = token.to_string();
415        async move {
416            let server = TestServer::get(&url)?;
417            server
418                .join_room(token.clone(), this.clone())
419                .await
420                .context("room join")?;
421            *this.0.lock().connection.0.borrow_mut() = ConnectionState::Connected { url, token };
422            Ok(())
423        }
424    }
425
426    pub fn display_sources(self: &Arc<Self>) -> impl Future<Output = Result<Vec<MacOSDisplay>>> {
427        let this = self.clone();
428        async move {
429            let server = this.test_server();
430            server.executor.simulate_random_delay().await;
431            Ok(this.0.lock().display_sources.clone())
432        }
433    }
434
435    pub fn publish_video_track(
436        self: &Arc<Self>,
437        track: LocalVideoTrack,
438    ) -> impl Future<Output = Result<LocalTrackPublication>> {
439        let this = self.clone();
440        let track = track.clone();
441        async move {
442            let sid = this
443                .test_server()
444                .publish_video_track(this.token(), track)
445                .await?;
446            Ok(LocalTrackPublication {
447                muted: Default::default(),
448                sid,
449            })
450        }
451    }
452    pub fn publish_audio_track(
453        self: &Arc<Self>,
454        track: LocalAudioTrack,
455    ) -> impl Future<Output = Result<LocalTrackPublication>> {
456        let this = self.clone();
457        let track = track.clone();
458        async move {
459            let sid = this
460                .test_server()
461                .publish_audio_track(this.token(), &track)
462                .await?;
463            Ok(LocalTrackPublication {
464                muted: Default::default(),
465                sid,
466            })
467        }
468    }
469
470    pub fn unpublish_track(&self, _publication: LocalTrackPublication) {}
471
472    pub fn remote_audio_tracks(&self, publisher_id: &str) -> Vec<Arc<RemoteAudioTrack>> {
473        if !self.is_connected() {
474            return Vec::new();
475        }
476
477        self.test_server()
478            .audio_tracks(self.token())
479            .unwrap()
480            .into_iter()
481            .filter(|track| track.publisher_id() == publisher_id)
482            .collect()
483    }
484
485    pub fn remote_audio_track_publications(
486        &self,
487        publisher_id: &str,
488    ) -> Vec<Arc<RemoteTrackPublication>> {
489        if !self.is_connected() {
490            return Vec::new();
491        }
492
493        self.test_server()
494            .audio_tracks(self.token())
495            .unwrap()
496            .into_iter()
497            .filter(|track| track.publisher_id() == publisher_id)
498            .map(|_track| Arc::new(RemoteTrackPublication {}))
499            .collect()
500    }
501
502    pub fn remote_video_tracks(&self, publisher_id: &str) -> Vec<Arc<RemoteVideoTrack>> {
503        if !self.is_connected() {
504            return Vec::new();
505        }
506
507        self.test_server()
508            .video_tracks(self.token())
509            .unwrap()
510            .into_iter()
511            .filter(|track| track.publisher_id() == publisher_id)
512            .collect()
513    }
514
515    pub fn updates(&self) -> impl Stream<Item = RoomUpdate> {
516        self.0.lock().updates_rx.clone()
517    }
518
519    pub fn set_display_sources(&self, sources: Vec<MacOSDisplay>) {
520        self.0.lock().display_sources = sources;
521    }
522
523    fn test_server(&self) -> Arc<TestServer> {
524        match self.0.lock().connection.1.borrow().clone() {
525            ConnectionState::Disconnected => panic!("must be connected to call this method"),
526            ConnectionState::Connected { url, .. } => TestServer::get(&url).unwrap(),
527        }
528    }
529
530    fn token(&self) -> String {
531        match self.0.lock().connection.1.borrow().clone() {
532            ConnectionState::Disconnected => panic!("must be connected to call this method"),
533            ConnectionState::Connected { token, .. } => token,
534        }
535    }
536
537    fn is_connected(&self) -> bool {
538        match *self.0.lock().connection.1.borrow() {
539            ConnectionState::Disconnected => false,
540            ConnectionState::Connected { .. } => true,
541        }
542    }
543}
544
545impl Drop for Room {
546    fn drop(&mut self) {
547        if let ConnectionState::Connected { token, .. } = mem::replace(
548            &mut *self.0.lock().connection.0.borrow_mut(),
549            ConnectionState::Disconnected,
550        ) {
551            if let Ok(server) = TestServer::get(&token) {
552                let executor = server.executor.clone();
553                executor
554                    .spawn(async move { server.leave_room(token).await.unwrap() })
555                    .detach();
556            }
557        }
558    }
559}
560
561#[derive(Clone)]
562pub struct LocalTrackPublication {
563    sid: String,
564    muted: Arc<AtomicBool>,
565}
566
567impl LocalTrackPublication {
568    pub fn set_mute(&self, mute: bool) -> impl Future<Output = Result<()>> {
569        let muted = self.muted.clone();
570        async move {
571            muted.store(mute, SeqCst);
572            Ok(())
573        }
574    }
575
576    pub fn is_muted(&self) -> bool {
577        self.muted.load(SeqCst)
578    }
579
580    pub fn sid(&self) -> String {
581        self.sid.clone()
582    }
583}
584
585pub struct RemoteTrackPublication;
586
587impl RemoteTrackPublication {
588    pub fn set_enabled(&self, _enabled: bool) -> impl Future<Output = Result<()>> {
589        async { Ok(()) }
590    }
591
592    pub fn is_muted(&self) -> bool {
593        false
594    }
595
596    pub fn sid(&self) -> String {
597        "".to_string()
598    }
599}
600
601#[derive(Clone)]
602pub struct LocalVideoTrack {
603    frames_rx: async_broadcast::Receiver<Frame>,
604}
605
606impl LocalVideoTrack {
607    pub fn screen_share_for_display(display: &MacOSDisplay) -> Self {
608        Self {
609            frames_rx: display.frames.1.clone(),
610        }
611    }
612}
613
/// A local audio track; it carries no data.
#[derive(Clone)]
pub struct LocalAudioTrack;

impl LocalAudioTrack {
    /// Creates a new local audio track.
    pub fn create() -> Self {
        LocalAudioTrack
    }
}
622
623#[derive(Debug)]
624pub struct RemoteVideoTrack {
625    sid: Sid,
626    publisher_id: Sid,
627    frames_rx: async_broadcast::Receiver<Frame>,
628}
629
630impl RemoteVideoTrack {
631    pub fn sid(&self) -> &str {
632        &self.sid
633    }
634
635    pub fn publisher_id(&self) -> &str {
636        &self.publisher_id
637    }
638
639    pub fn frames(&self) -> async_broadcast::Receiver<Frame> {
640        self.frames_rx.clone()
641    }
642}
643
644#[derive(Debug)]
645pub struct RemoteAudioTrack {
646    sid: Sid,
647    publisher_id: Sid,
648    running: AtomicBool,
649}
650
651impl RemoteAudioTrack {
652    pub fn sid(&self) -> &str {
653        &self.sid
654    }
655
656    pub fn publisher_id(&self) -> &str {
657        &self.publisher_id
658    }
659
660    pub fn start(&self) {
661        self.running.store(true, SeqCst);
662    }
663
664    pub fn stop(&self) {
665        self.running.store(false, SeqCst);
666    }
667}
668
669#[derive(Clone)]
670pub struct MacOSDisplay {
671    frames: (
672        async_broadcast::Sender<Frame>,
673        async_broadcast::Receiver<Frame>,
674    ),
675}
676
677impl MacOSDisplay {
678    pub fn new() -> Self {
679        Self {
680            frames: async_broadcast::broadcast(128),
681        }
682    }
683
684    pub fn send_frame(&self, frame: Frame) {
685        self.frames.0.try_broadcast(frame).unwrap();
686    }
687}
688
/// A fake video frame, identified by a label and its dimensions.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct Frame {
    /// Label that identifies the frame.
    pub label: String,
    /// Frame width.
    pub width: usize,
    /// Frame height.
    pub height: usize,
}
695
696impl Frame {
697    pub fn width(&self) -> usize {
698        self.width
699    }
700
701    pub fn height(&self) -> usize {
702        self.height
703    }
704
705    pub fn image(&self) -> CVImageBuffer {
706        unimplemented!("you can't call this in test mode")
707    }
708}