test.rs

  1use anyhow::{anyhow, Context, Result};
  2use async_trait::async_trait;
  3use collections::{BTreeMap, HashMap};
  4use futures::Stream;
  5use gpui::BackgroundExecutor;
  6use live_kit_server::{proto, token};
  7use media::core_video::CVImageBuffer;
  8use parking_lot::Mutex;
  9use postage::watch;
 10use std::{future::Future, mem, sync::Arc};
 11
/// Process-wide registry of test servers, keyed by server URL.
///
/// `TestServer::create` inserts into it, `TestServer::get` looks servers up by
/// URL, and `TestServer::teardown` removes them again.
static SERVERS: Mutex<BTreeMap<String, Arc<TestServer>>> = Mutex::new(BTreeMap::new());
 13
/// An in-process stand-in for a LiveKit server.
///
/// Instances are registered in the global `SERVERS` map under `url` by
/// `TestServer::create` and found again through `TestServer::get`.
pub struct TestServer {
    /// URL under which this server is registered in `SERVERS`.
    pub url: String,
    /// API key handed to `token::create` when tokens are minted for this server.
    pub api_key: String,
    /// Secret used both to create tokens and to validate tokens sent by clients.
    pub secret_key: String,
    /// Rooms known to this server, keyed by room name.
    rooms: Mutex<HashMap<String, TestServerRoom>>,
    /// Executor used to inject simulated random delays and to spawn background work.
    executor: BackgroundExecutor,
}
 21
 22impl TestServer {
 23    pub fn create(
 24        url: String,
 25        api_key: String,
 26        secret_key: String,
 27        executor: BackgroundExecutor,
 28    ) -> Result<Arc<TestServer>> {
 29        let mut servers = SERVERS.lock();
 30        if servers.contains_key(&url) {
 31            Err(anyhow!("a server with url {:?} already exists", url))
 32        } else {
 33            let server = Arc::new(TestServer {
 34                url: url.clone(),
 35                api_key,
 36                secret_key,
 37                rooms: Default::default(),
 38                executor,
 39            });
 40            servers.insert(url, server.clone());
 41            Ok(server)
 42        }
 43    }
 44
 45    fn get(url: &str) -> Result<Arc<TestServer>> {
 46        Ok(SERVERS
 47            .lock()
 48            .get(url)
 49            .ok_or_else(|| anyhow!("no server found for url"))?
 50            .clone())
 51    }
 52
 53    pub fn teardown(&self) -> Result<()> {
 54        SERVERS
 55            .lock()
 56            .remove(&self.url)
 57            .ok_or_else(|| anyhow!("server with url {:?} does not exist", self.url))?;
 58        Ok(())
 59    }
 60
 61    pub fn create_api_client(&self) -> TestApiClient {
 62        TestApiClient {
 63            url: self.url.clone(),
 64        }
 65    }
 66
 67    pub async fn create_room(&self, room: String) -> Result<()> {
 68        self.executor.simulate_random_delay().await;
 69        let mut server_rooms = self.rooms.lock();
 70        if server_rooms.contains_key(&room) {
 71            Err(anyhow!("room {:?} already exists", room))
 72        } else {
 73            server_rooms.insert(room, Default::default());
 74            Ok(())
 75        }
 76    }
 77
 78    async fn delete_room(&self, room: String) -> Result<()> {
 79        // TODO: clear state associated with all `Room`s.
 80        self.executor.simulate_random_delay().await;
 81        let mut server_rooms = self.rooms.lock();
 82        server_rooms
 83            .remove(&room)
 84            .ok_or_else(|| anyhow!("room {:?} does not exist", room))?;
 85        Ok(())
 86    }
 87
 88    async fn join_room(&self, token: String, client_room: Arc<Room>) -> Result<()> {
 89        self.executor.simulate_random_delay().await;
 90        let claims = live_kit_server::token::validate(&token, &self.secret_key)?;
 91        let identity = claims.sub.unwrap().to_string();
 92        let room_name = claims.video.room.unwrap();
 93        let mut server_rooms = self.rooms.lock();
 94        let room = (*server_rooms).entry(room_name.to_string()).or_default();
 95
 96        if room.client_rooms.contains_key(&identity) {
 97            Err(anyhow!(
 98                "{:?} attempted to join room {:?} twice",
 99                identity,
100                room_name
101            ))
102        } else {
103            for track in &room.video_tracks {
104                client_room
105                    .0
106                    .lock()
107                    .video_track_updates
108                    .0
109                    .try_broadcast(RemoteVideoTrackUpdate::Subscribed(track.clone()))
110                    .unwrap();
111            }
112            room.client_rooms.insert(identity, client_room);
113            Ok(())
114        }
115    }
116
117    async fn leave_room(&self, token: String) -> Result<()> {
118        self.executor.simulate_random_delay().await;
119        let claims = live_kit_server::token::validate(&token, &self.secret_key)?;
120        let identity = claims.sub.unwrap().to_string();
121        let room_name = claims.video.room.unwrap();
122        let mut server_rooms = self.rooms.lock();
123        let room = server_rooms
124            .get_mut(&*room_name)
125            .ok_or_else(|| anyhow!("room {} does not exist", room_name))?;
126        room.client_rooms.remove(&identity).ok_or_else(|| {
127            anyhow!(
128                "{:?} attempted to leave room {:?} before joining it",
129                identity,
130                room_name
131            )
132        })?;
133        Ok(())
134    }
135
136    async fn remove_participant(&self, room_name: String, identity: String) -> Result<()> {
137        // TODO: clear state associated with the `Room`.
138
139        self.executor.simulate_random_delay().await;
140        let mut server_rooms = self.rooms.lock();
141        let room = server_rooms
142            .get_mut(&room_name)
143            .ok_or_else(|| anyhow!("room {} does not exist", room_name))?;
144        room.client_rooms.remove(&identity).ok_or_else(|| {
145            anyhow!(
146                "participant {:?} did not join room {:?}",
147                identity,
148                room_name
149            )
150        })?;
151        Ok(())
152    }
153
154    async fn update_participant(
155        &self,
156        room_name: String,
157        identity: String,
158        permission: proto::ParticipantPermission,
159    ) -> Result<()> {
160        self.executor.simulate_random_delay().await;
161        let mut server_rooms = self.rooms.lock();
162        let room = server_rooms
163            .get_mut(&room_name)
164            .ok_or_else(|| anyhow!("room {} does not exist", room_name))?;
165        room.participant_permissions.insert(identity, permission);
166        Ok(())
167    }
168
169    pub async fn disconnect_client(&self, client_identity: String) {
170        self.executor.simulate_random_delay().await;
171        let mut server_rooms = self.rooms.lock();
172        for room in server_rooms.values_mut() {
173            if let Some(room) = room.client_rooms.remove(&client_identity) {
174                *room.0.lock().connection.0.borrow_mut() = ConnectionState::Disconnected;
175            }
176        }
177    }
178
179    async fn publish_video_track(&self, token: String, local_track: LocalVideoTrack) -> Result<()> {
180        self.executor.simulate_random_delay().await;
181        let claims = live_kit_server::token::validate(&token, &self.secret_key)?;
182        let identity = claims.sub.unwrap().to_string();
183        let room_name = claims.video.room.unwrap();
184
185        let mut server_rooms = self.rooms.lock();
186        let room = server_rooms
187            .get_mut(&*room_name)
188            .ok_or_else(|| anyhow!("room {} does not exist", room_name))?;
189
190        let can_publish = room
191            .participant_permissions
192            .get(&identity)
193            .map(|permission| permission.can_publish)
194            .or(claims.video.can_publish)
195            .unwrap_or(true);
196
197        if !can_publish {
198            return Err(anyhow!("user is not allowed to publish"));
199        }
200
201        let track = Arc::new(RemoteVideoTrack {
202            sid: nanoid::nanoid!(17),
203            publisher_id: identity.clone(),
204            frames_rx: local_track.frames_rx.clone(),
205        });
206
207        room.video_tracks.push(track.clone());
208
209        for (id, client_room) in &room.client_rooms {
210            if *id != identity {
211                let _ = client_room
212                    .0
213                    .lock()
214                    .video_track_updates
215                    .0
216                    .try_broadcast(RemoteVideoTrackUpdate::Subscribed(track.clone()))
217                    .unwrap();
218            }
219        }
220
221        Ok(())
222    }
223
224    async fn publish_audio_track(
225        &self,
226        token: String,
227        _local_track: &LocalAudioTrack,
228    ) -> Result<()> {
229        self.executor.simulate_random_delay().await;
230        let claims = live_kit_server::token::validate(&token, &self.secret_key)?;
231        let identity = claims.sub.unwrap().to_string();
232        let room_name = claims.video.room.unwrap();
233
234        let mut server_rooms = self.rooms.lock();
235        let room = server_rooms
236            .get_mut(&*room_name)
237            .ok_or_else(|| anyhow!("room {} does not exist", room_name))?;
238
239        let can_publish = room
240            .participant_permissions
241            .get(&identity)
242            .map(|permission| permission.can_publish)
243            .or(claims.video.can_publish)
244            .unwrap_or(true);
245
246        if !can_publish {
247            return Err(anyhow!("user is not allowed to publish"));
248        }
249
250        let track = Arc::new(RemoteAudioTrack {
251            sid: nanoid::nanoid!(17),
252            publisher_id: identity.clone(),
253        });
254
255        let publication = Arc::new(RemoteTrackPublication);
256
257        room.audio_tracks.push(track.clone());
258
259        for (id, client_room) in &room.client_rooms {
260            if *id != identity {
261                let _ = client_room
262                    .0
263                    .lock()
264                    .audio_track_updates
265                    .0
266                    .try_broadcast(RemoteAudioTrackUpdate::Subscribed(
267                        track.clone(),
268                        publication.clone(),
269                    ))
270                    .unwrap();
271            }
272        }
273
274        Ok(())
275    }
276
277    fn video_tracks(&self, token: String) -> Result<Vec<Arc<RemoteVideoTrack>>> {
278        let claims = live_kit_server::token::validate(&token, &self.secret_key)?;
279        let room_name = claims.video.room.unwrap();
280
281        let mut server_rooms = self.rooms.lock();
282        let room = server_rooms
283            .get_mut(&*room_name)
284            .ok_or_else(|| anyhow!("room {} does not exist", room_name))?;
285        Ok(room.video_tracks.clone())
286    }
287
288    fn audio_tracks(&self, token: String) -> Result<Vec<Arc<RemoteAudioTrack>>> {
289        let claims = live_kit_server::token::validate(&token, &self.secret_key)?;
290        let room_name = claims.video.room.unwrap();
291
292        let mut server_rooms = self.rooms.lock();
293        let room = server_rooms
294            .get_mut(&*room_name)
295            .ok_or_else(|| anyhow!("room {} does not exist", room_name))?;
296        Ok(room.audio_tracks.clone())
297    }
298}
299
/// Server-side state of a single room.
#[derive(Default)]
struct TestServerRoom {
    /// Client rooms currently joined, keyed by participant identity.
    client_rooms: HashMap<Sid, Arc<Room>>,
    /// Video tracks published in this room, in publication order.
    video_tracks: Vec<Arc<RemoteVideoTrack>>,
    /// Audio tracks published in this room, in publication order.
    audio_tracks: Vec<Arc<RemoteAudioTrack>>,
    /// Permission overrides set through `update_participant`, keyed by
    /// participant identity; consulted before the token grant when publishing.
    participant_permissions: HashMap<Sid, proto::ParticipantPermission>,
}
307
// No methods are defined here; `TestServer` reads and writes the room's fields directly.
impl TestServerRoom {}
309
/// API client that forwards each call to the `TestServer` registered under `url`.
pub struct TestApiClient {
    /// URL used to look the target server up on every call.
    url: String,
}
313
314#[async_trait]
315impl live_kit_server::api::Client for TestApiClient {
316    fn url(&self) -> &str {
317        &self.url
318    }
319
320    async fn create_room(&self, name: String) -> Result<()> {
321        let server = TestServer::get(&self.url)?;
322        server.create_room(name).await?;
323        Ok(())
324    }
325
326    async fn delete_room(&self, name: String) -> Result<()> {
327        let server = TestServer::get(&self.url)?;
328        server.delete_room(name).await?;
329        Ok(())
330    }
331
332    async fn remove_participant(&self, room: String, identity: String) -> Result<()> {
333        let server = TestServer::get(&self.url)?;
334        server.remove_participant(room, identity).await?;
335        Ok(())
336    }
337
338    async fn update_participant(
339        &self,
340        room: String,
341        identity: String,
342        permission: live_kit_server::proto::ParticipantPermission,
343    ) -> Result<()> {
344        let server = TestServer::get(&self.url)?;
345        server
346            .update_participant(room, identity, permission)
347            .await?;
348        Ok(())
349    }
350
351    fn room_token(&self, room: &str, identity: &str) -> Result<String> {
352        let server = TestServer::get(&self.url)?;
353        token::create(
354            &server.api_key,
355            &server.secret_key,
356            Some(identity),
357            token::VideoGrant::to_join(room),
358        )
359    }
360
361    fn guest_token(&self, room: &str, identity: &str) -> Result<String> {
362        let server = TestServer::get(&self.url)?;
363        token::create(
364            &server.api_key,
365            &server.secret_key,
366            Some(identity),
367            token::VideoGrant::for_guest(room),
368        )
369    }
370}
371
/// String identifier; used in this file for participant identities and track ids.
pub type Sid = String;
373
/// Mutable state of a client-side `Room`, guarded by the mutex inside `Room`.
struct RoomState {
    /// Watch channel holding the current connection state. The sender half is
    /// written on connect, on forced disconnect and on drop; clones of the
    /// receiver half are handed out by `Room::status`.
    connection: (
        watch::Sender<ConnectionState>,
        watch::Receiver<ConnectionState>,
    ),
    /// Display sources reported by `Room::display_sources`; set through
    /// `Room::set_display_sources`.
    display_sources: Vec<MacOSDisplay>,
    /// Broadcast channel for audio track updates. The server pushes into the
    /// sender; clones of the receiver are handed out as update streams.
    audio_track_updates: (
        async_broadcast::Sender<RemoteAudioTrackUpdate>,
        async_broadcast::Receiver<RemoteAudioTrackUpdate>,
    ),
    /// Broadcast channel for video track updates. The server pushes into the
    /// sender; clones of the receiver are handed out as update streams.
    video_track_updates: (
        async_broadcast::Sender<RemoteVideoTrackUpdate>,
        async_broadcast::Receiver<RemoteVideoTrackUpdate>,
    ),
}
389
/// Connection status of a `Room`.
#[derive(Clone, Eq, PartialEq)]
pub enum ConnectionState {
    /// Not connected to any server; the initial state of a new `Room`.
    Disconnected,
    /// Connected to the server at `url` using `token`.
    Connected { url: String, token: String },
}
395
/// Client-side handle to a room on a `TestServer`; all state lives behind one mutex.
pub struct Room(Mutex<RoomState>);
397
398impl Room {
399    pub fn new() -> Arc<Self> {
400        Arc::new(Self(Mutex::new(RoomState {
401            connection: watch::channel_with(ConnectionState::Disconnected),
402            display_sources: Default::default(),
403            video_track_updates: async_broadcast::broadcast(128),
404            audio_track_updates: async_broadcast::broadcast(128),
405        })))
406    }
407
408    pub fn status(&self) -> watch::Receiver<ConnectionState> {
409        self.0.lock().connection.1.clone()
410    }
411
412    pub fn connect(self: &Arc<Self>, url: &str, token: &str) -> impl Future<Output = Result<()>> {
413        let this = self.clone();
414        let url = url.to_string();
415        let token = token.to_string();
416        async move {
417            let server = TestServer::get(&url)?;
418            server
419                .join_room(token.clone(), this.clone())
420                .await
421                .context("room join")?;
422            *this.0.lock().connection.0.borrow_mut() = ConnectionState::Connected { url, token };
423            Ok(())
424        }
425    }
426
427    pub fn display_sources(self: &Arc<Self>) -> impl Future<Output = Result<Vec<MacOSDisplay>>> {
428        let this = self.clone();
429        async move {
430            let server = this.test_server();
431            server.executor.simulate_random_delay().await;
432            Ok(this.0.lock().display_sources.clone())
433        }
434    }
435
436    pub fn publish_video_track(
437        self: &Arc<Self>,
438        track: LocalVideoTrack,
439    ) -> impl Future<Output = Result<LocalTrackPublication>> {
440        let this = self.clone();
441        let track = track.clone();
442        async move {
443            this.test_server()
444                .publish_video_track(this.token(), track)
445                .await?;
446            Ok(LocalTrackPublication)
447        }
448    }
449    pub fn publish_audio_track(
450        self: &Arc<Self>,
451        track: LocalAudioTrack,
452    ) -> impl Future<Output = Result<LocalTrackPublication>> {
453        let this = self.clone();
454        let track = track.clone();
455        async move {
456            this.test_server()
457                .publish_audio_track(this.token(), &track)
458                .await?;
459            Ok(LocalTrackPublication)
460        }
461    }
462
    /// Does nothing: the publication is consumed and no server state is changed.
    pub fn unpublish_track(&self, _publication: LocalTrackPublication) {}
464
465    pub fn remote_audio_tracks(&self, publisher_id: &str) -> Vec<Arc<RemoteAudioTrack>> {
466        if !self.is_connected() {
467            return Vec::new();
468        }
469
470        self.test_server()
471            .audio_tracks(self.token())
472            .unwrap()
473            .into_iter()
474            .filter(|track| track.publisher_id() == publisher_id)
475            .collect()
476    }
477
478    pub fn remote_audio_track_publications(
479        &self,
480        publisher_id: &str,
481    ) -> Vec<Arc<RemoteTrackPublication>> {
482        if !self.is_connected() {
483            return Vec::new();
484        }
485
486        self.test_server()
487            .audio_tracks(self.token())
488            .unwrap()
489            .into_iter()
490            .filter(|track| track.publisher_id() == publisher_id)
491            .map(|_track| Arc::new(RemoteTrackPublication {}))
492            .collect()
493    }
494
495    pub fn remote_video_tracks(&self, publisher_id: &str) -> Vec<Arc<RemoteVideoTrack>> {
496        if !self.is_connected() {
497            return Vec::new();
498        }
499
500        self.test_server()
501            .video_tracks(self.token())
502            .unwrap()
503            .into_iter()
504            .filter(|track| track.publisher_id() == publisher_id)
505            .collect()
506    }
507
508    pub fn remote_audio_track_updates(&self) -> impl Stream<Item = RemoteAudioTrackUpdate> {
509        self.0.lock().audio_track_updates.1.clone()
510    }
511
512    pub fn remote_video_track_updates(&self) -> impl Stream<Item = RemoteVideoTrackUpdate> {
513        self.0.lock().video_track_updates.1.clone()
514    }
515
516    pub fn set_display_sources(&self, sources: Vec<MacOSDisplay>) {
517        self.0.lock().display_sources = sources;
518    }
519
520    fn test_server(&self) -> Arc<TestServer> {
521        match self.0.lock().connection.1.borrow().clone() {
522            ConnectionState::Disconnected => panic!("must be connected to call this method"),
523            ConnectionState::Connected { url, .. } => TestServer::get(&url).unwrap(),
524        }
525    }
526
527    fn token(&self) -> String {
528        match self.0.lock().connection.1.borrow().clone() {
529            ConnectionState::Disconnected => panic!("must be connected to call this method"),
530            ConnectionState::Connected { token, .. } => token,
531        }
532    }
533
534    fn is_connected(&self) -> bool {
535        match *self.0.lock().connection.1.borrow() {
536            ConnectionState::Disconnected => false,
537            ConnectionState::Connected { .. } => true,
538        }
539    }
540}
541
542impl Drop for Room {
543    fn drop(&mut self) {
544        if let ConnectionState::Connected { token, .. } = mem::replace(
545            &mut *self.0.lock().connection.0.borrow_mut(),
546            ConnectionState::Disconnected,
547        ) {
548            if let Ok(server) = TestServer::get(&token) {
549                let executor = server.executor.clone();
550                executor
551                    .spawn(async move { server.leave_room(token).await.unwrap() })
552                    .detach();
553            }
554        }
555    }
556}
557
/// Stateless handle returned when a local track is published.
pub struct LocalTrackPublication;
559
560impl LocalTrackPublication {
561    pub fn set_mute(&self, _mute: bool) -> impl Future<Output = Result<()>> {
562        async { Ok(()) }
563    }
564}
565
/// Stateless handle describing a remote participant's track publication.
pub struct RemoteTrackPublication;
567
568impl RemoteTrackPublication {
569    pub fn set_enabled(&self, _enabled: bool) -> impl Future<Output = Result<()>> {
570        async { Ok(()) }
571    }
572
573    pub fn is_muted(&self) -> bool {
574        false
575    }
576
577    pub fn sid(&self) -> String {
578        "".to_string()
579    }
580}
581
/// A local video track, represented by the receiving end of a frame broadcast.
#[derive(Clone)]
pub struct LocalVideoTrack {
    /// Receiver for the frames of the display this track was created from.
    frames_rx: async_broadcast::Receiver<Frame>,
}
586
587impl LocalVideoTrack {
588    pub fn screen_share_for_display(display: &MacOSDisplay) -> Self {
589        Self {
590            frames_rx: display.frames.1.clone(),
591        }
592    }
593}
594
/// A local audio track; carries no data.
#[derive(Clone)]
pub struct LocalAudioTrack;
597
598impl LocalAudioTrack {
599    pub fn create() -> Self {
600        Self
601    }
602}
603
/// A video track as seen by other participants of a room.
#[derive(Debug)]
pub struct RemoteVideoTrack {
    /// Track id, generated when the track is published.
    sid: Sid,
    /// Identity of the participant that published the track.
    publisher_id: Sid,
    /// Receiver for the frames of the published local track.
    frames_rx: async_broadcast::Receiver<Frame>,
}
610
611impl RemoteVideoTrack {
612    pub fn sid(&self) -> &str {
613        &self.sid
614    }
615
616    pub fn publisher_id(&self) -> &str {
617        &self.publisher_id
618    }
619
620    pub fn frames(&self) -> async_broadcast::Receiver<Frame> {
621        self.frames_rx.clone()
622    }
623}
624
/// An audio track as seen by other participants of a room.
#[derive(Debug)]
pub struct RemoteAudioTrack {
    /// Track id, generated when the track is published.
    sid: Sid,
    /// Identity of the participant that published the track.
    publisher_id: Sid,
}
630
631impl RemoteAudioTrack {
632    pub fn sid(&self) -> &str {
633        &self.sid
634    }
635
636    pub fn publisher_id(&self) -> &str {
637        &self.publisher_id
638    }
639
640    pub fn enable(&self) -> impl Future<Output = Result<()>> {
641        async { Ok(()) }
642    }
643
644    pub fn disable(&self) -> impl Future<Output = Result<()>> {
645        async { Ok(()) }
646    }
647}
648
/// Update delivered on a room's video track update stream.
#[derive(Clone)]
pub enum RemoteVideoTrackUpdate {
    /// A video track became available; sent when a track is published and
    /// when a client joins a room that already has tracks.
    Subscribed(Arc<RemoteVideoTrack>),
    /// A video track went away. Nothing in this file sends this variant.
    Unsubscribed { publisher_id: Sid, track_id: Sid },
}
654
/// Update delivered on a room's audio track update stream.
///
/// Only `Subscribed` is sent by the code in this file.
#[derive(Clone)]
pub enum RemoteAudioTrackUpdate {
    /// The set of active speakers changed.
    ActiveSpeakersChanged { speakers: Vec<Sid> },
    /// The track `track_id` was muted or unmuted.
    MuteChanged { track_id: Sid, muted: bool },
    /// An audio track became available, together with its publication handle.
    Subscribed(Arc<RemoteAudioTrack>, Arc<RemoteTrackPublication>),
    /// An audio track went away.
    Unsubscribed { publisher_id: Sid, track_id: Sid },
}
662
/// A fake display whose frames are distributed over a broadcast channel.
#[derive(Clone)]
pub struct MacOSDisplay {
    /// Sender used by `send_frame`, and the receiver that tracks created from
    /// this display clone.
    frames: (
        async_broadcast::Sender<Frame>,
        async_broadcast::Receiver<Frame>,
    ),
}
670
671impl MacOSDisplay {
672    pub fn new() -> Self {
673        Self {
674            frames: async_broadcast::broadcast(128),
675        }
676    }
677
678    pub fn send_frame(&self, frame: Frame) {
679        self.frames.0.try_broadcast(frame).unwrap();
680    }
681}
682
/// A fake video frame, described only by a label and its dimensions.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct Frame {
    /// Free-form label identifying the frame.
    pub label: String,
    /// Frame width.
    pub width: usize,
    /// Frame height.
    pub height: usize,
}
689
690impl Frame {
691    pub fn width(&self) -> usize {
692        self.width
693    }
694
695    pub fn height(&self) -> usize {
696        self.height
697    }
698
699    pub fn image(&self) -> CVImageBuffer {
700        unimplemented!("you can't call this in test mode")
701    }
702}