test.rs

  1use crate::{ConnectionState, RoomUpdate, Sid};
  2use anyhow::{anyhow, Context as _, Result};
  3use async_trait::async_trait;
  4use collections::{btree_map::Entry as BTreeEntry, hash_map::Entry, BTreeMap, HashMap, HashSet};
  5use futures::Stream;
  6use gpui::{BackgroundExecutor, SurfaceSource};
  7use livekit_api::{proto, token};
  8
  9use parking_lot::Mutex;
 10use postage::watch;
 11use std::{
 12    future::Future,
 13    mem,
 14    sync::{
 15        atomic::{AtomicBool, Ordering::SeqCst},
 16        Arc, Weak,
 17    },
 18};
 19
/// Process-wide registry of live test servers, keyed by server URL.
///
/// `TestServer::create` inserts into it, `TestServer::get` looks servers up, and
/// `TestServer::teardown` removes them.
static SERVERS: Mutex<BTreeMap<String, Arc<TestServer>>> = Mutex::new(BTreeMap::new());
 21
/// In-memory test double for a LiveKit server, registered globally by URL.
pub struct TestServer {
    /// URL under which this server is registered in `SERVERS`.
    pub url: String,
    /// API key passed to `token::create` when tokens are minted for this server.
    pub api_key: String,
    /// Secret used both to create tokens and to validate tokens on incoming calls.
    pub secret_key: String,
    /// Rooms hosted by this server, keyed by room name.
    rooms: Mutex<HashMap<String, TestServerRoom>>,
    /// Executor used to inject simulated random delays before server operations.
    executor: BackgroundExecutor,
}
 29
 30impl TestServer {
 31    pub fn create(
 32        url: String,
 33        api_key: String,
 34        secret_key: String,
 35        executor: BackgroundExecutor,
 36    ) -> Result<Arc<TestServer>> {
 37        let mut servers = SERVERS.lock();
 38        if let BTreeEntry::Vacant(e) = servers.entry(url.clone()) {
 39            let server = Arc::new(TestServer {
 40                url,
 41                api_key,
 42                secret_key,
 43                rooms: Default::default(),
 44                executor,
 45            });
 46            e.insert(server.clone());
 47            Ok(server)
 48        } else {
 49            Err(anyhow!("a server with url {:?} already exists", url))
 50        }
 51    }
 52
 53    fn get(url: &str) -> Result<Arc<TestServer>> {
 54        Ok(SERVERS
 55            .lock()
 56            .get(url)
 57            .ok_or_else(|| anyhow!("no server found for url"))?
 58            .clone())
 59    }
 60
 61    pub fn teardown(&self) -> Result<()> {
 62        SERVERS
 63            .lock()
 64            .remove(&self.url)
 65            .ok_or_else(|| anyhow!("server with url {:?} does not exist", self.url))?;
 66        Ok(())
 67    }
 68
 69    pub fn create_api_client(&self) -> TestApiClient {
 70        TestApiClient {
 71            url: self.url.clone(),
 72        }
 73    }
 74
 75    pub async fn create_room(&self, room: String) -> Result<()> {
 76        // todo(linux): Remove this once the cross-platform LiveKit implementation is merged
 77        #[cfg(any(test, feature = "test-support"))]
 78        self.executor.simulate_random_delay().await;
 79        let mut server_rooms = self.rooms.lock();
 80        if let Entry::Vacant(e) = server_rooms.entry(room.clone()) {
 81            e.insert(Default::default());
 82            Ok(())
 83        } else {
 84            Err(anyhow!("room {:?} already exists", room))
 85        }
 86    }
 87
 88    async fn delete_room(&self, room: String) -> Result<()> {
 89        // TODO: clear state associated with all `Room`s.
 90        // todo(linux): Remove this once the cross-platform LiveKit implementation is merged
 91        #[cfg(any(test, feature = "test-support"))]
 92        self.executor.simulate_random_delay().await;
 93        let mut server_rooms = self.rooms.lock();
 94        server_rooms
 95            .remove(&room)
 96            .ok_or_else(|| anyhow!("room {:?} does not exist", room))?;
 97        Ok(())
 98    }
 99
100    async fn join_room(&self, token: String, client_room: Arc<Room>) -> Result<()> {
101        // todo(linux): Remove this once the cross-platform LiveKit implementation is merged
102        #[cfg(any(test, feature = "test-support"))]
103        self.executor.simulate_random_delay().await;
104
105        let claims = livekit_api::token::validate(&token, &self.secret_key)?;
106        let identity = claims.sub.unwrap().to_string();
107        let room_name = claims.video.room.unwrap();
108        let mut server_rooms = self.rooms.lock();
109        let room = (*server_rooms).entry(room_name.to_string()).or_default();
110
111        if let Entry::Vacant(e) = room.client_rooms.entry(identity.clone()) {
112            for track in &room.video_tracks {
113                client_room
114                    .0
115                    .lock()
116                    .updates_tx
117                    .try_broadcast(RoomUpdate::SubscribedToRemoteVideoTrack(Arc::new(
118                        RemoteVideoTrack {
119                            server_track: track.clone(),
120                        },
121                    )))
122                    .unwrap();
123            }
124            for track in &room.audio_tracks {
125                client_room
126                    .0
127                    .lock()
128                    .updates_tx
129                    .try_broadcast(RoomUpdate::SubscribedToRemoteAudioTrack(
130                        Arc::new(RemoteAudioTrack {
131                            server_track: track.clone(),
132                            room: Arc::downgrade(&client_room),
133                        }),
134                        Arc::new(RemoteTrackPublication),
135                    ))
136                    .unwrap();
137            }
138            e.insert(client_room);
139            Ok(())
140        } else {
141            Err(anyhow!(
142                "{:?} attempted to join room {:?} twice",
143                identity,
144                room_name
145            ))
146        }
147    }
148
149    async fn leave_room(&self, token: String) -> Result<()> {
150        // todo(linux): Remove this once the cross-platform LiveKit implementation is merged
151        #[cfg(any(test, feature = "test-support"))]
152        self.executor.simulate_random_delay().await;
153        let claims = livekit_api::token::validate(&token, &self.secret_key)?;
154        let identity = claims.sub.unwrap().to_string();
155        let room_name = claims.video.room.unwrap();
156        let mut server_rooms = self.rooms.lock();
157        let room = server_rooms
158            .get_mut(&*room_name)
159            .ok_or_else(|| anyhow!("room {} does not exist", room_name))?;
160        room.client_rooms.remove(&identity).ok_or_else(|| {
161            anyhow!(
162                "{:?} attempted to leave room {:?} before joining it",
163                identity,
164                room_name
165            )
166        })?;
167        Ok(())
168    }
169
170    async fn remove_participant(&self, room_name: String, identity: String) -> Result<()> {
171        // TODO: clear state associated with the `Room`.
172        // todo(linux): Remove this once the cross-platform LiveKit implementation is merged
173        #[cfg(any(test, feature = "test-support"))]
174        self.executor.simulate_random_delay().await;
175
176        let mut server_rooms = self.rooms.lock();
177        let room = server_rooms
178            .get_mut(&room_name)
179            .ok_or_else(|| anyhow!("room {} does not exist", room_name))?;
180        room.client_rooms.remove(&identity).ok_or_else(|| {
181            anyhow!(
182                "participant {:?} did not join room {:?}",
183                identity,
184                room_name
185            )
186        })?;
187        Ok(())
188    }
189
190    async fn update_participant(
191        &self,
192        room_name: String,
193        identity: String,
194        permission: proto::ParticipantPermission,
195    ) -> Result<()> {
196        // todo(linux): Remove this once the cross-platform LiveKit implementation is merged
197        #[cfg(any(test, feature = "test-support"))]
198        self.executor.simulate_random_delay().await;
199        let mut server_rooms = self.rooms.lock();
200        let room = server_rooms
201            .get_mut(&room_name)
202            .ok_or_else(|| anyhow!("room {} does not exist", room_name))?;
203        room.participant_permissions.insert(identity, permission);
204        Ok(())
205    }
206
207    pub async fn disconnect_client(&self, client_identity: String) {
208        // todo(linux): Remove this once the cross-platform LiveKit implementation is merged
209        #[cfg(any(test, feature = "test-support"))]
210        self.executor.simulate_random_delay().await;
211        let mut server_rooms = self.rooms.lock();
212        for room in server_rooms.values_mut() {
213            if let Some(room) = room.client_rooms.remove(&client_identity) {
214                *room.0.lock().connection.0.borrow_mut() = ConnectionState::Disconnected;
215            }
216        }
217    }
218
219    async fn publish_video_track(
220        &self,
221        token: String,
222        local_track: LocalVideoTrack,
223    ) -> Result<Sid> {
224        // todo(linux): Remove this once the cross-platform LiveKit implementation is merged
225        #[cfg(any(test, feature = "test-support"))]
226        self.executor.simulate_random_delay().await;
227        let claims = livekit_api::token::validate(&token, &self.secret_key)?;
228        let identity = claims.sub.unwrap().to_string();
229        let room_name = claims.video.room.unwrap();
230
231        let mut server_rooms = self.rooms.lock();
232        let room = server_rooms
233            .get_mut(&*room_name)
234            .ok_or_else(|| anyhow!("room {} does not exist", room_name))?;
235
236        let can_publish = room
237            .participant_permissions
238            .get(&identity)
239            .map(|permission| permission.can_publish)
240            .or(claims.video.can_publish)
241            .unwrap_or(true);
242
243        if !can_publish {
244            return Err(anyhow!("user is not allowed to publish"));
245        }
246
247        let sid = nanoid::nanoid!(17);
248        let track = Arc::new(TestServerVideoTrack {
249            sid: sid.clone(),
250            publisher_id: identity.clone(),
251            frames_rx: local_track.frames_rx.clone(),
252        });
253
254        room.video_tracks.push(track.clone());
255
256        for (id, client_room) in &room.client_rooms {
257            if *id != identity {
258                let _ = client_room
259                    .0
260                    .lock()
261                    .updates_tx
262                    .try_broadcast(RoomUpdate::SubscribedToRemoteVideoTrack(Arc::new(
263                        RemoteVideoTrack {
264                            server_track: track.clone(),
265                        },
266                    )))
267                    .unwrap();
268            }
269        }
270
271        Ok(sid)
272    }
273
274    async fn publish_audio_track(
275        &self,
276        token: String,
277        _local_track: &LocalAudioTrack,
278    ) -> Result<Sid> {
279        // todo(linux): Remove this once the cross-platform LiveKit implementation is merged
280        #[cfg(any(test, feature = "test-support"))]
281        self.executor.simulate_random_delay().await;
282
283        let claims = livekit_api::token::validate(&token, &self.secret_key)?;
284        let identity = claims.sub.unwrap().to_string();
285        let room_name = claims.video.room.unwrap();
286
287        let mut server_rooms = self.rooms.lock();
288        let room = server_rooms
289            .get_mut(&*room_name)
290            .ok_or_else(|| anyhow!("room {} does not exist", room_name))?;
291
292        let can_publish = room
293            .participant_permissions
294            .get(&identity)
295            .map(|permission| permission.can_publish)
296            .or(claims.video.can_publish)
297            .unwrap_or(true);
298
299        if !can_publish {
300            return Err(anyhow!("user is not allowed to publish"));
301        }
302
303        let sid = nanoid::nanoid!(17);
304        let track = Arc::new(TestServerAudioTrack {
305            sid: sid.clone(),
306            publisher_id: identity.clone(),
307            muted: AtomicBool::new(false),
308        });
309
310        let publication = Arc::new(RemoteTrackPublication);
311
312        room.audio_tracks.push(track.clone());
313
314        for (id, client_room) in &room.client_rooms {
315            if *id != identity {
316                let _ = client_room
317                    .0
318                    .lock()
319                    .updates_tx
320                    .try_broadcast(RoomUpdate::SubscribedToRemoteAudioTrack(
321                        Arc::new(RemoteAudioTrack {
322                            server_track: track.clone(),
323                            room: Arc::downgrade(client_room),
324                        }),
325                        publication.clone(),
326                    ))
327                    .unwrap();
328            }
329        }
330
331        Ok(sid)
332    }
333
334    fn set_track_muted(&self, token: &str, track_sid: &str, muted: bool) -> Result<()> {
335        let claims = livekit_api::token::validate(token, &self.secret_key)?;
336        let room_name = claims.video.room.unwrap();
337        let identity = claims.sub.unwrap();
338        let mut server_rooms = self.rooms.lock();
339        let room = server_rooms
340            .get_mut(&*room_name)
341            .ok_or_else(|| anyhow!("room {} does not exist", room_name))?;
342        if let Some(track) = room
343            .audio_tracks
344            .iter_mut()
345            .find(|track| track.sid == track_sid)
346        {
347            track.muted.store(muted, SeqCst);
348            for (id, client_room) in room.client_rooms.iter() {
349                if *id != identity {
350                    client_room
351                        .0
352                        .lock()
353                        .updates_tx
354                        .try_broadcast(RoomUpdate::RemoteAudioTrackMuteChanged {
355                            track_id: track_sid.to_string(),
356                            muted,
357                        })
358                        .unwrap();
359                }
360            }
361        }
362        Ok(())
363    }
364
365    fn is_track_muted(&self, token: &str, track_sid: &str) -> Option<bool> {
366        let claims = livekit_api::token::validate(token, &self.secret_key).ok()?;
367        let room_name = claims.video.room.unwrap();
368
369        let mut server_rooms = self.rooms.lock();
370        let room = server_rooms.get_mut(&*room_name)?;
371        room.audio_tracks.iter().find_map(|track| {
372            if track.sid == track_sid {
373                Some(track.muted.load(SeqCst))
374            } else {
375                None
376            }
377        })
378    }
379
380    fn video_tracks(&self, token: String) -> Result<Vec<Arc<RemoteVideoTrack>>> {
381        let claims = livekit_api::token::validate(&token, &self.secret_key)?;
382        let room_name = claims.video.room.unwrap();
383        let identity = claims.sub.unwrap();
384
385        let mut server_rooms = self.rooms.lock();
386        let room = server_rooms
387            .get_mut(&*room_name)
388            .ok_or_else(|| anyhow!("room {} does not exist", room_name))?;
389        room.client_rooms
390            .get(identity.as_ref())
391            .ok_or_else(|| anyhow!("not a participant in room"))?;
392        Ok(room
393            .video_tracks
394            .iter()
395            .map(|track| {
396                Arc::new(RemoteVideoTrack {
397                    server_track: track.clone(),
398                })
399            })
400            .collect())
401    }
402
403    fn audio_tracks(&self, token: String) -> Result<Vec<Arc<RemoteAudioTrack>>> {
404        let claims = livekit_api::token::validate(&token, &self.secret_key)?;
405        let room_name = claims.video.room.unwrap();
406        let identity = claims.sub.unwrap();
407
408        let mut server_rooms = self.rooms.lock();
409        let room = server_rooms
410            .get_mut(&*room_name)
411            .ok_or_else(|| anyhow!("room {} does not exist", room_name))?;
412        let client_room = room
413            .client_rooms
414            .get(identity.as_ref())
415            .ok_or_else(|| anyhow!("not a participant in room"))?;
416        Ok(room
417            .audio_tracks
418            .iter()
419            .map(|track| {
420                Arc::new(RemoteAudioTrack {
421                    server_track: track.clone(),
422                    room: Arc::downgrade(client_room),
423                })
424            })
425            .collect())
426    }
427}
428
/// Server-side bookkeeping for a single room.
#[derive(Default)]
struct TestServerRoom {
    /// Client rooms of the participants currently joined, keyed by participant identity.
    client_rooms: HashMap<Sid, Arc<Room>>,
    /// Video tracks published in this room.
    video_tracks: Vec<Arc<TestServerVideoTrack>>,
    /// Audio tracks published in this room.
    audio_tracks: Vec<Arc<TestServerAudioTrack>>,
    /// Permission overrides set through `update_participant`, keyed by identity.
    participant_permissions: HashMap<Sid, proto::ParticipantPermission>,
}
436
/// Server-side record of a published video track.
#[derive(Debug)]
struct TestServerVideoTrack {
    /// Track sid generated when the track was published.
    sid: Sid,
    /// Identity of the participant that published the track.
    publisher_id: Sid,
    /// Receiver cloned from the publisher's `LocalVideoTrack`; `RemoteVideoTrack::frames`
    /// hands out further clones of it.
    frames_rx: async_broadcast::Receiver<Frame>,
}
443
/// Server-side record of a published audio track.
#[derive(Debug)]
struct TestServerAudioTrack {
    /// Track sid generated when the track was published.
    sid: Sid,
    /// Identity of the participant that published the track.
    publisher_id: Sid,
    /// Current mute flag; starts as `false` and is changed by `set_track_muted`.
    muted: AtomicBool,
}
450
// Currently empty: `TestServer` reads and writes the room's fields directly.
impl TestServerRoom {}
452
/// `livekit_api::Client` implementation that forwards every call to the `TestServer`
/// registered under `url`.
pub struct TestApiClient {
    /// URL used to look the backing server up in the global registry on each call.
    url: String,
}
456
457#[async_trait]
458impl livekit_api::Client for TestApiClient {
459    fn url(&self) -> &str {
460        &self.url
461    }
462
463    async fn create_room(&self, name: String) -> Result<()> {
464        let server = TestServer::get(&self.url)?;
465        server.create_room(name).await?;
466        Ok(())
467    }
468
469    async fn delete_room(&self, name: String) -> Result<()> {
470        let server = TestServer::get(&self.url)?;
471        server.delete_room(name).await?;
472        Ok(())
473    }
474
475    async fn remove_participant(&self, room: String, identity: String) -> Result<()> {
476        let server = TestServer::get(&self.url)?;
477        server.remove_participant(room, identity).await?;
478        Ok(())
479    }
480
481    async fn update_participant(
482        &self,
483        room: String,
484        identity: String,
485        permission: livekit_api::proto::ParticipantPermission,
486    ) -> Result<()> {
487        let server = TestServer::get(&self.url)?;
488        server
489            .update_participant(room, identity, permission)
490            .await?;
491        Ok(())
492    }
493
494    fn room_token(&self, room: &str, identity: &str) -> Result<String> {
495        let server = TestServer::get(&self.url)?;
496        token::create(
497            &server.api_key,
498            &server.secret_key,
499            Some(identity),
500            token::VideoGrant::to_join(room),
501        )
502    }
503
504    fn guest_token(&self, room: &str, identity: &str) -> Result<String> {
505        let server = TestServer::get(&self.url)?;
506        token::create(
507            &server.api_key,
508            &server.secret_key,
509            Some(identity),
510            token::VideoGrant::for_guest(room),
511        )
512    }
513}
514
/// Mutable state of a client-side `Room`, kept behind the mutex inside `Room`.
struct RoomState {
    /// Sender/receiver pair of the watch channel that carries the connection state.
    connection: (
        watch::Sender<ConnectionState>,
        watch::Receiver<ConnectionState>,
    ),
    /// Displays reported by `Room::display_sources`; replaced by `Room::set_display_sources`.
    display_sources: Vec<MacOSDisplay>,
    /// Sids of remote audio tracks that were stopped via `RemoteAudioTrack::stop`.
    paused_audio_tracks: HashSet<Sid>,
    /// Sender the test server uses to push `RoomUpdate`s to this room.
    updates_tx: async_broadcast::Sender<RoomUpdate>,
    /// Receiver that `Room::updates` clones for each caller.
    updates_rx: async_broadcast::Receiver<RoomUpdate>,
}
525
/// Client-side handle to a room on a `TestServer`; all state lives in the inner mutex.
pub struct Room(Mutex<RoomState>);
527
528impl Room {
529    pub fn new() -> Arc<Self> {
530        let (updates_tx, updates_rx) = async_broadcast::broadcast(128);
531        Arc::new(Self(Mutex::new(RoomState {
532            connection: watch::channel_with(ConnectionState::Disconnected),
533            display_sources: Default::default(),
534            paused_audio_tracks: Default::default(),
535            updates_tx,
536            updates_rx,
537        })))
538    }
539
540    pub fn status(&self) -> watch::Receiver<ConnectionState> {
541        self.0.lock().connection.1.clone()
542    }
543
544    pub fn connect(self: &Arc<Self>, url: &str, token: &str) -> impl Future<Output = Result<()>> {
545        let this = self.clone();
546        let url = url.to_string();
547        let token = token.to_string();
548        async move {
549            let server = TestServer::get(&url)?;
550            server
551                .join_room(token.clone(), this.clone())
552                .await
553                .context("room join")?;
554            *this.0.lock().connection.0.borrow_mut() = ConnectionState::Connected { url, token };
555            Ok(())
556        }
557    }
558
559    pub fn display_sources(self: &Arc<Self>) -> impl Future<Output = Result<Vec<MacOSDisplay>>> {
560        let this = self.clone();
561        async move {
562            // todo(linux): Remove this once the cross-platform LiveKit implementation is merged
563            #[cfg(any(test, feature = "test-support"))]
564            {
565                let server = this.test_server();
566                server.executor.simulate_random_delay().await;
567            }
568
569            Ok(this.0.lock().display_sources.clone())
570        }
571    }
572
573    pub fn publish_video_track(
574        self: &Arc<Self>,
575        track: LocalVideoTrack,
576    ) -> impl Future<Output = Result<LocalTrackPublication>> {
577        let this = self.clone();
578        let track = track.clone();
579        async move {
580            let sid = this
581                .test_server()
582                .publish_video_track(this.token(), track)
583                .await?;
584            Ok(LocalTrackPublication {
585                room: Arc::downgrade(&this),
586                sid,
587            })
588        }
589    }
590
591    pub fn publish_audio_track(
592        self: &Arc<Self>,
593        track: LocalAudioTrack,
594    ) -> impl Future<Output = Result<LocalTrackPublication>> {
595        let this = self.clone();
596        let track = track.clone();
597        async move {
598            let sid = this
599                .test_server()
600                .publish_audio_track(this.token(), &track)
601                .await?;
602            Ok(LocalTrackPublication {
603                room: Arc::downgrade(&this),
604                sid,
605            })
606        }
607    }
608
    /// No-op in this test implementation: the publication is simply dropped and the
    /// track stays registered on the server.
    pub fn unpublish_track(&self, _publication: LocalTrackPublication) {}
610
611    pub fn remote_audio_tracks(&self, publisher_id: &str) -> Vec<Arc<RemoteAudioTrack>> {
612        if !self.is_connected() {
613            return Vec::new();
614        }
615
616        self.test_server()
617            .audio_tracks(self.token())
618            .unwrap()
619            .into_iter()
620            .filter(|track| track.publisher_id() == publisher_id)
621            .collect()
622    }
623
624    pub fn remote_audio_track_publications(
625        &self,
626        publisher_id: &str,
627    ) -> Vec<Arc<RemoteTrackPublication>> {
628        if !self.is_connected() {
629            return Vec::new();
630        }
631
632        self.test_server()
633            .audio_tracks(self.token())
634            .unwrap()
635            .into_iter()
636            .filter(|track| track.publisher_id() == publisher_id)
637            .map(|_track| Arc::new(RemoteTrackPublication {}))
638            .collect()
639    }
640
641    pub fn remote_video_tracks(&self, publisher_id: &str) -> Vec<Arc<RemoteVideoTrack>> {
642        if !self.is_connected() {
643            return Vec::new();
644        }
645
646        self.test_server()
647            .video_tracks(self.token())
648            .unwrap()
649            .into_iter()
650            .filter(|track| track.publisher_id() == publisher_id)
651            .collect()
652    }
653
654    pub fn updates(&self) -> impl Stream<Item = RoomUpdate> {
655        self.0.lock().updates_rx.clone()
656    }
657
658    pub fn set_display_sources(&self, sources: Vec<MacOSDisplay>) {
659        self.0.lock().display_sources = sources;
660    }
661
662    fn test_server(&self) -> Arc<TestServer> {
663        match self.0.lock().connection.1.borrow().clone() {
664            ConnectionState::Disconnected => panic!("must be connected to call this method"),
665            ConnectionState::Connected { url, .. } => TestServer::get(&url).unwrap(),
666        }
667    }
668
669    fn token(&self) -> String {
670        match self.0.lock().connection.1.borrow().clone() {
671            ConnectionState::Disconnected => panic!("must be connected to call this method"),
672            ConnectionState::Connected { token, .. } => token,
673        }
674    }
675
676    fn is_connected(&self) -> bool {
677        match *self.0.lock().connection.1.borrow() {
678            ConnectionState::Disconnected => false,
679            ConnectionState::Connected { .. } => true,
680        }
681    }
682}
683
684impl Drop for Room {
685    fn drop(&mut self) {
686        if let ConnectionState::Connected { token, .. } = mem::replace(
687            &mut *self.0.lock().connection.0.borrow_mut(),
688            ConnectionState::Disconnected,
689        ) {
690            if let Ok(server) = TestServer::get(&token) {
691                let executor = server.executor.clone();
692                executor
693                    .spawn(async move { server.leave_room(token).await.unwrap() })
694                    .detach();
695            }
696        }
697    }
698}
699
700#[derive(Clone)]
701pub struct LocalTrackPublication {
702    sid: String,
703    room: Weak<Room>,
704}
705
706impl LocalTrackPublication {
707    pub fn set_mute(&self, mute: bool) -> impl Future<Output = Result<()>> {
708        let sid = self.sid.clone();
709        let room = self.room.clone();
710        async move {
711            if let Some(room) = room.upgrade() {
712                room.test_server()
713                    .set_track_muted(&room.token(), &sid, mute)
714            } else {
715                Err(anyhow!("no such room"))
716            }
717        }
718    }
719
720    pub fn is_muted(&self) -> bool {
721        if let Some(room) = self.room.upgrade() {
722            room.test_server()
723                .is_track_muted(&room.token(), &self.sid)
724                .unwrap_or(false)
725        } else {
726            false
727        }
728    }
729
730    pub fn sid(&self) -> String {
731        self.sid.clone()
732    }
733}
734
735pub struct RemoteTrackPublication;
736
737impl RemoteTrackPublication {
738    pub fn set_enabled(&self, _enabled: bool) -> impl Future<Output = Result<()>> {
739        async { Ok(()) }
740    }
741
742    pub fn is_muted(&self) -> bool {
743        false
744    }
745
746    pub fn sid(&self) -> String {
747        "".to_string()
748    }
749}
750
751#[derive(Clone)]
752pub struct LocalVideoTrack {
753    frames_rx: async_broadcast::Receiver<Frame>,
754}
755
756impl LocalVideoTrack {
757    pub fn screen_share_for_display(display: &MacOSDisplay) -> Self {
758        Self {
759            frames_rx: display.frames.1.clone(),
760        }
761    }
762}
763
/// A local audio track; it carries no data in test mode.
#[derive(Clone)]
pub struct LocalAudioTrack;

