test.rs

  1use crate::{ConnectionState, RoomUpdate, Sid};
  2use anyhow::{anyhow, Context, Result};
  3use async_trait::async_trait;
  4use collections::{BTreeMap, HashMap, HashSet};
  5use futures::Stream;
  6use gpui::BackgroundExecutor;
  7use live_kit_server::{proto, token};
  8#[cfg(target_os = "macos")]
  9use media::core_video::CVImageBuffer;
 10use parking_lot::Mutex;
 11use postage::watch;
 12use std::{
 13    future::Future,
 14    mem,
 15    sync::{
 16        atomic::{AtomicBool, Ordering::SeqCst},
 17        Arc, Weak,
 18    },
 19};
 20
/// Process-wide registry of test servers, keyed by server URL.
///
/// `TestServer::create` inserts into it, `TestServer::get` looks servers up,
/// and `TestServer::teardown` removes them again.
static SERVERS: Mutex<BTreeMap<String, Arc<TestServer>>> = Mutex::new(BTreeMap::new());
 22
/// In-memory test server; instances are registered in `SERVERS` under their URL.
pub struct TestServer {
    /// URL under which this server is registered in `SERVERS`.
    pub url: String,
    /// API key handed to `token::create` when tokens are issued for this server.
    pub api_key: String,
    /// Secret used to create tokens and to validate the tokens clients present.
    pub secret_key: String,
    /// Server-side state of every room, keyed by room name.
    rooms: Mutex<HashMap<String, TestServerRoom>>,
    /// Executor used to inject simulated random delays into server operations.
    executor: BackgroundExecutor,
}
 30
 31impl TestServer {
 32    pub fn create(
 33        url: String,
 34        api_key: String,
 35        secret_key: String,
 36        executor: BackgroundExecutor,
 37    ) -> Result<Arc<TestServer>> {
 38        let mut servers = SERVERS.lock();
 39        if servers.contains_key(&url) {
 40            Err(anyhow!("a server with url {:?} already exists", url))
 41        } else {
 42            let server = Arc::new(TestServer {
 43                url: url.clone(),
 44                api_key,
 45                secret_key,
 46                rooms: Default::default(),
 47                executor,
 48            });
 49            servers.insert(url, server.clone());
 50            Ok(server)
 51        }
 52    }
 53
 54    fn get(url: &str) -> Result<Arc<TestServer>> {
 55        Ok(SERVERS
 56            .lock()
 57            .get(url)
 58            .ok_or_else(|| anyhow!("no server found for url"))?
 59            .clone())
 60    }
 61
 62    pub fn teardown(&self) -> Result<()> {
 63        SERVERS
 64            .lock()
 65            .remove(&self.url)
 66            .ok_or_else(|| anyhow!("server with url {:?} does not exist", self.url))?;
 67        Ok(())
 68    }
 69
 70    pub fn create_api_client(&self) -> TestApiClient {
 71        TestApiClient {
 72            url: self.url.clone(),
 73        }
 74    }
 75
 76    pub async fn create_room(&self, room: String) -> Result<()> {
 77        self.executor.simulate_random_delay().await;
 78        let mut server_rooms = self.rooms.lock();
 79        if server_rooms.contains_key(&room) {
 80            Err(anyhow!("room {:?} already exists", room))
 81        } else {
 82            server_rooms.insert(room, Default::default());
 83            Ok(())
 84        }
 85    }
 86
 87    async fn delete_room(&self, room: String) -> Result<()> {
 88        // TODO: clear state associated with all `Room`s.
 89        self.executor.simulate_random_delay().await;
 90        let mut server_rooms = self.rooms.lock();
 91        server_rooms
 92            .remove(&room)
 93            .ok_or_else(|| anyhow!("room {:?} does not exist", room))?;
 94        Ok(())
 95    }
 96
 97    async fn join_room(&self, token: String, client_room: Arc<Room>) -> Result<()> {
 98        self.executor.simulate_random_delay().await;
 99        let claims = live_kit_server::token::validate(&token, &self.secret_key)?;
100        let identity = claims.sub.unwrap().to_string();
101        let room_name = claims.video.room.unwrap();
102        let mut server_rooms = self.rooms.lock();
103        let room = (*server_rooms).entry(room_name.to_string()).or_default();
104
105        if room.client_rooms.contains_key(&identity) {
106            Err(anyhow!(
107                "{:?} attempted to join room {:?} twice",
108                identity,
109                room_name
110            ))
111        } else {
112            for track in &room.video_tracks {
113                client_room
114                    .0
115                    .lock()
116                    .updates_tx
117                    .try_broadcast(RoomUpdate::SubscribedToRemoteVideoTrack(Arc::new(
118                        RemoteVideoTrack {
119                            server_track: track.clone(),
120                        },
121                    )))
122                    .unwrap();
123            }
124            for track in &room.audio_tracks {
125                client_room
126                    .0
127                    .lock()
128                    .updates_tx
129                    .try_broadcast(RoomUpdate::SubscribedToRemoteAudioTrack(
130                        Arc::new(RemoteAudioTrack {
131                            server_track: track.clone(),
132                            room: Arc::downgrade(&client_room),
133                        }),
134                        Arc::new(RemoteTrackPublication),
135                    ))
136                    .unwrap();
137            }
138            room.client_rooms.insert(identity, client_room);
139            Ok(())
140        }
141    }
142
143    async fn leave_room(&self, token: String) -> Result<()> {
144        self.executor.simulate_random_delay().await;
145        let claims = live_kit_server::token::validate(&token, &self.secret_key)?;
146        let identity = claims.sub.unwrap().to_string();
147        let room_name = claims.video.room.unwrap();
148        let mut server_rooms = self.rooms.lock();
149        let room = server_rooms
150            .get_mut(&*room_name)
151            .ok_or_else(|| anyhow!("room {} does not exist", room_name))?;
152        room.client_rooms.remove(&identity).ok_or_else(|| {
153            anyhow!(
154                "{:?} attempted to leave room {:?} before joining it",
155                identity,
156                room_name
157            )
158        })?;
159        Ok(())
160    }
161
162    async fn remove_participant(&self, room_name: String, identity: String) -> Result<()> {
163        // TODO: clear state associated with the `Room`.
164
165        self.executor.simulate_random_delay().await;
166        let mut server_rooms = self.rooms.lock();
167        let room = server_rooms
168            .get_mut(&room_name)
169            .ok_or_else(|| anyhow!("room {} does not exist", room_name))?;
170        room.client_rooms.remove(&identity).ok_or_else(|| {
171            anyhow!(
172                "participant {:?} did not join room {:?}",
173                identity,
174                room_name
175            )
176        })?;
177        Ok(())
178    }
179
180    async fn update_participant(
181        &self,
182        room_name: String,
183        identity: String,
184        permission: proto::ParticipantPermission,
185    ) -> Result<()> {
186        self.executor.simulate_random_delay().await;
187        let mut server_rooms = self.rooms.lock();
188        let room = server_rooms
189            .get_mut(&room_name)
190            .ok_or_else(|| anyhow!("room {} does not exist", room_name))?;
191        room.participant_permissions.insert(identity, permission);
192        Ok(())
193    }
194
195    pub async fn disconnect_client(&self, client_identity: String) {
196        self.executor.simulate_random_delay().await;
197        let mut server_rooms = self.rooms.lock();
198        for room in server_rooms.values_mut() {
199            if let Some(room) = room.client_rooms.remove(&client_identity) {
200                *room.0.lock().connection.0.borrow_mut() = ConnectionState::Disconnected;
201            }
202        }
203    }
204
205    async fn publish_video_track(
206        &self,
207        token: String,
208        local_track: LocalVideoTrack,
209    ) -> Result<Sid> {
210        self.executor.simulate_random_delay().await;
211        let claims = live_kit_server::token::validate(&token, &self.secret_key)?;
212        let identity = claims.sub.unwrap().to_string();
213        let room_name = claims.video.room.unwrap();
214
215        let mut server_rooms = self.rooms.lock();
216        let room = server_rooms
217            .get_mut(&*room_name)
218            .ok_or_else(|| anyhow!("room {} does not exist", room_name))?;
219
220        let can_publish = room
221            .participant_permissions
222            .get(&identity)
223            .map(|permission| permission.can_publish)
224            .or(claims.video.can_publish)
225            .unwrap_or(true);
226
227        if !can_publish {
228            return Err(anyhow!("user is not allowed to publish"));
229        }
230
231        let sid = nanoid::nanoid!(17);
232        let track = Arc::new(TestServerVideoTrack {
233            sid: sid.clone(),
234            publisher_id: identity.clone(),
235            frames_rx: local_track.frames_rx.clone(),
236        });
237
238        room.video_tracks.push(track.clone());
239
240        for (id, client_room) in &room.client_rooms {
241            if *id != identity {
242                let _ = client_room
243                    .0
244                    .lock()
245                    .updates_tx
246                    .try_broadcast(RoomUpdate::SubscribedToRemoteVideoTrack(Arc::new(
247                        RemoteVideoTrack {
248                            server_track: track.clone(),
249                        },
250                    )))
251                    .unwrap();
252            }
253        }
254
255        Ok(sid)
256    }
257
258    async fn publish_audio_track(
259        &self,
260        token: String,
261        _local_track: &LocalAudioTrack,
262    ) -> Result<Sid> {
263        self.executor.simulate_random_delay().await;
264        let claims = live_kit_server::token::validate(&token, &self.secret_key)?;
265        let identity = claims.sub.unwrap().to_string();
266        let room_name = claims.video.room.unwrap();
267
268        let mut server_rooms = self.rooms.lock();
269        let room = server_rooms
270            .get_mut(&*room_name)
271            .ok_or_else(|| anyhow!("room {} does not exist", room_name))?;
272
273        let can_publish = room
274            .participant_permissions
275            .get(&identity)
276            .map(|permission| permission.can_publish)
277            .or(claims.video.can_publish)
278            .unwrap_or(true);
279
280        if !can_publish {
281            return Err(anyhow!("user is not allowed to publish"));
282        }
283
284        let sid = nanoid::nanoid!(17);
285        let track = Arc::new(TestServerAudioTrack {
286            sid: sid.clone(),
287            publisher_id: identity.clone(),
288            muted: AtomicBool::new(false),
289        });
290
291        let publication = Arc::new(RemoteTrackPublication);
292
293        room.audio_tracks.push(track.clone());
294
295        for (id, client_room) in &room.client_rooms {
296            if *id != identity {
297                let _ = client_room
298                    .0
299                    .lock()
300                    .updates_tx
301                    .try_broadcast(RoomUpdate::SubscribedToRemoteAudioTrack(
302                        Arc::new(RemoteAudioTrack {
303                            server_track: track.clone(),
304                            room: Arc::downgrade(&client_room),
305                        }),
306                        publication.clone(),
307                    ))
308                    .unwrap();
309            }
310        }
311
312        Ok(sid)
313    }
314
315    fn set_track_muted(&self, token: &str, track_sid: &str, muted: bool) -> Result<()> {
316        let claims = live_kit_server::token::validate(&token, &self.secret_key)?;
317        let room_name = claims.video.room.unwrap();
318        let identity = claims.sub.unwrap();
319        let mut server_rooms = self.rooms.lock();
320        let room = server_rooms
321            .get_mut(&*room_name)
322            .ok_or_else(|| anyhow!("room {} does not exist", room_name))?;
323        if let Some(track) = room
324            .audio_tracks
325            .iter_mut()
326            .find(|track| track.sid == track_sid)
327        {
328            track.muted.store(muted, SeqCst);
329            for (id, client_room) in room.client_rooms.iter() {
330                if *id != identity {
331                    client_room
332                        .0
333                        .lock()
334                        .updates_tx
335                        .try_broadcast(RoomUpdate::RemoteAudioTrackMuteChanged {
336                            track_id: track_sid.to_string(),
337                            muted,
338                        })
339                        .unwrap();
340                }
341            }
342        }
343        Ok(())
344    }
345
346    fn is_track_muted(&self, token: &str, track_sid: &str) -> Option<bool> {
347        let claims = live_kit_server::token::validate(&token, &self.secret_key).ok()?;
348        let room_name = claims.video.room.unwrap();
349
350        let mut server_rooms = self.rooms.lock();
351        let room = server_rooms.get_mut(&*room_name)?;
352        room.audio_tracks.iter().find_map(|track| {
353            if track.sid == track_sid {
354                Some(track.muted.load(SeqCst))
355            } else {
356                None
357            }
358        })
359    }
360
361    fn video_tracks(&self, token: String) -> Result<Vec<Arc<RemoteVideoTrack>>> {
362        let claims = live_kit_server::token::validate(&token, &self.secret_key)?;
363        let room_name = claims.video.room.unwrap();
364        let identity = claims.sub.unwrap();
365
366        let mut server_rooms = self.rooms.lock();
367        let room = server_rooms
368            .get_mut(&*room_name)
369            .ok_or_else(|| anyhow!("room {} does not exist", room_name))?;
370        room.client_rooms
371            .get(identity.as_ref())
372            .ok_or_else(|| anyhow!("not a participant in room"))?;
373        Ok(room
374            .video_tracks
375            .iter()
376            .map(|track| {
377                Arc::new(RemoteVideoTrack {
378                    server_track: track.clone(),
379                })
380            })
381            .collect())
382    }
383
384    fn audio_tracks(&self, token: String) -> Result<Vec<Arc<RemoteAudioTrack>>> {
385        let claims = live_kit_server::token::validate(&token, &self.secret_key)?;
386        let room_name = claims.video.room.unwrap();
387        let identity = claims.sub.unwrap();
388
389        let mut server_rooms = self.rooms.lock();
390        let room = server_rooms
391            .get_mut(&*room_name)
392            .ok_or_else(|| anyhow!("room {} does not exist", room_name))?;
393        let client_room = room
394            .client_rooms
395            .get(identity.as_ref())
396            .ok_or_else(|| anyhow!("not a participant in room"))?;
397        Ok(room
398            .audio_tracks
399            .iter()
400            .map(|track| {
401                Arc::new(RemoteAudioTrack {
402                    server_track: track.clone(),
403                    room: Arc::downgrade(&client_room),
404                })
405            })
406            .collect())
407    }
408}
409
/// Server-side state of a single room.
#[derive(Default)]
struct TestServerRoom {
    /// Client rooms of the participants currently in the room, keyed by identity.
    client_rooms: HashMap<Sid, Arc<Room>>,
    /// Video tracks published into the room, in publication order.
    video_tracks: Vec<Arc<TestServerVideoTrack>>,
    /// Audio tracks published into the room, in publication order.
    audio_tracks: Vec<Arc<TestServerAudioTrack>>,
    /// Permissions set through `update_participant`, keyed by identity.
    participant_permissions: HashMap<Sid, proto::ParticipantPermission>,
}
417
/// Server-side record of a published video track.
#[derive(Debug)]
struct TestServerVideoTrack {
    /// Track identifier generated when the track was published.
    sid: Sid,
    /// Identity of the participant that published the track.
    publisher_id: Sid,
    /// Receiver cloned from the publishing `LocalVideoTrack`.
    frames_rx: async_broadcast::Receiver<Frame>,
}
424
/// Server-side record of a published audio track.
#[derive(Debug)]
struct TestServerAudioTrack {
    /// Track identifier generated when the track was published.
    sid: Sid,
    /// Identity of the participant that published the track.
    publisher_id: Sid,
    /// Current mute state; starts as `false` and is changed by `set_track_muted`.
    muted: AtomicBool,
}
431
// No inherent methods are defined here; `TestServer` reads and mutates the
// room's fields directly.
impl TestServerRoom {}
433
434pub struct TestApiClient {
435    url: String,
436}
437
438#[async_trait]
439impl live_kit_server::api::Client for TestApiClient {
440    fn url(&self) -> &str {
441        &self.url
442    }
443
444    async fn create_room(&self, name: String) -> Result<()> {
445        let server = TestServer::get(&self.url)?;
446        server.create_room(name).await?;
447        Ok(())
448    }
449
450    async fn delete_room(&self, name: String) -> Result<()> {
451        let server = TestServer::get(&self.url)?;
452        server.delete_room(name).await?;
453        Ok(())
454    }
455
456    async fn remove_participant(&self, room: String, identity: String) -> Result<()> {
457        let server = TestServer::get(&self.url)?;
458        server.remove_participant(room, identity).await?;
459        Ok(())
460    }
461
462    async fn update_participant(
463        &self,
464        room: String,
465        identity: String,
466        permission: live_kit_server::proto::ParticipantPermission,
467    ) -> Result<()> {
468        let server = TestServer::get(&self.url)?;
469        server
470            .update_participant(room, identity, permission)
471            .await?;
472        Ok(())
473    }
474
475    fn room_token(&self, room: &str, identity: &str) -> Result<String> {
476        let server = TestServer::get(&self.url)?;
477        token::create(
478            &server.api_key,
479            &server.secret_key,
480            Some(identity),
481            token::VideoGrant::to_join(room),
482        )
483    }
484
485    fn guest_token(&self, room: &str, identity: &str) -> Result<String> {
486        let server = TestServer::get(&self.url)?;
487        token::create(
488            &server.api_key,
489            &server.secret_key,
490            Some(identity),
491            token::VideoGrant::for_guest(room),
492        )
493    }
494}
495
/// Mutable state of a client-side `Room`, kept behind the room's mutex.
struct RoomState {
    /// Sender/receiver pair of the watch channel carrying the connection state.
    connection: (
        watch::Sender<ConnectionState>,
        watch::Receiver<ConnectionState>,
    ),
    /// Displays returned by `Room::display_sources`; set via `set_display_sources`.
    display_sources: Vec<MacOSDisplay>,
    /// Sids of remote audio tracks that have been stopped and not restarted.
    paused_audio_tracks: HashSet<Sid>,
    /// Sending half of the room-update broadcast channel.
    updates_tx: async_broadcast::Sender<RoomUpdate>,
    /// Receiving half of the room-update channel; cloned for each `updates()` call.
    updates_rx: async_broadcast::Receiver<RoomUpdate>,
}
506
/// Client-side room handle used in tests; all of its state lives behind a mutex.
pub struct Room(Mutex<RoomState>);
508
509impl Room {
510    pub fn new() -> Arc<Self> {
511        let (updates_tx, updates_rx) = async_broadcast::broadcast(128);
512        Arc::new(Self(Mutex::new(RoomState {
513            connection: watch::channel_with(ConnectionState::Disconnected),
514            display_sources: Default::default(),
515            paused_audio_tracks: Default::default(),
516            updates_tx,
517            updates_rx,
518        })))
519    }
520
521    pub fn status(&self) -> watch::Receiver<ConnectionState> {
522        self.0.lock().connection.1.clone()
523    }
524
525    pub fn connect(self: &Arc<Self>, url: &str, token: &str) -> impl Future<Output = Result<()>> {
526        let this = self.clone();
527        let url = url.to_string();
528        let token = token.to_string();
529        async move {
530            let server = TestServer::get(&url)?;
531            server
532                .join_room(token.clone(), this.clone())
533                .await
534                .context("room join")?;
535            *this.0.lock().connection.0.borrow_mut() = ConnectionState::Connected { url, token };
536            Ok(())
537        }
538    }
539
540    pub fn display_sources(self: &Arc<Self>) -> impl Future<Output = Result<Vec<MacOSDisplay>>> {
541        let this = self.clone();
542        async move {
543            let server = this.test_server();
544            server.executor.simulate_random_delay().await;
545            Ok(this.0.lock().display_sources.clone())
546        }
547    }
548
549    pub fn publish_video_track(
550        self: &Arc<Self>,
551        track: LocalVideoTrack,
552    ) -> impl Future<Output = Result<LocalTrackPublication>> {
553        let this = self.clone();
554        let track = track.clone();
555        async move {
556            let sid = this
557                .test_server()
558                .publish_video_track(this.token(), track)
559                .await?;
560            Ok(LocalTrackPublication {
561                room: Arc::downgrade(&this),
562                sid,
563            })
564        }
565    }
566
567    pub fn publish_audio_track(
568        self: &Arc<Self>,
569        track: LocalAudioTrack,
570    ) -> impl Future<Output = Result<LocalTrackPublication>> {
571        let this = self.clone();
572        let track = track.clone();
573        async move {
574            let sid = this
575                .test_server()
576                .publish_audio_track(this.token(), &track)
577                .await?;
578            Ok(LocalTrackPublication {
579                room: Arc::downgrade(&this),
580                sid,
581            })
582        }
583    }
584
    /// No-op: the publication is consumed, but no server state is changed.
    pub fn unpublish_track(&self, _publication: LocalTrackPublication) {}
586
587    pub fn remote_audio_tracks(&self, publisher_id: &str) -> Vec<Arc<RemoteAudioTrack>> {
588        if !self.is_connected() {
589            return Vec::new();
590        }
591
592        self.test_server()
593            .audio_tracks(self.token())
594            .unwrap()
595            .into_iter()
596            .filter(|track| track.publisher_id() == publisher_id)
597            .collect()
598    }
599
600    pub fn remote_audio_track_publications(
601        &self,
602        publisher_id: &str,
603    ) -> Vec<Arc<RemoteTrackPublication>> {
604        if !self.is_connected() {
605            return Vec::new();
606        }
607
608        self.test_server()
609            .audio_tracks(self.token())
610            .unwrap()
611            .into_iter()
612            .filter(|track| track.publisher_id() == publisher_id)
613            .map(|_track| Arc::new(RemoteTrackPublication {}))
614            .collect()
615    }
616
617    pub fn remote_video_tracks(&self, publisher_id: &str) -> Vec<Arc<RemoteVideoTrack>> {
618        if !self.is_connected() {
619            return Vec::new();
620        }
621
622        self.test_server()
623            .video_tracks(self.token())
624            .unwrap()
625            .into_iter()
626            .filter(|track| track.publisher_id() == publisher_id)
627            .collect()
628    }
629
630    pub fn updates(&self) -> impl Stream<Item = RoomUpdate> {
631        self.0.lock().updates_rx.clone()
632    }
633
634    pub fn set_display_sources(&self, sources: Vec<MacOSDisplay>) {
635        self.0.lock().display_sources = sources;
636    }
637
638    fn test_server(&self) -> Arc<TestServer> {
639        match self.0.lock().connection.1.borrow().clone() {
640            ConnectionState::Disconnected => panic!("must be connected to call this method"),
641            ConnectionState::Connected { url, .. } => TestServer::get(&url).unwrap(),
642        }
643    }
644
645    fn token(&self) -> String {
646        match self.0.lock().connection.1.borrow().clone() {
647            ConnectionState::Disconnected => panic!("must be connected to call this method"),
648            ConnectionState::Connected { token, .. } => token,
649        }
650    }
651
652    fn is_connected(&self) -> bool {
653        match *self.0.lock().connection.1.borrow() {
654            ConnectionState::Disconnected => false,
655            ConnectionState::Connected { .. } => true,
656        }
657    }
658}
659
660impl Drop for Room {
661    fn drop(&mut self) {
662        if let ConnectionState::Connected { token, .. } = mem::replace(
663            &mut *self.0.lock().connection.0.borrow_mut(),
664            ConnectionState::Disconnected,
665        ) {
666            if let Ok(server) = TestServer::get(&token) {
667                let executor = server.executor.clone();
668                executor
669                    .spawn(async move { server.leave_room(token).await.unwrap() })
670                    .detach();
671            }
672        }
673    }
674}
675
676#[derive(Clone)]
677pub struct LocalTrackPublication {
678    sid: String,
679    room: Weak<Room>,
680}
681
682impl LocalTrackPublication {
683    pub fn set_mute(&self, mute: bool) -> impl Future<Output = Result<()>> {
684        let sid = self.sid.clone();
685        let room = self.room.clone();
686        async move {
687            if let Some(room) = room.upgrade() {
688                room.test_server()
689                    .set_track_muted(&room.token(), &sid, mute)
690            } else {
691                Err(anyhow!("no such room"))
692            }
693        }
694    }
695
696    pub fn is_muted(&self) -> bool {
697        if let Some(room) = self.room.upgrade() {
698            room.test_server()
699                .is_track_muted(&room.token(), &self.sid)
700                .unwrap_or(false)
701        } else {
702            false
703        }
704    }
705
706    pub fn sid(&self) -> String {
707        self.sid.clone()
708    }
709}
710
711pub struct RemoteTrackPublication;
712
713impl RemoteTrackPublication {
714    pub fn set_enabled(&self, _enabled: bool) -> impl Future<Output = Result<()>> {
715        async { Ok(()) }
716    }
717
718    pub fn is_muted(&self) -> bool {
719        false
720    }
721
722    pub fn sid(&self) -> String {
723        "".to_string()
724    }
725}
726
727#[derive(Clone)]
728pub struct LocalVideoTrack {
729    frames_rx: async_broadcast::Receiver<Frame>,
730}
731
732impl LocalVideoTrack {
733    pub fn screen_share_for_display(display: &MacOSDisplay) -> Self {
734        Self {
735            frames_rx: display.frames.1.clone(),
736        }
737    }
738}
739
/// Stateless stand-in for a local audio track.
#[derive(Clone)]
pub struct LocalAudioTrack;

