1use crate::{ConnectionState, RoomUpdate, Sid};
2use anyhow::{anyhow, Context, Result};
3use async_trait::async_trait;
4use collections::{BTreeMap, HashMap, HashSet};
5use futures::Stream;
6use gpui::{BackgroundExecutor, ImageSource};
7use live_kit_server::{proto, token};
8
9use parking_lot::Mutex;
10use postage::watch;
11use std::{
12 future::Future,
13 mem,
14 sync::{
15 atomic::{AtomicBool, Ordering::SeqCst},
16 Arc, Weak,
17 },
18};
19
/// Global registry of test servers, keyed by server URL.
///
/// `TestServer::create` inserts into it, `TestServer::get` looks servers up, and
/// `TestServer::teardown` removes them again.
static SERVERS: Mutex<BTreeMap<String, Arc<TestServer>>> = Mutex::new(BTreeMap::new());
21
/// In-memory test server that backs `TestApiClient` and `Room`.
///
/// Instances are registered in `SERVERS` under their `url`.
pub struct TestServer {
    /// URL this server is registered under; the lookup key in `SERVERS`.
    pub url: String,
    /// API key passed to `token::create` when tokens are minted for this server.
    pub api_key: String,
    /// Secret used to create tokens and to validate the tokens of incoming calls.
    pub secret_key: String,
    /// Rooms hosted by this server, keyed by room name.
    rooms: Mutex<HashMap<String, TestServerRoom>>,
    /// Executor used for `simulate_random_delay` and for spawning background work.
    executor: BackgroundExecutor,
}
29
30impl TestServer {
31 pub fn create(
32 url: String,
33 api_key: String,
34 secret_key: String,
35 executor: BackgroundExecutor,
36 ) -> Result<Arc<TestServer>> {
37 let mut servers = SERVERS.lock();
38 if servers.contains_key(&url) {
39 Err(anyhow!("a server with url {:?} already exists", url))
40 } else {
41 let server = Arc::new(TestServer {
42 url: url.clone(),
43 api_key,
44 secret_key,
45 rooms: Default::default(),
46 executor,
47 });
48 servers.insert(url, server.clone());
49 Ok(server)
50 }
51 }
52
53 fn get(url: &str) -> Result<Arc<TestServer>> {
54 Ok(SERVERS
55 .lock()
56 .get(url)
57 .ok_or_else(|| anyhow!("no server found for url"))?
58 .clone())
59 }
60
61 pub fn teardown(&self) -> Result<()> {
62 SERVERS
63 .lock()
64 .remove(&self.url)
65 .ok_or_else(|| anyhow!("server with url {:?} does not exist", self.url))?;
66 Ok(())
67 }
68
69 pub fn create_api_client(&self) -> TestApiClient {
70 TestApiClient {
71 url: self.url.clone(),
72 }
73 }
74
75 pub async fn create_room(&self, room: String) -> Result<()> {
76 self.executor.simulate_random_delay().await;
77 let mut server_rooms = self.rooms.lock();
78 if server_rooms.contains_key(&room) {
79 Err(anyhow!("room {:?} already exists", room))
80 } else {
81 server_rooms.insert(room, Default::default());
82 Ok(())
83 }
84 }
85
86 async fn delete_room(&self, room: String) -> Result<()> {
87 // TODO: clear state associated with all `Room`s.
88 self.executor.simulate_random_delay().await;
89 let mut server_rooms = self.rooms.lock();
90 server_rooms
91 .remove(&room)
92 .ok_or_else(|| anyhow!("room {:?} does not exist", room))?;
93 Ok(())
94 }
95
96 async fn join_room(&self, token: String, client_room: Arc<Room>) -> Result<()> {
97 self.executor.simulate_random_delay().await;
98 let claims = live_kit_server::token::validate(&token, &self.secret_key)?;
99 let identity = claims.sub.unwrap().to_string();
100 let room_name = claims.video.room.unwrap();
101 let mut server_rooms = self.rooms.lock();
102 let room = (*server_rooms).entry(room_name.to_string()).or_default();
103
104 if room.client_rooms.contains_key(&identity) {
105 Err(anyhow!(
106 "{:?} attempted to join room {:?} twice",
107 identity,
108 room_name
109 ))
110 } else {
111 for track in &room.video_tracks {
112 client_room
113 .0
114 .lock()
115 .updates_tx
116 .try_broadcast(RoomUpdate::SubscribedToRemoteVideoTrack(Arc::new(
117 RemoteVideoTrack {
118 server_track: track.clone(),
119 },
120 )))
121 .unwrap();
122 }
123 for track in &room.audio_tracks {
124 client_room
125 .0
126 .lock()
127 .updates_tx
128 .try_broadcast(RoomUpdate::SubscribedToRemoteAudioTrack(
129 Arc::new(RemoteAudioTrack {
130 server_track: track.clone(),
131 room: Arc::downgrade(&client_room),
132 }),
133 Arc::new(RemoteTrackPublication),
134 ))
135 .unwrap();
136 }
137 room.client_rooms.insert(identity, client_room);
138 Ok(())
139 }
140 }
141
142 async fn leave_room(&self, token: String) -> Result<()> {
143 self.executor.simulate_random_delay().await;
144 let claims = live_kit_server::token::validate(&token, &self.secret_key)?;
145 let identity = claims.sub.unwrap().to_string();
146 let room_name = claims.video.room.unwrap();
147 let mut server_rooms = self.rooms.lock();
148 let room = server_rooms
149 .get_mut(&*room_name)
150 .ok_or_else(|| anyhow!("room {} does not exist", room_name))?;
151 room.client_rooms.remove(&identity).ok_or_else(|| {
152 anyhow!(
153 "{:?} attempted to leave room {:?} before joining it",
154 identity,
155 room_name
156 )
157 })?;
158 Ok(())
159 }
160
161 async fn remove_participant(&self, room_name: String, identity: String) -> Result<()> {
162 // TODO: clear state associated with the `Room`.
163
164 self.executor.simulate_random_delay().await;
165 let mut server_rooms = self.rooms.lock();
166 let room = server_rooms
167 .get_mut(&room_name)
168 .ok_or_else(|| anyhow!("room {} does not exist", room_name))?;
169 room.client_rooms.remove(&identity).ok_or_else(|| {
170 anyhow!(
171 "participant {:?} did not join room {:?}",
172 identity,
173 room_name
174 )
175 })?;
176 Ok(())
177 }
178
179 async fn update_participant(
180 &self,
181 room_name: String,
182 identity: String,
183 permission: proto::ParticipantPermission,
184 ) -> Result<()> {
185 self.executor.simulate_random_delay().await;
186 let mut server_rooms = self.rooms.lock();
187 let room = server_rooms
188 .get_mut(&room_name)
189 .ok_or_else(|| anyhow!("room {} does not exist", room_name))?;
190 room.participant_permissions.insert(identity, permission);
191 Ok(())
192 }
193
194 pub async fn disconnect_client(&self, client_identity: String) {
195 self.executor.simulate_random_delay().await;
196 let mut server_rooms = self.rooms.lock();
197 for room in server_rooms.values_mut() {
198 if let Some(room) = room.client_rooms.remove(&client_identity) {
199 *room.0.lock().connection.0.borrow_mut() = ConnectionState::Disconnected;
200 }
201 }
202 }
203
204 async fn publish_video_track(
205 &self,
206 token: String,
207 local_track: LocalVideoTrack,
208 ) -> Result<Sid> {
209 self.executor.simulate_random_delay().await;
210 let claims = live_kit_server::token::validate(&token, &self.secret_key)?;
211 let identity = claims.sub.unwrap().to_string();
212 let room_name = claims.video.room.unwrap();
213
214 let mut server_rooms = self.rooms.lock();
215 let room = server_rooms
216 .get_mut(&*room_name)
217 .ok_or_else(|| anyhow!("room {} does not exist", room_name))?;
218
219 let can_publish = room
220 .participant_permissions
221 .get(&identity)
222 .map(|permission| permission.can_publish)
223 .or(claims.video.can_publish)
224 .unwrap_or(true);
225
226 if !can_publish {
227 return Err(anyhow!("user is not allowed to publish"));
228 }
229
230 let sid = nanoid::nanoid!(17);
231 let track = Arc::new(TestServerVideoTrack {
232 sid: sid.clone(),
233 publisher_id: identity.clone(),
234 frames_rx: local_track.frames_rx.clone(),
235 });
236
237 room.video_tracks.push(track.clone());
238
239 for (id, client_room) in &room.client_rooms {
240 if *id != identity {
241 let _ = client_room
242 .0
243 .lock()
244 .updates_tx
245 .try_broadcast(RoomUpdate::SubscribedToRemoteVideoTrack(Arc::new(
246 RemoteVideoTrack {
247 server_track: track.clone(),
248 },
249 )))
250 .unwrap();
251 }
252 }
253
254 Ok(sid)
255 }
256
257 async fn publish_audio_track(
258 &self,
259 token: String,
260 _local_track: &LocalAudioTrack,
261 ) -> Result<Sid> {
262 self.executor.simulate_random_delay().await;
263 let claims = live_kit_server::token::validate(&token, &self.secret_key)?;
264 let identity = claims.sub.unwrap().to_string();
265 let room_name = claims.video.room.unwrap();
266
267 let mut server_rooms = self.rooms.lock();
268 let room = server_rooms
269 .get_mut(&*room_name)
270 .ok_or_else(|| anyhow!("room {} does not exist", room_name))?;
271
272 let can_publish = room
273 .participant_permissions
274 .get(&identity)
275 .map(|permission| permission.can_publish)
276 .or(claims.video.can_publish)
277 .unwrap_or(true);
278
279 if !can_publish {
280 return Err(anyhow!("user is not allowed to publish"));
281 }
282
283 let sid = nanoid::nanoid!(17);
284 let track = Arc::new(TestServerAudioTrack {
285 sid: sid.clone(),
286 publisher_id: identity.clone(),
287 muted: AtomicBool::new(false),
288 });
289
290 let publication = Arc::new(RemoteTrackPublication);
291
292 room.audio_tracks.push(track.clone());
293
294 for (id, client_room) in &room.client_rooms {
295 if *id != identity {
296 let _ = client_room
297 .0
298 .lock()
299 .updates_tx
300 .try_broadcast(RoomUpdate::SubscribedToRemoteAudioTrack(
301 Arc::new(RemoteAudioTrack {
302 server_track: track.clone(),
303 room: Arc::downgrade(&client_room),
304 }),
305 publication.clone(),
306 ))
307 .unwrap();
308 }
309 }
310
311 Ok(sid)
312 }
313
314 fn set_track_muted(&self, token: &str, track_sid: &str, muted: bool) -> Result<()> {
315 let claims = live_kit_server::token::validate(&token, &self.secret_key)?;
316 let room_name = claims.video.room.unwrap();
317 let identity = claims.sub.unwrap();
318 let mut server_rooms = self.rooms.lock();
319 let room = server_rooms
320 .get_mut(&*room_name)
321 .ok_or_else(|| anyhow!("room {} does not exist", room_name))?;
322 if let Some(track) = room
323 .audio_tracks
324 .iter_mut()
325 .find(|track| track.sid == track_sid)
326 {
327 track.muted.store(muted, SeqCst);
328 for (id, client_room) in room.client_rooms.iter() {
329 if *id != identity {
330 client_room
331 .0
332 .lock()
333 .updates_tx
334 .try_broadcast(RoomUpdate::RemoteAudioTrackMuteChanged {
335 track_id: track_sid.to_string(),
336 muted,
337 })
338 .unwrap();
339 }
340 }
341 }
342 Ok(())
343 }
344
345 fn is_track_muted(&self, token: &str, track_sid: &str) -> Option<bool> {
346 let claims = live_kit_server::token::validate(&token, &self.secret_key).ok()?;
347 let room_name = claims.video.room.unwrap();
348
349 let mut server_rooms = self.rooms.lock();
350 let room = server_rooms.get_mut(&*room_name)?;
351 room.audio_tracks.iter().find_map(|track| {
352 if track.sid == track_sid {
353 Some(track.muted.load(SeqCst))
354 } else {
355 None
356 }
357 })
358 }
359
360 fn video_tracks(&self, token: String) -> Result<Vec<Arc<RemoteVideoTrack>>> {
361 let claims = live_kit_server::token::validate(&token, &self.secret_key)?;
362 let room_name = claims.video.room.unwrap();
363 let identity = claims.sub.unwrap();
364
365 let mut server_rooms = self.rooms.lock();
366 let room = server_rooms
367 .get_mut(&*room_name)
368 .ok_or_else(|| anyhow!("room {} does not exist", room_name))?;
369 room.client_rooms
370 .get(identity.as_ref())
371 .ok_or_else(|| anyhow!("not a participant in room"))?;
372 Ok(room
373 .video_tracks
374 .iter()
375 .map(|track| {
376 Arc::new(RemoteVideoTrack {
377 server_track: track.clone(),
378 })
379 })
380 .collect())
381 }
382
383 fn audio_tracks(&self, token: String) -> Result<Vec<Arc<RemoteAudioTrack>>> {
384 let claims = live_kit_server::token::validate(&token, &self.secret_key)?;
385 let room_name = claims.video.room.unwrap();
386 let identity = claims.sub.unwrap();
387
388 let mut server_rooms = self.rooms.lock();
389 let room = server_rooms
390 .get_mut(&*room_name)
391 .ok_or_else(|| anyhow!("room {} does not exist", room_name))?;
392 let client_room = room
393 .client_rooms
394 .get(identity.as_ref())
395 .ok_or_else(|| anyhow!("not a participant in room"))?;
396 Ok(room
397 .audio_tracks
398 .iter()
399 .map(|track| {
400 Arc::new(RemoteAudioTrack {
401 server_track: track.clone(),
402 room: Arc::downgrade(&client_room),
403 })
404 })
405 .collect())
406 }
407}
408
/// Server-side state of a single room.
#[derive(Default)]
struct TestServerRoom {
    /// Client `Room` handles of the participants that joined, keyed by identity.
    client_rooms: HashMap<Sid, Arc<Room>>,
    /// Video tracks published to this room, in publication order.
    video_tracks: Vec<Arc<TestServerVideoTrack>>,
    /// Audio tracks published to this room, in publication order.
    audio_tracks: Vec<Arc<TestServerAudioTrack>>,
    /// Permissions stored through `update_participant`, keyed by identity. When
    /// publishing, an entry here is consulted before the token's own grant.
    participant_permissions: HashMap<Sid, proto::ParticipantPermission>,
}
416
/// Server-side record of a published video track.
#[derive(Debug)]
struct TestServerVideoTrack {
    /// Identifier generated for the track when it was published.
    sid: Sid,
    /// Identity of the participant that published the track.
    publisher_id: Sid,
    /// Receiver cloned from the publisher's local track; `RemoteVideoTrack::frames`
    /// hands out clones of it.
    frames_rx: async_broadcast::Receiver<Frame>,
}
423
/// Server-side record of a published audio track.
#[derive(Debug)]
struct TestServerAudioTrack {
    /// Identifier generated for the track when it was published.
    sid: Sid,
    /// Identity of the participant that published the track.
    publisher_id: Sid,
    /// Muted flag; written by `set_track_muted` and read by `is_track_muted`.
    muted: AtomicBool,
}
430
// `TestServerRoom` currently has no inherent methods; its fields are manipulated
// directly by `TestServer`.
impl TestServerRoom {}
432
433pub struct TestApiClient {
434 url: String,
435}
436
437#[async_trait]
438impl live_kit_server::api::Client for TestApiClient {
439 fn url(&self) -> &str {
440 &self.url
441 }
442
443 async fn create_room(&self, name: String) -> Result<()> {
444 let server = TestServer::get(&self.url)?;
445 server.create_room(name).await?;
446 Ok(())
447 }
448
449 async fn delete_room(&self, name: String) -> Result<()> {
450 let server = TestServer::get(&self.url)?;
451 server.delete_room(name).await?;
452 Ok(())
453 }
454
455 async fn remove_participant(&self, room: String, identity: String) -> Result<()> {
456 let server = TestServer::get(&self.url)?;
457 server.remove_participant(room, identity).await?;
458 Ok(())
459 }
460
461 async fn update_participant(
462 &self,
463 room: String,
464 identity: String,
465 permission: live_kit_server::proto::ParticipantPermission,
466 ) -> Result<()> {
467 let server = TestServer::get(&self.url)?;
468 server
469 .update_participant(room, identity, permission)
470 .await?;
471 Ok(())
472 }
473
474 fn room_token(&self, room: &str, identity: &str) -> Result<String> {
475 let server = TestServer::get(&self.url)?;
476 token::create(
477 &server.api_key,
478 &server.secret_key,
479 Some(identity),
480 token::VideoGrant::to_join(room),
481 )
482 }
483
484 fn guest_token(&self, room: &str, identity: &str) -> Result<String> {
485 let server = TestServer::get(&self.url)?;
486 token::create(
487 &server.api_key,
488 &server.secret_key,
489 Some(identity),
490 token::VideoGrant::for_guest(room),
491 )
492 }
493}
494
/// Mutable state of a client `Room`, guarded by the room's mutex.
struct RoomState {
    /// Both halves of the watch channel that carries the connection state.
    connection: (
        watch::Sender<ConnectionState>,
        watch::Receiver<ConnectionState>,
    ),
    /// Displays returned by `Room::display_sources`; set via `Room::set_display_sources`.
    display_sources: Vec<MacOSDisplay>,
    /// Sids of audio tracks paused through `RemoteAudioTrack::stop`.
    paused_audio_tracks: HashSet<Sid>,
    /// Sender the test server uses to push `RoomUpdate`s to this room.
    updates_tx: async_broadcast::Sender<RoomUpdate>,
    /// Receiver that `Room::updates` clones for each caller.
    updates_rx: async_broadcast::Receiver<RoomUpdate>,
}
505
/// Client-side room handle used in tests; wraps its `RoomState` in a mutex.
pub struct Room(Mutex<RoomState>);
507
508impl Room {
509 pub fn new() -> Arc<Self> {
510 let (updates_tx, updates_rx) = async_broadcast::broadcast(128);
511 Arc::new(Self(Mutex::new(RoomState {
512 connection: watch::channel_with(ConnectionState::Disconnected),
513 display_sources: Default::default(),
514 paused_audio_tracks: Default::default(),
515 updates_tx,
516 updates_rx,
517 })))
518 }
519
520 pub fn status(&self) -> watch::Receiver<ConnectionState> {
521 self.0.lock().connection.1.clone()
522 }
523
524 pub fn connect(self: &Arc<Self>, url: &str, token: &str) -> impl Future<Output = Result<()>> {
525 let this = self.clone();
526 let url = url.to_string();
527 let token = token.to_string();
528 async move {
529 let server = TestServer::get(&url)?;
530 server
531 .join_room(token.clone(), this.clone())
532 .await
533 .context("room join")?;
534 *this.0.lock().connection.0.borrow_mut() = ConnectionState::Connected { url, token };
535 Ok(())
536 }
537 }
538
539 pub fn display_sources(self: &Arc<Self>) -> impl Future<Output = Result<Vec<MacOSDisplay>>> {
540 let this = self.clone();
541 async move {
542 let server = this.test_server();
543 server.executor.simulate_random_delay().await;
544 Ok(this.0.lock().display_sources.clone())
545 }
546 }
547
548 pub fn publish_video_track(
549 self: &Arc<Self>,
550 track: LocalVideoTrack,
551 ) -> impl Future<Output = Result<LocalTrackPublication>> {
552 let this = self.clone();
553 let track = track.clone();
554 async move {
555 let sid = this
556 .test_server()
557 .publish_video_track(this.token(), track)
558 .await?;
559 Ok(LocalTrackPublication {
560 room: Arc::downgrade(&this),
561 sid,
562 })
563 }
564 }
565
566 pub fn publish_audio_track(
567 self: &Arc<Self>,
568 track: LocalAudioTrack,
569 ) -> impl Future<Output = Result<LocalTrackPublication>> {
570 let this = self.clone();
571 let track = track.clone();
572 async move {
573 let sid = this
574 .test_server()
575 .publish_audio_track(this.token(), &track)
576 .await?;
577 Ok(LocalTrackPublication {
578 room: Arc::downgrade(&this),
579 sid,
580 })
581 }
582 }
583
/// No-op in this test implementation: the publication is dropped and the server's
/// track list is left untouched.
pub fn unpublish_track(&self, _publication: LocalTrackPublication) {}
585
586 pub fn remote_audio_tracks(&self, publisher_id: &str) -> Vec<Arc<RemoteAudioTrack>> {
587 if !self.is_connected() {
588 return Vec::new();
589 }
590
591 self.test_server()
592 .audio_tracks(self.token())
593 .unwrap()
594 .into_iter()
595 .filter(|track| track.publisher_id() == publisher_id)
596 .collect()
597 }
598
599 pub fn remote_audio_track_publications(
600 &self,
601 publisher_id: &str,
602 ) -> Vec<Arc<RemoteTrackPublication>> {
603 if !self.is_connected() {
604 return Vec::new();
605 }
606
607 self.test_server()
608 .audio_tracks(self.token())
609 .unwrap()
610 .into_iter()
611 .filter(|track| track.publisher_id() == publisher_id)
612 .map(|_track| Arc::new(RemoteTrackPublication {}))
613 .collect()
614 }
615
616 pub fn remote_video_tracks(&self, publisher_id: &str) -> Vec<Arc<RemoteVideoTrack>> {
617 if !self.is_connected() {
618 return Vec::new();
619 }
620
621 self.test_server()
622 .video_tracks(self.token())
623 .unwrap()
624 .into_iter()
625 .filter(|track| track.publisher_id() == publisher_id)
626 .collect()
627 }
628
629 pub fn updates(&self) -> impl Stream<Item = RoomUpdate> {
630 self.0.lock().updates_rx.clone()
631 }
632
633 pub fn set_display_sources(&self, sources: Vec<MacOSDisplay>) {
634 self.0.lock().display_sources = sources;
635 }
636
637 fn test_server(&self) -> Arc<TestServer> {
638 match self.0.lock().connection.1.borrow().clone() {
639 ConnectionState::Disconnected => panic!("must be connected to call this method"),
640 ConnectionState::Connected { url, .. } => TestServer::get(&url).unwrap(),
641 }
642 }
643
644 fn token(&self) -> String {
645 match self.0.lock().connection.1.borrow().clone() {
646 ConnectionState::Disconnected => panic!("must be connected to call this method"),
647 ConnectionState::Connected { token, .. } => token,
648 }
649 }
650
651 fn is_connected(&self) -> bool {
652 match *self.0.lock().connection.1.borrow() {
653 ConnectionState::Disconnected => false,
654 ConnectionState::Connected { .. } => true,
655 }
656 }
657}
658
659impl Drop for Room {
660 fn drop(&mut self) {
661 if let ConnectionState::Connected { token, .. } = mem::replace(
662 &mut *self.0.lock().connection.0.borrow_mut(),
663 ConnectionState::Disconnected,
664 ) {
665 if let Ok(server) = TestServer::get(&token) {
666 let executor = server.executor.clone();
667 executor
668 .spawn(async move { server.leave_room(token).await.unwrap() })
669 .detach();
670 }
671 }
672 }
673}
674
675#[derive(Clone)]
676pub struct LocalTrackPublication {
677 sid: String,
678 room: Weak<Room>,
679}
680
681impl LocalTrackPublication {
682 pub fn set_mute(&self, mute: bool) -> impl Future<Output = Result<()>> {
683 let sid = self.sid.clone();
684 let room = self.room.clone();
685 async move {
686 if let Some(room) = room.upgrade() {
687 room.test_server()
688 .set_track_muted(&room.token(), &sid, mute)
689 } else {
690 Err(anyhow!("no such room"))
691 }
692 }
693 }
694
695 pub fn is_muted(&self) -> bool {
696 if let Some(room) = self.room.upgrade() {
697 room.test_server()
698 .is_track_muted(&room.token(), &self.sid)
699 .unwrap_or(false)
700 } else {
701 false
702 }
703 }
704
705 pub fn sid(&self) -> String {
706 self.sid.clone()
707 }
708}
709
710pub struct RemoteTrackPublication;
711
712impl RemoteTrackPublication {
713 pub fn set_enabled(&self, _enabled: bool) -> impl Future<Output = Result<()>> {
714 async { Ok(()) }
715 }
716
717 pub fn is_muted(&self) -> bool {
718 false
719 }
720
721 pub fn sid(&self) -> String {
722 "".to_string()
723 }
724}
725
726#[derive(Clone)]
727pub struct LocalVideoTrack {
728 frames_rx: async_broadcast::Receiver<Frame>,
729}
730
731impl LocalVideoTrack {
732 pub fn screen_share_for_display(display: &MacOSDisplay) -> Self {
733 Self {
734 frames_rx: display.frames.1.clone(),
735 }
736 }
737}
738
/// Stateless stand-in for a local audio track.
#[derive(Clone)]
pub struct LocalAudioTrack;