impl LocalAudioTrack {
    /// Creates a new (stateless) local audio track.
    pub fn create() -> Self {
        LocalAudioTrack
    }
}
772
773#[derive(Debug)]
774pub struct RemoteVideoTrack {
775    server_track: Arc<TestServerVideoTrack>,
776}
777
778impl RemoteVideoTrack {
779    pub fn sid(&self) -> &str {
780        &self.server_track.sid
781    }
782
783    pub fn publisher_id(&self) -> &str {
784        &self.server_track.publisher_id
785    }
786
787    pub fn frames(&self) -> async_broadcast::Receiver<Frame> {
788        self.server_track.frames_rx.clone()
789    }
790}
791
792#[derive(Debug)]
793pub struct RemoteAudioTrack {
794    server_track: Arc<TestServerAudioTrack>,
795    room: Weak<Room>,
796}
797
798impl RemoteAudioTrack {
799    pub fn sid(&self) -> &str {
800        &self.server_track.sid
801    }
802
803    pub fn publisher_id(&self) -> &str {
804        &self.server_track.publisher_id
805    }
806
807    pub fn start(&self) {
808        if let Some(room) = self.room.upgrade() {
809            room.0
810                .lock()
811                .paused_audio_tracks
812                .remove(&self.server_track.sid);
813        }
814    }
815
816    pub fn stop(&self) {
817        if let Some(room) = self.room.upgrade() {
818            room.0
819                .lock()
820                .paused_audio_tracks
821                .insert(self.server_track.sid.clone());
822        }
823    }
824
825    pub fn is_playing(&self) -> bool {
826        !self
827            .room
828            .upgrade()
829            .unwrap()
830            .0
831            .lock()
832            .paused_audio_tracks
833            .contains(&self.server_track.sid)
834    }
835}
836
837#[derive(Clone)]
838pub struct MacOSDisplay {
839    frames: (
840        async_broadcast::Sender<Frame>,
841        async_broadcast::Receiver<Frame>,
842    ),
843}
844
845impl Default for MacOSDisplay {
846    fn default() -> Self {
847        Self::new()
848    }
849}
850
851impl MacOSDisplay {
852    pub fn new() -> Self {
853        Self {
854            frames: async_broadcast::broadcast(128),
855        }
856    }
857
858    pub fn send_frame(&self, frame: Frame) {
859        self.frames.0.try_broadcast(frame).unwrap();
860    }
861}
862
863#[derive(Clone, Debug, PartialEq, Eq)]
864pub struct Frame {
865    pub label: String,
866    pub width: usize,
867    pub height: usize,
868}
869
870impl Frame {
871    pub fn width(&self) -> usize {
872        self.width
873    }
874
875    pub fn height(&self) -> usize {
876        self.height
877    }
878
879    pub fn image(&self) -> SurfaceSource {
880        unimplemented!("you can't call this in test mode")
881    }
882}