impl LocalAudioTrack {
    /// Creates a new (empty) audio track marker.
    pub fn create() -> Self {
        LocalAudioTrack
    }
}
748
749#[derive(Debug)]
750pub struct RemoteVideoTrack {
751    server_track: Arc<TestServerVideoTrack>,
752}
753
754impl RemoteVideoTrack {
755    pub fn sid(&self) -> &str {
756        &self.server_track.sid
757    }
758
759    pub fn publisher_id(&self) -> &str {
760        &self.server_track.publisher_id
761    }
762
763    pub fn frames(&self) -> async_broadcast::Receiver<Frame> {
764        self.server_track.frames_rx.clone()
765    }
766}
767
768#[derive(Debug)]
769pub struct RemoteAudioTrack {
770    server_track: Arc<TestServerAudioTrack>,
771    room: Weak<Room>,
772}
773
774impl RemoteAudioTrack {
775    pub fn sid(&self) -> &str {
776        &self.server_track.sid
777    }
778
779    pub fn publisher_id(&self) -> &str {
780        &self.server_track.publisher_id
781    }
782
783    pub fn start(&self) {
784        if let Some(room) = self.room.upgrade() {
785            room.0
786                .lock()
787                .paused_audio_tracks
788                .remove(&self.server_track.sid);
789        }
790    }
791
792    pub fn stop(&self) {
793        if let Some(room) = self.room.upgrade() {
794            room.0
795                .lock()
796                .paused_audio_tracks
797                .insert(self.server_track.sid.clone());
798        }
799    }
800
801    pub fn is_playing(&self) -> bool {
802        !self
803            .room
804            .upgrade()
805            .unwrap()
806            .0
807            .lock()
808            .paused_audio_tracks
809            .contains(&self.server_track.sid)
810    }
811}
812
813#[derive(Clone)]
814pub struct MacOSDisplay {
815    frames: (
816        async_broadcast::Sender<Frame>,
817        async_broadcast::Receiver<Frame>,
818    ),
819}
820
821impl MacOSDisplay {
822    pub fn new() -> Self {
823        Self {
824            frames: async_broadcast::broadcast(128),
825        }
826    }
827
828    pub fn send_frame(&self, frame: Frame) {
829        self.frames.0.try_broadcast(frame).unwrap();
830    }
831}
832
/// Lightweight description of a video frame used in tests.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct Frame {
    /// Free-form label identifying the frame.
    pub label: String,
    /// Frame width.
    pub width: usize,
    /// Frame height.
    pub height: usize,
}

impl Frame {
    /// Returns the frame's width.
    pub fn width(&self) -> usize {
        let Self { width, .. } = self;
        *width
    }

    /// Returns the frame's height.
    pub fn height(&self) -> usize {
        let Self { height, .. } = self;
        *height
    }

    /// Not available in test mode; always panics.
    #[cfg(target_os = "macos")]
    pub fn image(&self) -> CVImageBuffer {
        unimplemented!("you can't call this in test mode")
    }
}