1use crate::{ConnectionState, RoomUpdate, Sid};
2use anyhow::{anyhow, Context, Result};
3use async_trait::async_trait;
4use collections::{BTreeMap, HashMap, HashSet};
5use futures::Stream;
6use gpui::BackgroundExecutor;
7use live_kit_server::{proto, token};
8#[cfg(target_os = "macos")]
9use media::core_video::CVImageBuffer;
10use parking_lot::Mutex;
11use postage::watch;
12use std::{
13 future::Future,
14 mem,
15 sync::{
16 atomic::{AtomicBool, Ordering::SeqCst},
17 Arc, Weak,
18 },
19};
20
21static SERVERS: Mutex<BTreeMap<String, Arc<TestServer>>> = Mutex::new(BTreeMap::new());
22
23pub struct TestServer {
24 pub url: String,
25 pub api_key: String,
26 pub secret_key: String,
27 rooms: Mutex<HashMap<String, TestServerRoom>>,
28 executor: BackgroundExecutor,
29}
30
31impl TestServer {
32 pub fn create(
33 url: String,
34 api_key: String,
35 secret_key: String,
36 executor: BackgroundExecutor,
37 ) -> Result<Arc<TestServer>> {
38 let mut servers = SERVERS.lock();
39 if servers.contains_key(&url) {
40 Err(anyhow!("a server with url {:?} already exists", url))
41 } else {
42 let server = Arc::new(TestServer {
43 url: url.clone(),
44 api_key,
45 secret_key,
46 rooms: Default::default(),
47 executor,
48 });
49 servers.insert(url, server.clone());
50 Ok(server)
51 }
52 }
53
54 fn get(url: &str) -> Result<Arc<TestServer>> {
55 Ok(SERVERS
56 .lock()
57 .get(url)
58 .ok_or_else(|| anyhow!("no server found for url"))?
59 .clone())
60 }
61
62 pub fn teardown(&self) -> Result<()> {
63 SERVERS
64 .lock()
65 .remove(&self.url)
66 .ok_or_else(|| anyhow!("server with url {:?} does not exist", self.url))?;
67 Ok(())
68 }
69
70 pub fn create_api_client(&self) -> TestApiClient {
71 TestApiClient {
72 url: self.url.clone(),
73 }
74 }
75
76 pub async fn create_room(&self, room: String) -> Result<()> {
77 self.executor.simulate_random_delay().await;
78 let mut server_rooms = self.rooms.lock();
79 if server_rooms.contains_key(&room) {
80 Err(anyhow!("room {:?} already exists", room))
81 } else {
82 server_rooms.insert(room, Default::default());
83 Ok(())
84 }
85 }
86
87 async fn delete_room(&self, room: String) -> Result<()> {
88 // TODO: clear state associated with all `Room`s.
89 self.executor.simulate_random_delay().await;
90 let mut server_rooms = self.rooms.lock();
91 server_rooms
92 .remove(&room)
93 .ok_or_else(|| anyhow!("room {:?} does not exist", room))?;
94 Ok(())
95 }
96
97 async fn join_room(&self, token: String, client_room: Arc<Room>) -> Result<()> {
98 self.executor.simulate_random_delay().await;
99 let claims = live_kit_server::token::validate(&token, &self.secret_key)?;
100 let identity = claims.sub.unwrap().to_string();
101 let room_name = claims.video.room.unwrap();
102 let mut server_rooms = self.rooms.lock();
103 let room = (*server_rooms).entry(room_name.to_string()).or_default();
104
105 if room.client_rooms.contains_key(&identity) {
106 Err(anyhow!(
107 "{:?} attempted to join room {:?} twice",
108 identity,
109 room_name
110 ))
111 } else {
112 for track in &room.video_tracks {
113 client_room
114 .0
115 .lock()
116 .updates_tx
117 .try_broadcast(RoomUpdate::SubscribedToRemoteVideoTrack(Arc::new(
118 RemoteVideoTrack {
119 server_track: track.clone(),
120 },
121 )))
122 .unwrap();
123 }
124 for track in &room.audio_tracks {
125 client_room
126 .0
127 .lock()
128 .updates_tx
129 .try_broadcast(RoomUpdate::SubscribedToRemoteAudioTrack(
130 Arc::new(RemoteAudioTrack {
131 server_track: track.clone(),
132 room: Arc::downgrade(&client_room),
133 }),
134 Arc::new(RemoteTrackPublication),
135 ))
136 .unwrap();
137 }
138 room.client_rooms.insert(identity, client_room);
139 Ok(())
140 }
141 }
142
143 async fn leave_room(&self, token: String) -> Result<()> {
144 self.executor.simulate_random_delay().await;
145 let claims = live_kit_server::token::validate(&token, &self.secret_key)?;
146 let identity = claims.sub.unwrap().to_string();
147 let room_name = claims.video.room.unwrap();
148 let mut server_rooms = self.rooms.lock();
149 let room = server_rooms
150 .get_mut(&*room_name)
151 .ok_or_else(|| anyhow!("room {} does not exist", room_name))?;
152 room.client_rooms.remove(&identity).ok_or_else(|| {
153 anyhow!(
154 "{:?} attempted to leave room {:?} before joining it",
155 identity,
156 room_name
157 )
158 })?;
159 Ok(())
160 }
161
162 async fn remove_participant(&self, room_name: String, identity: String) -> Result<()> {
163 // TODO: clear state associated with the `Room`.
164
165 self.executor.simulate_random_delay().await;
166 let mut server_rooms = self.rooms.lock();
167 let room = server_rooms
168 .get_mut(&room_name)
169 .ok_or_else(|| anyhow!("room {} does not exist", room_name))?;
170 room.client_rooms.remove(&identity).ok_or_else(|| {
171 anyhow!(
172 "participant {:?} did not join room {:?}",
173 identity,
174 room_name
175 )
176 })?;
177 Ok(())
178 }
179
180 async fn update_participant(
181 &self,
182 room_name: String,
183 identity: String,
184 permission: proto::ParticipantPermission,
185 ) -> Result<()> {
186 self.executor.simulate_random_delay().await;
187 let mut server_rooms = self.rooms.lock();
188 let room = server_rooms
189 .get_mut(&room_name)
190 .ok_or_else(|| anyhow!("room {} does not exist", room_name))?;
191 room.participant_permissions.insert(identity, permission);
192 Ok(())
193 }
194
195 pub async fn disconnect_client(&self, client_identity: String) {
196 self.executor.simulate_random_delay().await;
197 let mut server_rooms = self.rooms.lock();
198 for room in server_rooms.values_mut() {
199 if let Some(room) = room.client_rooms.remove(&client_identity) {
200 *room.0.lock().connection.0.borrow_mut() = ConnectionState::Disconnected;
201 }
202 }
203 }
204
205 async fn publish_video_track(
206 &self,
207 token: String,
208 local_track: LocalVideoTrack,
209 ) -> Result<Sid> {
210 self.executor.simulate_random_delay().await;
211 let claims = live_kit_server::token::validate(&token, &self.secret_key)?;
212 let identity = claims.sub.unwrap().to_string();
213 let room_name = claims.video.room.unwrap();
214
215 let mut server_rooms = self.rooms.lock();
216 let room = server_rooms
217 .get_mut(&*room_name)
218 .ok_or_else(|| anyhow!("room {} does not exist", room_name))?;
219
220 let can_publish = room
221 .participant_permissions
222 .get(&identity)
223 .map(|permission| permission.can_publish)
224 .or(claims.video.can_publish)
225 .unwrap_or(true);
226
227 if !can_publish {
228 return Err(anyhow!("user is not allowed to publish"));
229 }
230
231 let sid = nanoid::nanoid!(17);
232 let track = Arc::new(TestServerVideoTrack {
233 sid: sid.clone(),
234 publisher_id: identity.clone(),
235 frames_rx: local_track.frames_rx.clone(),
236 });
237
238 room.video_tracks.push(track.clone());
239
240 for (id, client_room) in &room.client_rooms {
241 if *id != identity {
242 let _ = client_room
243 .0
244 .lock()
245 .updates_tx
246 .try_broadcast(RoomUpdate::SubscribedToRemoteVideoTrack(Arc::new(
247 RemoteVideoTrack {
248 server_track: track.clone(),
249 },
250 )))
251 .unwrap();
252 }
253 }
254
255 Ok(sid)
256 }
257
258 async fn publish_audio_track(
259 &self,
260 token: String,
261 _local_track: &LocalAudioTrack,
262 ) -> Result<Sid> {
263 self.executor.simulate_random_delay().await;
264 let claims = live_kit_server::token::validate(&token, &self.secret_key)?;
265 let identity = claims.sub.unwrap().to_string();
266 let room_name = claims.video.room.unwrap();
267
268 let mut server_rooms = self.rooms.lock();
269 let room = server_rooms
270 .get_mut(&*room_name)
271 .ok_or_else(|| anyhow!("room {} does not exist", room_name))?;
272
273 let can_publish = room
274 .participant_permissions
275 .get(&identity)
276 .map(|permission| permission.can_publish)
277 .or(claims.video.can_publish)
278 .unwrap_or(true);
279
280 if !can_publish {
281 return Err(anyhow!("user is not allowed to publish"));
282 }
283
284 let sid = nanoid::nanoid!(17);
285 let track = Arc::new(TestServerAudioTrack {
286 sid: sid.clone(),
287 publisher_id: identity.clone(),
288 muted: AtomicBool::new(false),
289 });
290
291 let publication = Arc::new(RemoteTrackPublication);
292
293 room.audio_tracks.push(track.clone());
294
295 for (id, client_room) in &room.client_rooms {
296 if *id != identity {
297 let _ = client_room
298 .0
299 .lock()
300 .updates_tx
301 .try_broadcast(RoomUpdate::SubscribedToRemoteAudioTrack(
302 Arc::new(RemoteAudioTrack {
303 server_track: track.clone(),
304 room: Arc::downgrade(&client_room),
305 }),
306 publication.clone(),
307 ))
308 .unwrap();
309 }
310 }
311
312 Ok(sid)
313 }
314
315 fn set_track_muted(&self, token: &str, track_sid: &str, muted: bool) -> Result<()> {
316 let claims = live_kit_server::token::validate(&token, &self.secret_key)?;
317 let room_name = claims.video.room.unwrap();
318 let identity = claims.sub.unwrap();
319 let mut server_rooms = self.rooms.lock();
320 let room = server_rooms
321 .get_mut(&*room_name)
322 .ok_or_else(|| anyhow!("room {} does not exist", room_name))?;
323 if let Some(track) = room
324 .audio_tracks
325 .iter_mut()
326 .find(|track| track.sid == track_sid)
327 {
328 track.muted.store(muted, SeqCst);
329 for (id, client_room) in room.client_rooms.iter() {
330 if *id != identity {
331 client_room
332 .0
333 .lock()
334 .updates_tx
335 .try_broadcast(RoomUpdate::RemoteAudioTrackMuteChanged {
336 track_id: track_sid.to_string(),
337 muted,
338 })
339 .unwrap();
340 }
341 }
342 }
343 Ok(())
344 }
345
346 fn is_track_muted(&self, token: &str, track_sid: &str) -> Option<bool> {
347 let claims = live_kit_server::token::validate(&token, &self.secret_key).ok()?;
348 let room_name = claims.video.room.unwrap();
349
350 let mut server_rooms = self.rooms.lock();
351 let room = server_rooms.get_mut(&*room_name)?;
352 room.audio_tracks.iter().find_map(|track| {
353 if track.sid == track_sid {
354 Some(track.muted.load(SeqCst))
355 } else {
356 None
357 }
358 })
359 }
360
361 fn video_tracks(&self, token: String) -> Result<Vec<Arc<RemoteVideoTrack>>> {
362 let claims = live_kit_server::token::validate(&token, &self.secret_key)?;
363 let room_name = claims.video.room.unwrap();
364 let identity = claims.sub.unwrap();
365
366 let mut server_rooms = self.rooms.lock();
367 let room = server_rooms
368 .get_mut(&*room_name)
369 .ok_or_else(|| anyhow!("room {} does not exist", room_name))?;
370 room.client_rooms
371 .get(identity.as_ref())
372 .ok_or_else(|| anyhow!("not a participant in room"))?;
373 Ok(room
374 .video_tracks
375 .iter()
376 .map(|track| {
377 Arc::new(RemoteVideoTrack {
378 server_track: track.clone(),
379 })
380 })
381 .collect())
382 }
383
384 fn audio_tracks(&self, token: String) -> Result<Vec<Arc<RemoteAudioTrack>>> {
385 let claims = live_kit_server::token::validate(&token, &self.secret_key)?;
386 let room_name = claims.video.room.unwrap();
387 let identity = claims.sub.unwrap();
388
389 let mut server_rooms = self.rooms.lock();
390 let room = server_rooms
391 .get_mut(&*room_name)
392 .ok_or_else(|| anyhow!("room {} does not exist", room_name))?;
393 let client_room = room
394 .client_rooms
395 .get(identity.as_ref())
396 .ok_or_else(|| anyhow!("not a participant in room"))?;
397 Ok(room
398 .audio_tracks
399 .iter()
400 .map(|track| {
401 Arc::new(RemoteAudioTrack {
402 server_track: track.clone(),
403 room: Arc::downgrade(&client_room),
404 })
405 })
406 .collect())
407 }
408}
409
410#[derive(Default)]
411struct TestServerRoom {
412 client_rooms: HashMap<Sid, Arc<Room>>,
413 video_tracks: Vec<Arc<TestServerVideoTrack>>,
414 audio_tracks: Vec<Arc<TestServerAudioTrack>>,
415 participant_permissions: HashMap<Sid, proto::ParticipantPermission>,
416}
417
418#[derive(Debug)]
419struct TestServerVideoTrack {
420 sid: Sid,
421 publisher_id: Sid,
422 frames_rx: async_broadcast::Receiver<Frame>,
423}
424
425#[derive(Debug)]
426struct TestServerAudioTrack {
427 sid: Sid,
428 publisher_id: Sid,
429 muted: AtomicBool,
430}
431
432impl TestServerRoom {}
433
434pub struct TestApiClient {
435 url: String,
436}
437
438#[async_trait]
439impl live_kit_server::api::Client for TestApiClient {
440 fn url(&self) -> &str {
441 &self.url
442 }
443
444 async fn create_room(&self, name: String) -> Result<()> {
445 let server = TestServer::get(&self.url)?;
446 server.create_room(name).await?;
447 Ok(())
448 }
449
450 async fn delete_room(&self, name: String) -> Result<()> {
451 let server = TestServer::get(&self.url)?;
452 server.delete_room(name).await?;
453 Ok(())
454 }
455
456 async fn remove_participant(&self, room: String, identity: String) -> Result<()> {
457 let server = TestServer::get(&self.url)?;
458 server.remove_participant(room, identity).await?;
459 Ok(())
460 }
461
462 async fn update_participant(
463 &self,
464 room: String,
465 identity: String,
466 permission: live_kit_server::proto::ParticipantPermission,
467 ) -> Result<()> {
468 let server = TestServer::get(&self.url)?;
469 server
470 .update_participant(room, identity, permission)
471 .await?;
472 Ok(())
473 }
474
475 fn room_token(&self, room: &str, identity: &str) -> Result<String> {
476 let server = TestServer::get(&self.url)?;
477 token::create(
478 &server.api_key,
479 &server.secret_key,
480 Some(identity),
481 token::VideoGrant::to_join(room),
482 )
483 }
484
485 fn guest_token(&self, room: &str, identity: &str) -> Result<String> {
486 let server = TestServer::get(&self.url)?;
487 token::create(
488 &server.api_key,
489 &server.secret_key,
490 Some(identity),
491 token::VideoGrant::for_guest(room),
492 )
493 }
494}
495
496struct RoomState {
497 connection: (
498 watch::Sender<ConnectionState>,
499 watch::Receiver<ConnectionState>,
500 ),
501 display_sources: Vec<MacOSDisplay>,
502 paused_audio_tracks: HashSet<Sid>,
503 updates_tx: async_broadcast::Sender<RoomUpdate>,
504 updates_rx: async_broadcast::Receiver<RoomUpdate>,
505}
506
507pub struct Room(Mutex<RoomState>);
508
509impl Room {
510 pub fn new() -> Arc<Self> {
511 let (updates_tx, updates_rx) = async_broadcast::broadcast(128);
512 Arc::new(Self(Mutex::new(RoomState {
513 connection: watch::channel_with(ConnectionState::Disconnected),
514 display_sources: Default::default(),
515 paused_audio_tracks: Default::default(),
516 updates_tx,
517 updates_rx,
518 })))
519 }
520
521 pub fn status(&self) -> watch::Receiver<ConnectionState> {
522 self.0.lock().connection.1.clone()
523 }
524
525 pub fn connect(self: &Arc<Self>, url: &str, token: &str) -> impl Future<Output = Result<()>> {
526 let this = self.clone();
527 let url = url.to_string();
528 let token = token.to_string();
529 async move {
530 let server = TestServer::get(&url)?;
531 server
532 .join_room(token.clone(), this.clone())
533 .await
534 .context("room join")?;
535 *this.0.lock().connection.0.borrow_mut() = ConnectionState::Connected { url, token };
536 Ok(())
537 }
538 }
539
540 pub fn display_sources(self: &Arc<Self>) -> impl Future<Output = Result<Vec<MacOSDisplay>>> {
541 let this = self.clone();
542 async move {
543 let server = this.test_server();
544 server.executor.simulate_random_delay().await;
545 Ok(this.0.lock().display_sources.clone())
546 }
547 }
548
549 pub fn publish_video_track(
550 self: &Arc<Self>,
551 track: LocalVideoTrack,
552 ) -> impl Future<Output = Result<LocalTrackPublication>> {
553 let this = self.clone();
554 let track = track.clone();
555 async move {
556 let sid = this
557 .test_server()
558 .publish_video_track(this.token(), track)
559 .await?;
560 Ok(LocalTrackPublication {
561 room: Arc::downgrade(&this),
562 sid,
563 })
564 }
565 }
566
567 pub fn publish_audio_track(
568 self: &Arc<Self>,
569 track: LocalAudioTrack,
570 ) -> impl Future<Output = Result<LocalTrackPublication>> {
571 let this = self.clone();
572 let track = track.clone();
573 async move {
574 let sid = this
575 .test_server()
576 .publish_audio_track(this.token(), &track)
577 .await?;
578 Ok(LocalTrackPublication {
579 room: Arc::downgrade(&this),
580 sid,
581 })
582 }
583 }
584
585 pub fn unpublish_track(&self, _publication: LocalTrackPublication) {}
586
587 pub fn remote_audio_tracks(&self, publisher_id: &str) -> Vec<Arc<RemoteAudioTrack>> {
588 if !self.is_connected() {
589 return Vec::new();
590 }
591
592 self.test_server()
593 .audio_tracks(self.token())
594 .unwrap()
595 .into_iter()
596 .filter(|track| track.publisher_id() == publisher_id)
597 .collect()
598 }
599
600 pub fn remote_audio_track_publications(
601 &self,
602 publisher_id: &str,
603 ) -> Vec<Arc<RemoteTrackPublication>> {
604 if !self.is_connected() {
605 return Vec::new();
606 }
607
608 self.test_server()
609 .audio_tracks(self.token())
610 .unwrap()
611 .into_iter()
612 .filter(|track| track.publisher_id() == publisher_id)
613 .map(|_track| Arc::new(RemoteTrackPublication {}))
614 .collect()
615 }
616
617 pub fn remote_video_tracks(&self, publisher_id: &str) -> Vec<Arc<RemoteVideoTrack>> {
618 if !self.is_connected() {
619 return Vec::new();
620 }
621
622 self.test_server()
623 .video_tracks(self.token())
624 .unwrap()
625 .into_iter()
626 .filter(|track| track.publisher_id() == publisher_id)
627 .collect()
628 }
629
630 pub fn updates(&self) -> impl Stream<Item = RoomUpdate> {
631 self.0.lock().updates_rx.clone()
632 }
633
634 pub fn set_display_sources(&self, sources: Vec<MacOSDisplay>) {
635 self.0.lock().display_sources = sources;
636 }
637
638 fn test_server(&self) -> Arc<TestServer> {
639 match self.0.lock().connection.1.borrow().clone() {
640 ConnectionState::Disconnected => panic!("must be connected to call this method"),
641 ConnectionState::Connected { url, .. } => TestServer::get(&url).unwrap(),
642 }
643 }
644
645 fn token(&self) -> String {
646 match self.0.lock().connection.1.borrow().clone() {
647 ConnectionState::Disconnected => panic!("must be connected to call this method"),
648 ConnectionState::Connected { token, .. } => token,
649 }
650 }
651
652 fn is_connected(&self) -> bool {
653 match *self.0.lock().connection.1.borrow() {
654 ConnectionState::Disconnected => false,
655 ConnectionState::Connected { .. } => true,
656 }
657 }
658}
659
660impl Drop for Room {
661 fn drop(&mut self) {
662 if let ConnectionState::Connected { token, .. } = mem::replace(
663 &mut *self.0.lock().connection.0.borrow_mut(),
664 ConnectionState::Disconnected,
665 ) {
666 if let Ok(server) = TestServer::get(&token) {
667 let executor = server.executor.clone();
668 executor
669 .spawn(async move { server.leave_room(token).await.unwrap() })
670 .detach();
671 }
672 }
673 }
674}
675
676#[derive(Clone)]
677pub struct LocalTrackPublication {
678 sid: String,
679 room: Weak<Room>,
680}
681
682impl LocalTrackPublication {
683 pub fn set_mute(&self, mute: bool) -> impl Future<Output = Result<()>> {
684 let sid = self.sid.clone();
685 let room = self.room.clone();
686 async move {
687 if let Some(room) = room.upgrade() {
688 room.test_server()
689 .set_track_muted(&room.token(), &sid, mute)
690 } else {
691 Err(anyhow!("no such room"))
692 }
693 }
694 }
695
696 pub fn is_muted(&self) -> bool {
697 if let Some(room) = self.room.upgrade() {
698 room.test_server()
699 .is_track_muted(&room.token(), &self.sid)
700 .unwrap_or(false)
701 } else {
702 false
703 }
704 }
705
706 pub fn sid(&self) -> String {
707 self.sid.clone()
708 }
709}
710
711pub struct RemoteTrackPublication;
712
713impl RemoteTrackPublication {
714 pub fn set_enabled(&self, _enabled: bool) -> impl Future<Output = Result<()>> {
715 async { Ok(()) }
716 }
717
718 pub fn is_muted(&self) -> bool {
719 false
720 }
721
722 pub fn sid(&self) -> String {
723 "".to_string()
724 }
725}
726
727#[derive(Clone)]
728pub struct LocalVideoTrack {
729 frames_rx: async_broadcast::Receiver<Frame>,
730}
731
732impl LocalVideoTrack {
733 pub fn screen_share_for_display(display: &MacOSDisplay) -> Self {
734 Self {
735 frames_rx: display.frames.1.clone(),
736 }
737 }
738}
739
740#[derive(Clone)]
741pub struct LocalAudioTrack;
742
743impl LocalAudioTrack {
744 pub fn create() -> Self {
745 Self
746 }
747}
748
749#[derive(Debug)]
750pub struct RemoteVideoTrack {
751 server_track: Arc<TestServerVideoTrack>,
752}
753
754impl RemoteVideoTrack {
755 pub fn sid(&self) -> &str {
756 &self.server_track.sid
757 }
758
759 pub fn publisher_id(&self) -> &str {
760 &self.server_track.publisher_id
761 }
762
763 pub fn frames(&self) -> async_broadcast::Receiver<Frame> {
764 self.server_track.frames_rx.clone()
765 }
766}
767
768#[derive(Debug)]
769pub struct RemoteAudioTrack {
770 server_track: Arc<TestServerAudioTrack>,
771 room: Weak<Room>,
772}
773
774impl RemoteAudioTrack {
775 pub fn sid(&self) -> &str {
776 &self.server_track.sid
777 }
778
779 pub fn publisher_id(&self) -> &str {
780 &self.server_track.publisher_id
781 }
782
783 pub fn start(&self) {
784 if let Some(room) = self.room.upgrade() {
785 room.0
786 .lock()
787 .paused_audio_tracks
788 .remove(&self.server_track.sid);
789 }
790 }
791
792 pub fn stop(&self) {
793 if let Some(room) = self.room.upgrade() {
794 room.0
795 .lock()
796 .paused_audio_tracks
797 .insert(self.server_track.sid.clone());
798 }
799 }
800
801 pub fn is_playing(&self) -> bool {
802 !self
803 .room
804 .upgrade()
805 .unwrap()
806 .0
807 .lock()
808 .paused_audio_tracks
809 .contains(&self.server_track.sid)
810 }
811}
812
813#[derive(Clone)]
814pub struct MacOSDisplay {
815 frames: (
816 async_broadcast::Sender<Frame>,
817 async_broadcast::Receiver<Frame>,
818 ),
819}
820
821impl MacOSDisplay {
822 pub fn new() -> Self {
823 Self {
824 frames: async_broadcast::broadcast(128),
825 }
826 }
827
828 pub fn send_frame(&self, frame: Frame) {
829 self.frames.0.try_broadcast(frame).unwrap();
830 }
831}
832
833#[derive(Clone, Debug, PartialEq, Eq)]
834pub struct Frame {
835 pub label: String,
836 pub width: usize,
837 pub height: usize,
838}
839
840impl Frame {
841 pub fn width(&self) -> usize {
842 self.width
843 }
844
845 pub fn height(&self) -> usize {
846 self.height
847 }
848
849 #[cfg(target_os = "macos")]
850 pub fn image(&self) -> CVImageBuffer {
851 unimplemented!("you can't call this in test mode")
852 }
853}