test.rs

  1use anyhow::{anyhow, Context, Result};
  2use async_trait::async_trait;
  3use collections::{BTreeMap, HashMap};
  4use futures::Stream;
  5use gpui::BackgroundExecutor;
  6use live_kit_server::token;
  7use media::core_video::CVImageBuffer;
  8use parking_lot::Mutex;
  9use postage::watch;
 10use std::{future::Future, mem, sync::Arc};
 11
 12static SERVERS: Mutex<BTreeMap<String, Arc<TestServer>>> = Mutex::new(BTreeMap::new());
 13
 14pub struct TestServer {
 15    pub url: String,
 16    pub api_key: String,
 17    pub secret_key: String,
 18    rooms: Mutex<HashMap<String, TestServerRoom>>,
 19    executor: BackgroundExecutor,
 20}
 21
 22impl TestServer {
 23    pub fn create(
 24        url: String,
 25        api_key: String,
 26        secret_key: String,
 27        executor: BackgroundExecutor,
 28    ) -> Result<Arc<TestServer>> {
 29        let mut servers = SERVERS.lock();
 30        if servers.contains_key(&url) {
 31            Err(anyhow!("a server with url {:?} already exists", url))
 32        } else {
 33            let server = Arc::new(TestServer {
 34                url: url.clone(),
 35                api_key,
 36                secret_key,
 37                rooms: Default::default(),
 38                executor,
 39            });
 40            servers.insert(url, server.clone());
 41            Ok(server)
 42        }
 43    }
 44
 45    fn get(url: &str) -> Result<Arc<TestServer>> {
 46        Ok(SERVERS
 47            .lock()
 48            .get(url)
 49            .ok_or_else(|| anyhow!("no server found for url"))?
 50            .clone())
 51    }
 52
 53    pub fn teardown(&self) -> Result<()> {
 54        SERVERS
 55            .lock()
 56            .remove(&self.url)
 57            .ok_or_else(|| anyhow!("server with url {:?} does not exist", self.url))?;
 58        Ok(())
 59    }
 60
 61    pub fn create_api_client(&self) -> TestApiClient {
 62        TestApiClient {
 63            url: self.url.clone(),
 64        }
 65    }
 66
 67    pub async fn create_room(&self, room: String) -> Result<()> {
 68        self.executor.simulate_random_delay().await;
 69        let mut server_rooms = self.rooms.lock();
 70        if server_rooms.contains_key(&room) {
 71            Err(anyhow!("room {:?} already exists", room))
 72        } else {
 73            server_rooms.insert(room, Default::default());
 74            Ok(())
 75        }
 76    }
 77
 78    async fn delete_room(&self, room: String) -> Result<()> {
 79        // TODO: clear state associated with all `Room`s.
 80        self.executor.simulate_random_delay().await;
 81        let mut server_rooms = self.rooms.lock();
 82        server_rooms
 83            .remove(&room)
 84            .ok_or_else(|| anyhow!("room {:?} does not exist", room))?;
 85        Ok(())
 86    }
 87
 88    async fn join_room(&self, token: String, client_room: Arc<Room>) -> Result<()> {
 89        self.executor.simulate_random_delay().await;
 90        let claims = live_kit_server::token::validate(&token, &self.secret_key)?;
 91        let identity = claims.sub.unwrap().to_string();
 92        let room_name = claims.video.room.unwrap();
 93        let mut server_rooms = self.rooms.lock();
 94        let room = (*server_rooms).entry(room_name.to_string()).or_default();
 95
 96        if room.client_rooms.contains_key(&identity) {
 97            Err(anyhow!(
 98                "{:?} attempted to join room {:?} twice",
 99                identity,
100                room_name
101            ))
102        } else {
103            for track in &room.video_tracks {
104                client_room
105                    .0
106                    .lock()
107                    .video_track_updates
108                    .0
109                    .try_broadcast(RemoteVideoTrackUpdate::Subscribed(track.clone()))
110                    .unwrap();
111            }
112            room.client_rooms.insert(identity, client_room);
113            Ok(())
114        }
115    }
116
117    async fn leave_room(&self, token: String) -> Result<()> {
118        self.executor.simulate_random_delay().await;
119        let claims = live_kit_server::token::validate(&token, &self.secret_key)?;
120        let identity = claims.sub.unwrap().to_string();
121        let room_name = claims.video.room.unwrap();
122        let mut server_rooms = self.rooms.lock();
123        let room = server_rooms
124            .get_mut(&*room_name)
125            .ok_or_else(|| anyhow!("room {} does not exist", room_name))?;
126        room.client_rooms.remove(&identity).ok_or_else(|| {
127            anyhow!(
128                "{:?} attempted to leave room {:?} before joining it",
129                identity,
130                room_name
131            )
132        })?;
133        Ok(())
134    }
135
136    async fn remove_participant(&self, room_name: String, identity: String) -> Result<()> {
137        // TODO: clear state associated with the `Room`.
138
139        self.executor.simulate_random_delay().await;
140        let mut server_rooms = self.rooms.lock();
141        let room = server_rooms
142            .get_mut(&room_name)
143            .ok_or_else(|| anyhow!("room {} does not exist", room_name))?;
144        room.client_rooms.remove(&identity).ok_or_else(|| {
145            anyhow!(
146                "participant {:?} did not join room {:?}",
147                identity,
148                room_name
149            )
150        })?;
151        Ok(())
152    }
153
154    pub async fn disconnect_client(&self, client_identity: String) {
155        self.executor.simulate_random_delay().await;
156        let mut server_rooms = self.rooms.lock();
157        for room in server_rooms.values_mut() {
158            if let Some(room) = room.client_rooms.remove(&client_identity) {
159                *room.0.lock().connection.0.borrow_mut() = ConnectionState::Disconnected;
160            }
161        }
162    }
163
164    async fn publish_video_track(&self, token: String, local_track: LocalVideoTrack) -> Result<()> {
165        self.executor.simulate_random_delay().await;
166        let claims = live_kit_server::token::validate(&token, &self.secret_key)?;
167        let identity = claims.sub.unwrap().to_string();
168        let room_name = claims.video.room.unwrap();
169
170        let mut server_rooms = self.rooms.lock();
171        let room = server_rooms
172            .get_mut(&*room_name)
173            .ok_or_else(|| anyhow!("room {} does not exist", room_name))?;
174
175        let track = Arc::new(RemoteVideoTrack {
176            sid: nanoid::nanoid!(17),
177            publisher_id: identity.clone(),
178            frames_rx: local_track.frames_rx.clone(),
179        });
180
181        room.video_tracks.push(track.clone());
182
183        for (id, client_room) in &room.client_rooms {
184            if *id != identity {
185                let _ = client_room
186                    .0
187                    .lock()
188                    .video_track_updates
189                    .0
190                    .try_broadcast(RemoteVideoTrackUpdate::Subscribed(track.clone()))
191                    .unwrap();
192            }
193        }
194
195        Ok(())
196    }
197
198    async fn publish_audio_track(
199        &self,
200        token: String,
201        _local_track: &LocalAudioTrack,
202    ) -> Result<()> {
203        self.executor.simulate_random_delay().await;
204        let claims = live_kit_server::token::validate(&token, &self.secret_key)?;
205        let identity = claims.sub.unwrap().to_string();
206        let room_name = claims.video.room.unwrap();
207
208        let mut server_rooms = self.rooms.lock();
209        let room = server_rooms
210            .get_mut(&*room_name)
211            .ok_or_else(|| anyhow!("room {} does not exist", room_name))?;
212
213        let track = Arc::new(RemoteAudioTrack {
214            sid: nanoid::nanoid!(17),
215            publisher_id: identity.clone(),
216        });
217
218        let publication = Arc::new(RemoteTrackPublication);
219
220        room.audio_tracks.push(track.clone());
221
222        for (id, client_room) in &room.client_rooms {
223            if *id != identity {
224                let _ = client_room
225                    .0
226                    .lock()
227                    .audio_track_updates
228                    .0
229                    .try_broadcast(RemoteAudioTrackUpdate::Subscribed(
230                        track.clone(),
231                        publication.clone(),
232                    ))
233                    .unwrap();
234            }
235        }
236
237        Ok(())
238    }
239
240    fn video_tracks(&self, token: String) -> Result<Vec<Arc<RemoteVideoTrack>>> {
241        let claims = live_kit_server::token::validate(&token, &self.secret_key)?;
242        let room_name = claims.video.room.unwrap();
243
244        let mut server_rooms = self.rooms.lock();
245        let room = server_rooms
246            .get_mut(&*room_name)
247            .ok_or_else(|| anyhow!("room {} does not exist", room_name))?;
248        Ok(room.video_tracks.clone())
249    }
250
251    fn audio_tracks(&self, token: String) -> Result<Vec<Arc<RemoteAudioTrack>>> {
252        let claims = live_kit_server::token::validate(&token, &self.secret_key)?;
253        let room_name = claims.video.room.unwrap();
254
255        let mut server_rooms = self.rooms.lock();
256        let room = server_rooms
257            .get_mut(&*room_name)
258            .ok_or_else(|| anyhow!("room {} does not exist", room_name))?;
259        Ok(room.audio_tracks.clone())
260    }
261}
262
263#[derive(Default)]
264struct TestServerRoom {
265    client_rooms: HashMap<Sid, Arc<Room>>,
266    video_tracks: Vec<Arc<RemoteVideoTrack>>,
267    audio_tracks: Vec<Arc<RemoteAudioTrack>>,
268}
269
270impl TestServerRoom {}
271
272pub struct TestApiClient {
273    url: String,
274}
275
276#[async_trait]
277impl live_kit_server::api::Client for TestApiClient {
278    fn url(&self) -> &str {
279        &self.url
280    }
281
282    async fn create_room(&self, name: String) -> Result<()> {
283        let server = TestServer::get(&self.url)?;
284        server.create_room(name).await?;
285        Ok(())
286    }
287
288    async fn delete_room(&self, name: String) -> Result<()> {
289        let server = TestServer::get(&self.url)?;
290        server.delete_room(name).await?;
291        Ok(())
292    }
293
294    async fn remove_participant(&self, room: String, identity: String) -> Result<()> {
295        let server = TestServer::get(&self.url)?;
296        server.remove_participant(room, identity).await?;
297        Ok(())
298    }
299
300    fn room_token(&self, room: &str, identity: &str) -> Result<String> {
301        let server = TestServer::get(&self.url)?;
302        token::create(
303            &server.api_key,
304            &server.secret_key,
305            Some(identity),
306            token::VideoGrant::to_join(room),
307        )
308    }
309
310    fn guest_token(&self, room: &str, identity: &str) -> Result<String> {
311        let server = TestServer::get(&self.url)?;
312        token::create(
313            &server.api_key,
314            &server.secret_key,
315            Some(identity),
316            token::VideoGrant::for_guest(room),
317        )
318    }
319}
320
321pub type Sid = String;
322
323struct RoomState {
324    connection: (
325        watch::Sender<ConnectionState>,
326        watch::Receiver<ConnectionState>,
327    ),
328    display_sources: Vec<MacOSDisplay>,
329    audio_track_updates: (
330        async_broadcast::Sender<RemoteAudioTrackUpdate>,
331        async_broadcast::Receiver<RemoteAudioTrackUpdate>,
332    ),
333    video_track_updates: (
334        async_broadcast::Sender<RemoteVideoTrackUpdate>,
335        async_broadcast::Receiver<RemoteVideoTrackUpdate>,
336    ),
337}
338
339#[derive(Clone, Eq, PartialEq)]
340pub enum ConnectionState {
341    Disconnected,
342    Connected { url: String, token: String },
343}
344
345pub struct Room(Mutex<RoomState>);
346
347impl Room {
348    pub fn new() -> Arc<Self> {
349        Arc::new(Self(Mutex::new(RoomState {
350            connection: watch::channel_with(ConnectionState::Disconnected),
351            display_sources: Default::default(),
352            video_track_updates: async_broadcast::broadcast(128),
353            audio_track_updates: async_broadcast::broadcast(128),
354        })))
355    }
356
357    pub fn status(&self) -> watch::Receiver<ConnectionState> {
358        self.0.lock().connection.1.clone()
359    }
360
361    pub fn connect(self: &Arc<Self>, url: &str, token: &str) -> impl Future<Output = Result<()>> {
362        let this = self.clone();
363        let url = url.to_string();
364        let token = token.to_string();
365        async move {
366            let server = TestServer::get(&url)?;
367            server
368                .join_room(token.clone(), this.clone())
369                .await
370                .context("room join")?;
371            *this.0.lock().connection.0.borrow_mut() = ConnectionState::Connected { url, token };
372            Ok(())
373        }
374    }
375
376    pub fn display_sources(self: &Arc<Self>) -> impl Future<Output = Result<Vec<MacOSDisplay>>> {
377        let this = self.clone();
378        async move {
379            let server = this.test_server();
380            server.executor.simulate_random_delay().await;
381            Ok(this.0.lock().display_sources.clone())
382        }
383    }
384
385    pub fn publish_video_track(
386        self: &Arc<Self>,
387        track: LocalVideoTrack,
388    ) -> impl Future<Output = Result<LocalTrackPublication>> {
389        let this = self.clone();
390        let track = track.clone();
391        async move {
392            this.test_server()
393                .publish_video_track(this.token(), track)
394                .await?;
395            Ok(LocalTrackPublication)
396        }
397    }
398    pub fn publish_audio_track(
399        self: &Arc<Self>,
400        track: LocalAudioTrack,
401    ) -> impl Future<Output = Result<LocalTrackPublication>> {
402        let this = self.clone();
403        let track = track.clone();
404        async move {
405            this.test_server()
406                .publish_audio_track(this.token(), &track)
407                .await?;
408            Ok(LocalTrackPublication)
409        }
410    }
411
412    pub fn unpublish_track(&self, _publication: LocalTrackPublication) {}
413
414    pub fn remote_audio_tracks(&self, publisher_id: &str) -> Vec<Arc<RemoteAudioTrack>> {
415        if !self.is_connected() {
416            return Vec::new();
417        }
418
419        self.test_server()
420            .audio_tracks(self.token())
421            .unwrap()
422            .into_iter()
423            .filter(|track| track.publisher_id() == publisher_id)
424            .collect()
425    }
426
427    pub fn remote_audio_track_publications(
428        &self,
429        publisher_id: &str,
430    ) -> Vec<Arc<RemoteTrackPublication>> {
431        if !self.is_connected() {
432            return Vec::new();
433        }
434
435        self.test_server()
436            .audio_tracks(self.token())
437            .unwrap()
438            .into_iter()
439            .filter(|track| track.publisher_id() == publisher_id)
440            .map(|_track| Arc::new(RemoteTrackPublication {}))
441            .collect()
442    }
443
444    pub fn remote_video_tracks(&self, publisher_id: &str) -> Vec<Arc<RemoteVideoTrack>> {
445        if !self.is_connected() {
446            return Vec::new();
447        }
448
449        self.test_server()
450            .video_tracks(self.token())
451            .unwrap()
452            .into_iter()
453            .filter(|track| track.publisher_id() == publisher_id)
454            .collect()
455    }
456
457    pub fn remote_audio_track_updates(&self) -> impl Stream<Item = RemoteAudioTrackUpdate> {
458        self.0.lock().audio_track_updates.1.clone()
459    }
460
461    pub fn remote_video_track_updates(&self) -> impl Stream<Item = RemoteVideoTrackUpdate> {
462        self.0.lock().video_track_updates.1.clone()
463    }
464
465    pub fn set_display_sources(&self, sources: Vec<MacOSDisplay>) {
466        self.0.lock().display_sources = sources;
467    }
468
469    fn test_server(&self) -> Arc<TestServer> {
470        match self.0.lock().connection.1.borrow().clone() {
471            ConnectionState::Disconnected => panic!("must be connected to call this method"),
472            ConnectionState::Connected { url, .. } => TestServer::get(&url).unwrap(),
473        }
474    }
475
476    fn token(&self) -> String {
477        match self.0.lock().connection.1.borrow().clone() {
478            ConnectionState::Disconnected => panic!("must be connected to call this method"),
479            ConnectionState::Connected { token, .. } => token,
480        }
481    }
482
483    fn is_connected(&self) -> bool {
484        match *self.0.lock().connection.1.borrow() {
485            ConnectionState::Disconnected => false,
486            ConnectionState::Connected { .. } => true,
487        }
488    }
489}
490
491impl Drop for Room {
492    fn drop(&mut self) {
493        if let ConnectionState::Connected { token, .. } = mem::replace(
494            &mut *self.0.lock().connection.0.borrow_mut(),
495            ConnectionState::Disconnected,
496        ) {
497            if let Ok(server) = TestServer::get(&token) {
498                let executor = server.executor.clone();
499                executor
500                    .spawn(async move { server.leave_room(token).await.unwrap() })
501                    .detach();
502            }
503        }
504    }
505}
506
507pub struct LocalTrackPublication;
508
509impl LocalTrackPublication {
510    pub fn set_mute(&self, _mute: bool) -> impl Future<Output = Result<()>> {
511        async { Ok(()) }
512    }
513}
514
515pub struct RemoteTrackPublication;
516
517impl RemoteTrackPublication {
518    pub fn set_enabled(&self, _enabled: bool) -> impl Future<Output = Result<()>> {
519        async { Ok(()) }
520    }
521
522    pub fn is_muted(&self) -> bool {
523        false
524    }
525
526    pub fn sid(&self) -> String {
527        "".to_string()
528    }
529}
530
531#[derive(Clone)]
532pub struct LocalVideoTrack {
533    frames_rx: async_broadcast::Receiver<Frame>,
534}
535
536impl LocalVideoTrack {
537    pub fn screen_share_for_display(display: &MacOSDisplay) -> Self {
538        Self {
539            frames_rx: display.frames.1.clone(),
540        }
541    }
542}
543
544#[derive(Clone)]
545pub struct LocalAudioTrack;
546
547impl LocalAudioTrack {
548    pub fn create() -> Self {
549        Self
550    }
551}
552
553#[derive(Debug)]
554pub struct RemoteVideoTrack {
555    sid: Sid,
556    publisher_id: Sid,
557    frames_rx: async_broadcast::Receiver<Frame>,
558}
559
560impl RemoteVideoTrack {
561    pub fn sid(&self) -> &str {
562        &self.sid
563    }
564
565    pub fn publisher_id(&self) -> &str {
566        &self.publisher_id
567    }
568
569    pub fn frames(&self) -> async_broadcast::Receiver<Frame> {
570        self.frames_rx.clone()
571    }
572}
573
574#[derive(Debug)]
575pub struct RemoteAudioTrack {
576    sid: Sid,
577    publisher_id: Sid,
578}
579
580impl RemoteAudioTrack {
581    pub fn sid(&self) -> &str {
582        &self.sid
583    }
584
585    pub fn publisher_id(&self) -> &str {
586        &self.publisher_id
587    }
588
589    pub fn enable(&self) -> impl Future<Output = Result<()>> {
590        async { Ok(()) }
591    }
592
593    pub fn disable(&self) -> impl Future<Output = Result<()>> {
594        async { Ok(()) }
595    }
596}
597
598#[derive(Clone)]
599pub enum RemoteVideoTrackUpdate {
600    Subscribed(Arc<RemoteVideoTrack>),
601    Unsubscribed { publisher_id: Sid, track_id: Sid },
602}
603
604#[derive(Clone)]
605pub enum RemoteAudioTrackUpdate {
606    ActiveSpeakersChanged { speakers: Vec<Sid> },
607    MuteChanged { track_id: Sid, muted: bool },
608    Subscribed(Arc<RemoteAudioTrack>, Arc<RemoteTrackPublication>),
609    Unsubscribed { publisher_id: Sid, track_id: Sid },
610}
611
612#[derive(Clone)]
613pub struct MacOSDisplay {
614    frames: (
615        async_broadcast::Sender<Frame>,
616        async_broadcast::Receiver<Frame>,
617    ),
618}
619
620impl MacOSDisplay {
621    pub fn new() -> Self {
622        Self {
623            frames: async_broadcast::broadcast(128),
624        }
625    }
626
627    pub fn send_frame(&self, frame: Frame) {
628        self.frames.0.try_broadcast(frame).unwrap();
629    }
630}
631
632#[derive(Clone, Debug, PartialEq, Eq)]
633pub struct Frame {
634    pub label: String,
635    pub width: usize,
636    pub height: usize,
637}
638
639impl Frame {
640    pub fn width(&self) -> usize {
641        self.width
642    }
643
644    pub fn height(&self) -> usize {
645        self.height
646    }
647
648    pub fn image(&self) -> CVImageBuffer {
649        unimplemented!("you can't call this in test mode")
650    }
651}