impl LocalAudioTrack {
    /// Creates a new local audio track.
    pub fn create() -> Self {
        LocalAudioTrack
    }
}
747
748#[derive(Debug)]
749pub struct RemoteVideoTrack {
750 server_track: Arc<TestServerVideoTrack>,
751}
752
753impl RemoteVideoTrack {
754 pub fn sid(&self) -> &str {
755 &self.server_track.sid
756 }
757
758 pub fn publisher_id(&self) -> &str {
759 &self.server_track.publisher_id
760 }
761
762 pub fn frames(&self) -> async_broadcast::Receiver<Frame> {
763 self.server_track.frames_rx.clone()
764 }
765}
766
767#[derive(Debug)]
768pub struct RemoteAudioTrack {
769 server_track: Arc<TestServerAudioTrack>,
770 room: Weak<Room>,
771}
772
773impl RemoteAudioTrack {
774 pub fn sid(&self) -> &str {
775 &self.server_track.sid
776 }
777
778 pub fn publisher_id(&self) -> &str {
779 &self.server_track.publisher_id
780 }
781
782 pub fn start(&self) {
783 if let Some(room) = self.room.upgrade() {
784 room.0
785 .lock()
786 .paused_audio_tracks
787 .remove(&self.server_track.sid);
788 }
789 }
790
791 pub fn stop(&self) {
792 if let Some(room) = self.room.upgrade() {
793 room.0
794 .lock()
795 .paused_audio_tracks
796 .insert(self.server_track.sid.clone());
797 }
798 }
799
800 pub fn is_playing(&self) -> bool {
801 !self
802 .room
803 .upgrade()
804 .unwrap()
805 .0
806 .lock()
807 .paused_audio_tracks
808 .contains(&self.server_track.sid)
809 }
810}
811
812#[derive(Clone)]
813pub struct MacOSDisplay {
814 frames: (
815 async_broadcast::Sender<Frame>,
816 async_broadcast::Receiver<Frame>,
817 ),
818}
819
820impl MacOSDisplay {
821 pub fn new() -> Self {
822 Self {
823 frames: async_broadcast::broadcast(128),
824 }
825 }
826
827 pub fn send_frame(&self, frame: Frame) {
828 self.frames.0.try_broadcast(frame).unwrap();
829 }
830}
831
832#[derive(Clone, Debug, PartialEq, Eq)]
833pub struct Frame {
834 pub label: String,
835 pub width: usize,
836 pub height: usize,
837}
838
839impl Frame {
840 pub fn width(&self) -> usize {
841 self.width
842 }
843
844 pub fn height(&self) -> usize {
845 self.height
846 }
847
848 pub fn image(&self) -> ImageSource {
849 unimplemented!("you can't call this in test mode")
850 }
851}