test.rs

  1use crate::{ConnectionState, RoomUpdate, Sid};
  2use anyhow::{anyhow, Context, Result};
  3use async_trait::async_trait;
  4use collections::{BTreeMap, HashMap, HashSet};
  5use futures::Stream;
  6use gpui::{BackgroundExecutor, ImageSource};
  7use live_kit_server::{proto, token};
  8
  9use parking_lot::Mutex;
 10use postage::watch;
 11use std::{
 12    future::Future,
 13    mem,
 14    sync::{
 15        atomic::{AtomicBool, Ordering::SeqCst},
 16        Arc, Weak,
 17    },
 18};
 19
 20static SERVERS: Mutex<BTreeMap<String, Arc<TestServer>>> = Mutex::new(BTreeMap::new());
 21
 22pub struct TestServer {
 23    pub url: String,
 24    pub api_key: String,
 25    pub secret_key: String,
 26    rooms: Mutex<HashMap<String, TestServerRoom>>,
 27    executor: BackgroundExecutor,
 28}
 29
 30impl TestServer {
 31    pub fn create(
 32        url: String,
 33        api_key: String,
 34        secret_key: String,
 35        executor: BackgroundExecutor,
 36    ) -> Result<Arc<TestServer>> {
 37        let mut servers = SERVERS.lock();
 38        if servers.contains_key(&url) {
 39            Err(anyhow!("a server with url {:?} already exists", url))
 40        } else {
 41            let server = Arc::new(TestServer {
 42                url: url.clone(),
 43                api_key,
 44                secret_key,
 45                rooms: Default::default(),
 46                executor,
 47            });
 48            servers.insert(url, server.clone());
 49            Ok(server)
 50        }
 51    }
 52
 53    fn get(url: &str) -> Result<Arc<TestServer>> {
 54        Ok(SERVERS
 55            .lock()
 56            .get(url)
 57            .ok_or_else(|| anyhow!("no server found for url: {}", url))?
 58            .clone())
 59    }
 60
 61    pub fn teardown(&self) -> Result<()> {
 62        SERVERS
 63            .lock()
 64            .remove(&self.url)
 65            .ok_or_else(|| anyhow!("server with url {:?} does not exist", self.url))?;
 66        Ok(())
 67    }
 68
 69    pub fn create_api_client(&self) -> TestApiClient {
 70        TestApiClient {
 71            url: self.url.clone(),
 72        }
 73    }
 74
 75    pub async fn create_room(&self, room: String) -> Result<()> {
 76        self.executor.simulate_random_delay().await;
 77        let mut server_rooms = self.rooms.lock();
 78        if server_rooms.contains_key(&room) {
 79            Err(anyhow!("room {:?} already exists", room))
 80        } else {
 81            server_rooms.insert(room, Default::default());
 82            Ok(())
 83        }
 84    }
 85
 86    async fn delete_room(&self, room: String) -> Result<()> {
 87        // TODO: clear state associated with all `Room`s.
 88        self.executor.simulate_random_delay().await;
 89        let mut server_rooms = self.rooms.lock();
 90        server_rooms
 91            .remove(&room)
 92            .ok_or_else(|| anyhow!("room {:?} does not exist", room))?;
 93        Ok(())
 94    }
 95
 96    async fn join_room(&self, token: String, client_room: Arc<Room>) -> Result<()> {
 97        self.executor.simulate_random_delay().await;
 98        let claims = live_kit_server::token::validate(&token, &self.secret_key)?;
 99        let identity = claims.sub.unwrap().to_string();
100        let room_name = claims.video.room.unwrap();
101        let mut server_rooms = self.rooms.lock();
102        let room = (*server_rooms).entry(room_name.to_string()).or_default();
103
104        if room.client_rooms.contains_key(&identity) {
105            Err(anyhow!(
106                "{:?} attempted to join room {:?} twice",
107                identity,
108                room_name
109            ))
110        } else {
111            for track in &room.video_tracks {
112                client_room
113                    .0
114                    .lock()
115                    .updates_tx
116                    .try_broadcast(RoomUpdate::SubscribedToRemoteVideoTrack(Arc::new(
117                        RemoteVideoTrack {
118                            server_track: track.clone(),
119                        },
120                    )))
121                    .unwrap();
122            }
123            for track in &room.audio_tracks {
124                client_room
125                    .0
126                    .lock()
127                    .updates_tx
128                    .try_broadcast(RoomUpdate::SubscribedToRemoteAudioTrack(
129                        Arc::new(RemoteAudioTrack {
130                            server_track: track.clone(),
131                            room: Arc::downgrade(&client_room),
132                        }),
133                        Arc::new(RemoteTrackPublication),
134                    ))
135                    .unwrap();
136            }
137            room.client_rooms.insert(identity, client_room);
138            Ok(())
139        }
140    }
141
142    async fn leave_room(&self, token: String) -> Result<()> {
143        self.executor.simulate_random_delay().await;
144        let claims = live_kit_server::token::validate(&token, &self.secret_key)?;
145        let identity = claims.sub.unwrap().to_string();
146        let room_name = claims.video.room.unwrap();
147        let mut server_rooms = self.rooms.lock();
148        let room = server_rooms
149            .get_mut(&*room_name)
150            .ok_or_else(|| anyhow!("room {} does not exist", room_name))?;
151        room.client_rooms.remove(&identity).ok_or_else(|| {
152            anyhow!(
153                "{:?} attempted to leave room {:?} before joining it",
154                identity,
155                room_name
156            )
157        })?;
158        Ok(())
159    }
160
161    async fn remove_participant(&self, room_name: String, identity: String) -> Result<()> {
162        // TODO: clear state associated with the `Room`.
163        self.executor.simulate_random_delay().await;
164        let mut server_rooms = self.rooms.lock();
165        let room = server_rooms
166            .get_mut(&room_name)
167            .ok_or_else(|| anyhow!("room {} does not exist", room_name))?;
168        room.client_rooms.remove(&identity).ok_or_else(|| {
169            anyhow!(
170                "participant {:?} did not join room {:?}",
171                identity,
172                room_name
173            )
174        })?;
175        Ok(())
176    }
177
178    async fn update_participant(
179        &self,
180        room_name: String,
181        identity: String,
182        permission: proto::ParticipantPermission,
183    ) -> Result<()> {
184        self.executor.simulate_random_delay().await;
185        let mut server_rooms = self.rooms.lock();
186        let room = server_rooms
187            .get_mut(&room_name)
188            .ok_or_else(|| anyhow!("room {} does not exist", room_name))?;
189        room.participant_permissions.insert(identity, permission);
190        Ok(())
191    }
192
193    pub async fn disconnect_client(&self, client_identity: String) {
194        self.executor.simulate_random_delay().await;
195        let mut server_rooms = self.rooms.lock();
196        for room in server_rooms.values_mut() {
197            if let Some(room) = room.client_rooms.remove(&client_identity) {
198                *room.0.lock().connection.0.borrow_mut() = ConnectionState::Disconnected;
199            }
200        }
201    }
202
203    async fn publish_video_track(
204        &self,
205        token: String,
206        local_track: LocalVideoTrack,
207    ) -> Result<Sid> {
208        self.executor.simulate_random_delay().await;
209        let claims = live_kit_server::token::validate(&token, &self.secret_key)?;
210        let identity = claims.sub.unwrap().to_string();
211        let room_name = claims.video.room.unwrap();
212
213        let mut server_rooms = self.rooms.lock();
214        let room = server_rooms
215            .get_mut(&*room_name)
216            .ok_or_else(|| anyhow!("room {} does not exist", room_name))?;
217
218        let can_publish = room
219            .participant_permissions
220            .get(&identity)
221            .map(|permission| permission.can_publish)
222            .or(claims.video.can_publish)
223            .unwrap_or(true);
224
225        if !can_publish {
226            return Err(anyhow!("user is not allowed to publish"));
227        }
228
229        let sid = nanoid::nanoid!(17);
230        let track = Arc::new(TestServerVideoTrack {
231            sid: sid.clone(),
232            publisher_id: identity.clone(),
233            frames_rx: local_track.frames_rx.clone(),
234        });
235
236        room.video_tracks.push(track.clone());
237
238        for (id, client_room) in &room.client_rooms {
239            if *id != identity {
240                let _ = client_room
241                    .0
242                    .lock()
243                    .updates_tx
244                    .try_broadcast(RoomUpdate::SubscribedToRemoteVideoTrack(Arc::new(
245                        RemoteVideoTrack {
246                            server_track: track.clone(),
247                        },
248                    )))
249                    .unwrap();
250            }
251        }
252
253        Ok(sid)
254    }
255
256    async fn publish_audio_track(
257        &self,
258        token: String,
259        _local_track: &LocalAudioTrack,
260    ) -> Result<Sid> {
261        self.executor.simulate_random_delay().await;
262        let claims = live_kit_server::token::validate(&token, &self.secret_key)?;
263        let identity = claims.sub.unwrap().to_string();
264        let room_name = claims.video.room.unwrap();
265
266        let mut server_rooms = self.rooms.lock();
267        let room = server_rooms
268            .get_mut(&*room_name)
269            .ok_or_else(|| anyhow!("room {} does not exist", room_name))?;
270
271        let can_publish = room
272            .participant_permissions
273            .get(&identity)
274            .map(|permission| permission.can_publish)
275            .or(claims.video.can_publish)
276            .unwrap_or(true);
277
278        if !can_publish {
279            return Err(anyhow!("user is not allowed to publish"));
280        }
281
282        let sid = nanoid::nanoid!(17);
283        let track = Arc::new(TestServerAudioTrack {
284            sid: sid.clone(),
285            publisher_id: identity.clone(),
286            muted: AtomicBool::new(false),
287        });
288
289        let publication = Arc::new(RemoteTrackPublication);
290
291        room.audio_tracks.push(track.clone());
292
293        for (id, client_room) in &room.client_rooms {
294            if *id != identity {
295                let _ = client_room
296                    .0
297                    .lock()
298                    .updates_tx
299                    .try_broadcast(RoomUpdate::SubscribedToRemoteAudioTrack(
300                        Arc::new(RemoteAudioTrack {
301                            server_track: track.clone(),
302                            room: Arc::downgrade(&client_room),
303                        }),
304                        publication.clone(),
305                    ))
306                    .unwrap();
307            }
308        }
309
310        Ok(sid)
311    }
312
313    fn set_track_muted(&self, token: &str, track_sid: &str, muted: bool) -> Result<()> {
314        let claims = live_kit_server::token::validate(&token, &self.secret_key)?;
315        let room_name = claims.video.room.unwrap();
316        let identity = claims.sub.unwrap();
317        let mut server_rooms = self.rooms.lock();
318        let room = server_rooms
319            .get_mut(&*room_name)
320            .ok_or_else(|| anyhow!("room {} does not exist", room_name))?;
321        if let Some(track) = room
322            .audio_tracks
323            .iter_mut()
324            .find(|track| track.sid == track_sid)
325        {
326            track.muted.store(muted, SeqCst);
327            for (id, client_room) in room.client_rooms.iter() {
328                if *id != identity {
329                    client_room
330                        .0
331                        .lock()
332                        .updates_tx
333                        .try_broadcast(RoomUpdate::RemoteAudioTrackMuteChanged {
334                            track_id: track_sid.to_string(),
335                            muted,
336                        })
337                        .unwrap();
338                }
339            }
340        }
341        Ok(())
342    }
343
344    fn is_track_muted(&self, token: &str, track_sid: &str) -> Option<bool> {
345        let claims = live_kit_server::token::validate(&token, &self.secret_key).ok()?;
346        let room_name = claims.video.room.unwrap();
347
348        let mut server_rooms = self.rooms.lock();
349        let room = server_rooms.get_mut(&*room_name)?;
350        room.audio_tracks.iter().find_map(|track| {
351            if track.sid == track_sid {
352                Some(track.muted.load(SeqCst))
353            } else {
354                None
355            }
356        })
357    }
358
359    fn video_tracks(&self, token: String) -> Result<Vec<Arc<RemoteVideoTrack>>> {
360        let claims = live_kit_server::token::validate(&token, &self.secret_key)?;
361        let room_name = claims.video.room.unwrap();
362        let identity = claims.sub.unwrap();
363
364        let mut server_rooms = self.rooms.lock();
365        let room = server_rooms
366            .get_mut(&*room_name)
367            .ok_or_else(|| anyhow!("room {} does not exist", room_name))?;
368        room.client_rooms
369            .get(identity.as_ref())
370            .ok_or_else(|| anyhow!("not a participant in room"))?;
371        Ok(room
372            .video_tracks
373            .iter()
374            .map(|track| {
375                Arc::new(RemoteVideoTrack {
376                    server_track: track.clone(),
377                })
378            })
379            .collect())
380    }
381
382    fn audio_tracks(&self, token: String) -> Result<Vec<Arc<RemoteAudioTrack>>> {
383        let claims = live_kit_server::token::validate(&token, &self.secret_key)?;
384        let room_name = claims.video.room.unwrap();
385        let identity = claims.sub.unwrap();
386
387        let mut server_rooms = self.rooms.lock();
388        let room = server_rooms
389            .get_mut(&*room_name)
390            .ok_or_else(|| anyhow!("room {} does not exist", room_name))?;
391        let client_room = room
392            .client_rooms
393            .get(identity.as_ref())
394            .ok_or_else(|| anyhow!("not a participant in room"))?;
395        Ok(room
396            .audio_tracks
397            .iter()
398            .map(|track| {
399                Arc::new(RemoteAudioTrack {
400                    server_track: track.clone(),
401                    room: Arc::downgrade(&client_room),
402                })
403            })
404            .collect())
405    }
406}
407
408#[derive(Default)]
409struct TestServerRoom {
410    client_rooms: HashMap<Sid, Arc<Room>>,
411    video_tracks: Vec<Arc<TestServerVideoTrack>>,
412    audio_tracks: Vec<Arc<TestServerAudioTrack>>,
413    participant_permissions: HashMap<Sid, proto::ParticipantPermission>,
414}
415
416impl Drop for TestServerRoom {
417    fn drop(&mut self) {
418        for room in self.client_rooms.values() {
419            let mut state = room.0.lock();
420            *state.connection.0.borrow_mut() = ConnectionState::Disconnected;
421        }
422    }
423}
424
425#[derive(Debug)]
426struct TestServerVideoTrack {
427    sid: Sid,
428    publisher_id: Sid,
429    frames_rx: async_broadcast::Receiver<Frame>,
430}
431
432#[derive(Debug)]
433struct TestServerAudioTrack {
434    sid: Sid,
435    publisher_id: Sid,
436    muted: AtomicBool,
437}
438
439impl TestServerRoom {}
440
441pub struct TestApiClient {
442    url: String,
443}
444
445#[async_trait]
446impl live_kit_server::api::Client for TestApiClient {
447    fn url(&self) -> &str {
448        &self.url
449    }
450
451    async fn create_room(&self, name: String) -> Result<()> {
452        let server = TestServer::get(&self.url)?;
453        server.create_room(name).await?;
454        Ok(())
455    }
456
457    async fn delete_room(&self, name: String) -> Result<()> {
458        let server = TestServer::get(&self.url)?;
459        server.delete_room(name).await?;
460        Ok(())
461    }
462
463    async fn remove_participant(&self, room: String, identity: String) -> Result<()> {
464        let server = TestServer::get(&self.url)?;
465        server.remove_participant(room, identity).await?;
466        Ok(())
467    }
468
469    async fn update_participant(
470        &self,
471        room: String,
472        identity: String,
473        permission: live_kit_server::proto::ParticipantPermission,
474    ) -> Result<()> {
475        let server = TestServer::get(&self.url)?;
476        server
477            .update_participant(room, identity, permission)
478            .await?;
479        Ok(())
480    }
481
482    fn room_token(&self, room: &str, identity: &str) -> Result<String> {
483        let server = TestServer::get(&self.url)?;
484        token::create(
485            &server.api_key,
486            &server.secret_key,
487            Some(identity),
488            token::VideoGrant::to_join(room),
489        )
490    }
491
492    fn guest_token(&self, room: &str, identity: &str) -> Result<String> {
493        let server = TestServer::get(&self.url)?;
494        token::create(
495            &server.api_key,
496            &server.secret_key,
497            Some(identity),
498            token::VideoGrant::for_guest(room),
499        )
500    }
501}
502
503struct RoomState {
504    connection: (
505        watch::Sender<ConnectionState>,
506        watch::Receiver<ConnectionState>,
507    ),
508    display_sources: Vec<MacOSDisplay>,
509    paused_audio_tracks: HashSet<Sid>,
510    updates_tx: async_broadcast::Sender<RoomUpdate>,
511    updates_rx: async_broadcast::Receiver<RoomUpdate>,
512}
513
514pub struct Room(Mutex<RoomState>);
515
516impl Room {
517    pub fn new() -> Arc<Self> {
518        let (updates_tx, updates_rx) = async_broadcast::broadcast(128);
519        Arc::new(Self(Mutex::new(RoomState {
520            connection: watch::channel_with(ConnectionState::Disconnected),
521            display_sources: Default::default(),
522            paused_audio_tracks: Default::default(),
523            updates_tx,
524            updates_rx,
525        })))
526    }
527
528    pub fn status(&self) -> watch::Receiver<ConnectionState> {
529        self.0.lock().connection.1.clone()
530    }
531
532    pub fn connect(self: &Arc<Self>, url: &str, token: &str) -> impl Future<Output = Result<()>> {
533        let this = self.clone();
534        let url = url.to_string();
535        let token = token.to_string();
536        async move {
537            let server = TestServer::get(&url)?;
538            server
539                .join_room(token.clone(), this.clone())
540                .await
541                .context("room join")?;
542            *this.0.lock().connection.0.borrow_mut() = ConnectionState::Connected { url, token };
543            Ok(())
544        }
545    }
546
547    pub fn display_sources(self: &Arc<Self>) -> impl Future<Output = Result<Vec<MacOSDisplay>>> {
548        let this = self.clone();
549        async move {
550            let server = this.test_server();
551            server.executor.simulate_random_delay().await;
552            Ok(this.0.lock().display_sources.clone())
553        }
554    }
555
556    pub fn publish_video_track(
557        self: &Arc<Self>,
558        track: LocalVideoTrack,
559    ) -> impl Future<Output = Result<LocalTrackPublication>> {
560        let this = self.clone();
561        let track = track.clone();
562        async move {
563            let sid = this
564                .test_server()
565                .publish_video_track(this.token(), track)
566                .await?;
567            Ok(LocalTrackPublication {
568                room: Arc::downgrade(&this),
569                sid,
570            })
571        }
572    }
573
574    pub fn publish_audio_track(
575        self: &Arc<Self>,
576        track: LocalAudioTrack,
577    ) -> impl Future<Output = Result<LocalTrackPublication>> {
578        let this = self.clone();
579        let track = track.clone();
580        async move {
581            let sid = this
582                .test_server()
583                .publish_audio_track(this.token(), &track)
584                .await?;
585            Ok(LocalTrackPublication {
586                room: Arc::downgrade(&this),
587                sid,
588            })
589        }
590    }
591
592    pub fn unpublish_track(&self, _publication: LocalTrackPublication) {}
593
594    pub fn remote_audio_tracks(&self, publisher_id: &str) -> Vec<Arc<RemoteAudioTrack>> {
595        if !self.is_connected() {
596            return Vec::new();
597        }
598
599        self.test_server()
600            .audio_tracks(self.token())
601            .unwrap()
602            .into_iter()
603            .filter(|track| track.publisher_id() == publisher_id)
604            .collect()
605    }
606
607    pub fn remote_audio_track_publications(
608        &self,
609        publisher_id: &str,
610    ) -> Vec<Arc<RemoteTrackPublication>> {
611        if !self.is_connected() {
612            return Vec::new();
613        }
614
615        self.test_server()
616            .audio_tracks(self.token())
617            .unwrap()
618            .into_iter()
619            .filter(|track| track.publisher_id() == publisher_id)
620            .map(|_track| Arc::new(RemoteTrackPublication {}))
621            .collect()
622    }
623
624    pub fn remote_video_tracks(&self, publisher_id: &str) -> Vec<Arc<RemoteVideoTrack>> {
625        if !self.is_connected() {
626            return Vec::new();
627        }
628
629        self.test_server()
630            .video_tracks(self.token())
631            .unwrap()
632            .into_iter()
633            .filter(|track| track.publisher_id() == publisher_id)
634            .collect()
635    }
636
637    pub fn updates(&self) -> impl Stream<Item = RoomUpdate> {
638        self.0.lock().updates_rx.clone()
639    }
640
641    pub fn set_display_sources(&self, sources: Vec<MacOSDisplay>) {
642        self.0.lock().display_sources = sources;
643    }
644
645    fn test_server(&self) -> Arc<TestServer> {
646        match self.0.lock().connection.1.borrow().clone() {
647            ConnectionState::Disconnected => panic!("must be connected to call this method"),
648            ConnectionState::Connected { url, .. } => TestServer::get(&url).unwrap(),
649        }
650    }
651
652    fn token(&self) -> String {
653        match self.0.lock().connection.1.borrow().clone() {
654            ConnectionState::Disconnected => panic!("must be connected to call this method"),
655            ConnectionState::Connected { token, .. } => token,
656        }
657    }
658
659    fn is_connected(&self) -> bool {
660        match *self.0.lock().connection.1.borrow() {
661            ConnectionState::Disconnected => false,
662            ConnectionState::Connected { .. } => true,
663        }
664    }
665}
666
667impl Drop for Room {
668    fn drop(&mut self) {
669        if let ConnectionState::Connected { token, .. } = mem::replace(
670            &mut *self.0.lock().connection.0.borrow_mut(),
671            ConnectionState::Disconnected,
672        ) {
673            if let Ok(server) = TestServer::get(&token) {
674                let executor = server.executor.clone();
675                executor
676                    .spawn(async move { server.leave_room(token).await.unwrap() })
677                    .detach();
678            }
679        }
680    }
681}
682
683#[derive(Clone)]
684pub struct LocalTrackPublication {
685    sid: String,
686    room: Weak<Room>,
687}
688
689impl LocalTrackPublication {
690    pub fn set_mute(&self, mute: bool) -> impl Future<Output = Result<()>> {
691        let sid = self.sid.clone();
692        let room = self.room.clone();
693        async move {
694            if let Some(room) = room.upgrade() {
695                room.test_server()
696                    .set_track_muted(&room.token(), &sid, mute)
697            } else {
698                Err(anyhow!("no such room"))
699            }
700        }
701    }
702
703    pub fn is_muted(&self) -> bool {
704        if let Some(room) = self.room.upgrade() {
705            if room.is_connected() {
706                room.test_server()
707                    .is_track_muted(&room.token(), &self.sid)
708                    .unwrap_or(true)
709            } else {
710                true
711            }
712        } else {
713            true
714        }
715    }
716
717    pub fn sid(&self) -> String {
718        self.sid.clone()
719    }
720}
721
722pub struct RemoteTrackPublication;
723
724impl RemoteTrackPublication {
725    pub fn set_enabled(&self, _enabled: bool) -> impl Future<Output = Result<()>> {
726        async { Ok(()) }
727    }
728
729    pub fn is_muted(&self) -> bool {
730        false
731    }
732
733    pub fn sid(&self) -> String {
734        "".to_string()
735    }
736}
737
738#[derive(Clone)]
739pub struct LocalVideoTrack {
740    frames_rx: async_broadcast::Receiver<Frame>,
741}
742
743impl LocalVideoTrack {
744    pub fn screen_share_for_display(display: &MacOSDisplay) -> Self {
745        Self {
746            frames_rx: display.frames.1.clone(),
747        }
748    }
749}
750
751#[derive(Clone)]
752pub struct LocalAudioTrack;
753
754impl LocalAudioTrack {
755    pub fn create() -> Self {
756        Self
757    }
758}
759
760#[derive(Debug)]
761pub struct RemoteVideoTrack {
762    server_track: Arc<TestServerVideoTrack>,
763}
764
765impl RemoteVideoTrack {
766    pub fn sid(&self) -> &str {
767        &self.server_track.sid
768    }
769
770    pub fn publisher_id(&self) -> &str {
771        &self.server_track.publisher_id
772    }
773
774    pub fn frames(&self) -> async_broadcast::Receiver<Frame> {
775        self.server_track.frames_rx.clone()
776    }
777}
778
779#[derive(Debug)]
780pub struct RemoteAudioTrack {
781    server_track: Arc<TestServerAudioTrack>,
782    room: Weak<Room>,
783}
784
785impl RemoteAudioTrack {
786    pub fn sid(&self) -> &str {
787        &self.server_track.sid
788    }
789
790    pub fn publisher_id(&self) -> &str {
791        &self.server_track.publisher_id
792    }
793
794    pub fn start(&self) {
795        if let Some(room) = self.room.upgrade() {
796            room.0
797                .lock()
798                .paused_audio_tracks
799                .remove(&self.server_track.sid);
800        }
801    }
802
803    pub fn stop(&self) {
804        if let Some(room) = self.room.upgrade() {
805            room.0
806                .lock()
807                .paused_audio_tracks
808                .insert(self.server_track.sid.clone());
809        }
810    }
811
812    pub fn is_playing(&self) -> bool {
813        !self
814            .room
815            .upgrade()
816            .unwrap()
817            .0
818            .lock()
819            .paused_audio_tracks
820            .contains(&self.server_track.sid)
821    }
822}
823
824#[derive(Clone)]
825pub struct MacOSDisplay {
826    frames: (
827        async_broadcast::Sender<Frame>,
828        async_broadcast::Receiver<Frame>,
829    ),
830}
831
832impl MacOSDisplay {
833    pub fn new() -> Self {
834        Self {
835            frames: async_broadcast::broadcast(128),
836        }
837    }
838
839    pub fn send_frame(&self, frame: Frame) {
840        self.frames.0.try_broadcast(frame).unwrap();
841    }
842}
843
844#[derive(Clone, Debug, PartialEq, Eq)]
845pub struct Frame {
846    pub label: String,
847    pub width: usize,
848    pub height: usize,
849}
850
851impl Frame {
852    pub fn width(&self) -> usize {
853        self.width
854    }
855
856    pub fn height(&self) -> usize {
857        self.height
858    }
859
860    pub fn image(&self) -> ImageSource {
861        unimplemented!("you can't call this in test mode")
862    }
863}