1use crate::{ConnectionState, RoomUpdate, Sid};
2use anyhow::{anyhow, Context as _, Result};
3use async_trait::async_trait;
4use collections::{btree_map::Entry as BTreeEntry, hash_map::Entry, BTreeMap, HashMap, HashSet};
5use futures::Stream;
6use gpui::{BackgroundExecutor, SurfaceSource};
7use livekit_api::{proto, token};
8
9use parking_lot::Mutex;
10use postage::watch;
11use std::{
12 future::Future,
13 mem,
14 sync::{
15 atomic::{AtomicBool, Ordering::SeqCst},
16 Arc, Weak,
17 },
18};
19
/// Process-wide registry of test servers, keyed by server URL.
///
/// Entries are added by `TestServer::create`, looked up by `TestServer::get`
/// and removed by `TestServer::teardown`.
static SERVERS: Mutex<BTreeMap<String, Arc<TestServer>>> = Mutex::new(BTreeMap::new());
21
/// In-memory test server that tracks rooms, their connected clients and
/// their published tracks.
pub struct TestServer {
    /// URL under which this server is registered in `SERVERS`.
    pub url: String,
    /// API key passed to `token::create` when tokens are minted for this server.
    pub api_key: String,
    /// Secret used both to create tokens and to validate incoming ones.
    pub secret_key: String,
    /// Rooms hosted by this server, keyed by room name.
    rooms: Mutex<HashMap<String, TestServerRoom>>,
    /// Executor used to simulate random delays and to spawn detached work.
    executor: BackgroundExecutor,
}
29
30impl TestServer {
31 pub fn create(
32 url: String,
33 api_key: String,
34 secret_key: String,
35 executor: BackgroundExecutor,
36 ) -> Result<Arc<TestServer>> {
37 let mut servers = SERVERS.lock();
38 if let BTreeEntry::Vacant(e) = servers.entry(url.clone()) {
39 let server = Arc::new(TestServer {
40 url,
41 api_key,
42 secret_key,
43 rooms: Default::default(),
44 executor,
45 });
46 e.insert(server.clone());
47 Ok(server)
48 } else {
49 Err(anyhow!("a server with url {:?} already exists", url))
50 }
51 }
52
53 fn get(url: &str) -> Result<Arc<TestServer>> {
54 Ok(SERVERS
55 .lock()
56 .get(url)
57 .ok_or_else(|| anyhow!("no server found for url"))?
58 .clone())
59 }
60
61 pub fn teardown(&self) -> Result<()> {
62 SERVERS
63 .lock()
64 .remove(&self.url)
65 .ok_or_else(|| anyhow!("server with url {:?} does not exist", self.url))?;
66 Ok(())
67 }
68
69 pub fn create_api_client(&self) -> TestApiClient {
70 TestApiClient {
71 url: self.url.clone(),
72 }
73 }
74
75 pub async fn create_room(&self, room: String) -> Result<()> {
76 // todo(linux): Remove this once the cross-platform LiveKit implementation is merged
77 #[cfg(any(test, feature = "test-support"))]
78 self.executor.simulate_random_delay().await;
79 let mut server_rooms = self.rooms.lock();
80 if let Entry::Vacant(e) = server_rooms.entry(room.clone()) {
81 e.insert(Default::default());
82 Ok(())
83 } else {
84 Err(anyhow!("room {:?} already exists", room))
85 }
86 }
87
88 async fn delete_room(&self, room: String) -> Result<()> {
89 // TODO: clear state associated with all `Room`s.
90 // todo(linux): Remove this once the cross-platform LiveKit implementation is merged
91 #[cfg(any(test, feature = "test-support"))]
92 self.executor.simulate_random_delay().await;
93 let mut server_rooms = self.rooms.lock();
94 server_rooms
95 .remove(&room)
96 .ok_or_else(|| anyhow!("room {:?} does not exist", room))?;
97 Ok(())
98 }
99
100 async fn join_room(&self, token: String, client_room: Arc<Room>) -> Result<()> {
101 // todo(linux): Remove this once the cross-platform LiveKit implementation is merged
102 #[cfg(any(test, feature = "test-support"))]
103 self.executor.simulate_random_delay().await;
104
105 let claims = livekit_api::token::validate(&token, &self.secret_key)?;
106 let identity = claims.sub.unwrap().to_string();
107 let room_name = claims.video.room.unwrap();
108 let mut server_rooms = self.rooms.lock();
109 let room = (*server_rooms).entry(room_name.to_string()).or_default();
110
111 if let Entry::Vacant(e) = room.client_rooms.entry(identity.clone()) {
112 for track in &room.video_tracks {
113 client_room
114 .0
115 .lock()
116 .updates_tx
117 .try_broadcast(RoomUpdate::SubscribedToRemoteVideoTrack(Arc::new(
118 RemoteVideoTrack {
119 server_track: track.clone(),
120 },
121 )))
122 .unwrap();
123 }
124 for track in &room.audio_tracks {
125 client_room
126 .0
127 .lock()
128 .updates_tx
129 .try_broadcast(RoomUpdate::SubscribedToRemoteAudioTrack(
130 Arc::new(RemoteAudioTrack {
131 server_track: track.clone(),
132 room: Arc::downgrade(&client_room),
133 }),
134 Arc::new(RemoteTrackPublication),
135 ))
136 .unwrap();
137 }
138 e.insert(client_room);
139 Ok(())
140 } else {
141 Err(anyhow!(
142 "{:?} attempted to join room {:?} twice",
143 identity,
144 room_name
145 ))
146 }
147 }
148
149 async fn leave_room(&self, token: String) -> Result<()> {
150 // todo(linux): Remove this once the cross-platform LiveKit implementation is merged
151 #[cfg(any(test, feature = "test-support"))]
152 self.executor.simulate_random_delay().await;
153 let claims = livekit_api::token::validate(&token, &self.secret_key)?;
154 let identity = claims.sub.unwrap().to_string();
155 let room_name = claims.video.room.unwrap();
156 let mut server_rooms = self.rooms.lock();
157 let room = server_rooms
158 .get_mut(&*room_name)
159 .ok_or_else(|| anyhow!("room {} does not exist", room_name))?;
160 room.client_rooms.remove(&identity).ok_or_else(|| {
161 anyhow!(
162 "{:?} attempted to leave room {:?} before joining it",
163 identity,
164 room_name
165 )
166 })?;
167 Ok(())
168 }
169
170 async fn remove_participant(&self, room_name: String, identity: String) -> Result<()> {
171 // TODO: clear state associated with the `Room`.
172 // todo(linux): Remove this once the cross-platform LiveKit implementation is merged
173 #[cfg(any(test, feature = "test-support"))]
174 self.executor.simulate_random_delay().await;
175
176 let mut server_rooms = self.rooms.lock();
177 let room = server_rooms
178 .get_mut(&room_name)
179 .ok_or_else(|| anyhow!("room {} does not exist", room_name))?;
180 room.client_rooms.remove(&identity).ok_or_else(|| {
181 anyhow!(
182 "participant {:?} did not join room {:?}",
183 identity,
184 room_name
185 )
186 })?;
187 Ok(())
188 }
189
190 async fn update_participant(
191 &self,
192 room_name: String,
193 identity: String,
194 permission: proto::ParticipantPermission,
195 ) -> Result<()> {
196 // todo(linux): Remove this once the cross-platform LiveKit implementation is merged
197 #[cfg(any(test, feature = "test-support"))]
198 self.executor.simulate_random_delay().await;
199 let mut server_rooms = self.rooms.lock();
200 let room = server_rooms
201 .get_mut(&room_name)
202 .ok_or_else(|| anyhow!("room {} does not exist", room_name))?;
203 room.participant_permissions.insert(identity, permission);
204 Ok(())
205 }
206
207 pub async fn disconnect_client(&self, client_identity: String) {
208 // todo(linux): Remove this once the cross-platform LiveKit implementation is merged
209 #[cfg(any(test, feature = "test-support"))]
210 self.executor.simulate_random_delay().await;
211 let mut server_rooms = self.rooms.lock();
212 for room in server_rooms.values_mut() {
213 if let Some(room) = room.client_rooms.remove(&client_identity) {
214 *room.0.lock().connection.0.borrow_mut() = ConnectionState::Disconnected;
215 }
216 }
217 }
218
219 async fn publish_video_track(
220 &self,
221 token: String,
222 local_track: LocalVideoTrack,
223 ) -> Result<Sid> {
224 // todo(linux): Remove this once the cross-platform LiveKit implementation is merged
225 #[cfg(any(test, feature = "test-support"))]
226 self.executor.simulate_random_delay().await;
227 let claims = livekit_api::token::validate(&token, &self.secret_key)?;
228 let identity = claims.sub.unwrap().to_string();
229 let room_name = claims.video.room.unwrap();
230
231 let mut server_rooms = self.rooms.lock();
232 let room = server_rooms
233 .get_mut(&*room_name)
234 .ok_or_else(|| anyhow!("room {} does not exist", room_name))?;
235
236 let can_publish = room
237 .participant_permissions
238 .get(&identity)
239 .map(|permission| permission.can_publish)
240 .or(claims.video.can_publish)
241 .unwrap_or(true);
242
243 if !can_publish {
244 return Err(anyhow!("user is not allowed to publish"));
245 }
246
247 let sid = nanoid::nanoid!(17);
248 let track = Arc::new(TestServerVideoTrack {
249 sid: sid.clone(),
250 publisher_id: identity.clone(),
251 frames_rx: local_track.frames_rx.clone(),
252 });
253
254 room.video_tracks.push(track.clone());
255
256 for (id, client_room) in &room.client_rooms {
257 if *id != identity {
258 let _ = client_room
259 .0
260 .lock()
261 .updates_tx
262 .try_broadcast(RoomUpdate::SubscribedToRemoteVideoTrack(Arc::new(
263 RemoteVideoTrack {
264 server_track: track.clone(),
265 },
266 )))
267 .unwrap();
268 }
269 }
270
271 Ok(sid)
272 }
273
274 async fn publish_audio_track(
275 &self,
276 token: String,
277 _local_track: &LocalAudioTrack,
278 ) -> Result<Sid> {
279 // todo(linux): Remove this once the cross-platform LiveKit implementation is merged
280 #[cfg(any(test, feature = "test-support"))]
281 self.executor.simulate_random_delay().await;
282
283 let claims = livekit_api::token::validate(&token, &self.secret_key)?;
284 let identity = claims.sub.unwrap().to_string();
285 let room_name = claims.video.room.unwrap();
286
287 let mut server_rooms = self.rooms.lock();
288 let room = server_rooms
289 .get_mut(&*room_name)
290 .ok_or_else(|| anyhow!("room {} does not exist", room_name))?;
291
292 let can_publish = room
293 .participant_permissions
294 .get(&identity)
295 .map(|permission| permission.can_publish)
296 .or(claims.video.can_publish)
297 .unwrap_or(true);
298
299 if !can_publish {
300 return Err(anyhow!("user is not allowed to publish"));
301 }
302
303 let sid = nanoid::nanoid!(17);
304 let track = Arc::new(TestServerAudioTrack {
305 sid: sid.clone(),
306 publisher_id: identity.clone(),
307 muted: AtomicBool::new(false),
308 });
309
310 let publication = Arc::new(RemoteTrackPublication);
311
312 room.audio_tracks.push(track.clone());
313
314 for (id, client_room) in &room.client_rooms {
315 if *id != identity {
316 let _ = client_room
317 .0
318 .lock()
319 .updates_tx
320 .try_broadcast(RoomUpdate::SubscribedToRemoteAudioTrack(
321 Arc::new(RemoteAudioTrack {
322 server_track: track.clone(),
323 room: Arc::downgrade(client_room),
324 }),
325 publication.clone(),
326 ))
327 .unwrap();
328 }
329 }
330
331 Ok(sid)
332 }
333
334 fn set_track_muted(&self, token: &str, track_sid: &str, muted: bool) -> Result<()> {
335 let claims = livekit_api::token::validate(token, &self.secret_key)?;
336 let room_name = claims.video.room.unwrap();
337 let identity = claims.sub.unwrap();
338 let mut server_rooms = self.rooms.lock();
339 let room = server_rooms
340 .get_mut(&*room_name)
341 .ok_or_else(|| anyhow!("room {} does not exist", room_name))?;
342 if let Some(track) = room
343 .audio_tracks
344 .iter_mut()
345 .find(|track| track.sid == track_sid)
346 {
347 track.muted.store(muted, SeqCst);
348 for (id, client_room) in room.client_rooms.iter() {
349 if *id != identity {
350 client_room
351 .0
352 .lock()
353 .updates_tx
354 .try_broadcast(RoomUpdate::RemoteAudioTrackMuteChanged {
355 track_id: track_sid.to_string(),
356 muted,
357 })
358 .unwrap();
359 }
360 }
361 }
362 Ok(())
363 }
364
365 fn is_track_muted(&self, token: &str, track_sid: &str) -> Option<bool> {
366 let claims = livekit_api::token::validate(token, &self.secret_key).ok()?;
367 let room_name = claims.video.room.unwrap();
368
369 let mut server_rooms = self.rooms.lock();
370 let room = server_rooms.get_mut(&*room_name)?;
371 room.audio_tracks.iter().find_map(|track| {
372 if track.sid == track_sid {
373 Some(track.muted.load(SeqCst))
374 } else {
375 None
376 }
377 })
378 }
379
380 fn video_tracks(&self, token: String) -> Result<Vec<Arc<RemoteVideoTrack>>> {
381 let claims = livekit_api::token::validate(&token, &self.secret_key)?;
382 let room_name = claims.video.room.unwrap();
383 let identity = claims.sub.unwrap();
384
385 let mut server_rooms = self.rooms.lock();
386 let room = server_rooms
387 .get_mut(&*room_name)
388 .ok_or_else(|| anyhow!("room {} does not exist", room_name))?;
389 room.client_rooms
390 .get(identity.as_ref())
391 .ok_or_else(|| anyhow!("not a participant in room"))?;
392 Ok(room
393 .video_tracks
394 .iter()
395 .map(|track| {
396 Arc::new(RemoteVideoTrack {
397 server_track: track.clone(),
398 })
399 })
400 .collect())
401 }
402
403 fn audio_tracks(&self, token: String) -> Result<Vec<Arc<RemoteAudioTrack>>> {
404 let claims = livekit_api::token::validate(&token, &self.secret_key)?;
405 let room_name = claims.video.room.unwrap();
406 let identity = claims.sub.unwrap();
407
408 let mut server_rooms = self.rooms.lock();
409 let room = server_rooms
410 .get_mut(&*room_name)
411 .ok_or_else(|| anyhow!("room {} does not exist", room_name))?;
412 let client_room = room
413 .client_rooms
414 .get(identity.as_ref())
415 .ok_or_else(|| anyhow!("not a participant in room"))?;
416 Ok(room
417 .audio_tracks
418 .iter()
419 .map(|track| {
420 Arc::new(RemoteAudioTrack {
421 server_track: track.clone(),
422 room: Arc::downgrade(client_room),
423 })
424 })
425 .collect())
426 }
427}
428
/// Server-side state of a single room.
#[derive(Default)]
struct TestServerRoom {
    /// Client rooms currently joined, keyed by participant identity.
    client_rooms: HashMap<Sid, Arc<Room>>,
    /// Video tracks published to this room, in publication order.
    video_tracks: Vec<Arc<TestServerVideoTrack>>,
    /// Audio tracks published to this room, in publication order.
    audio_tracks: Vec<Arc<TestServerAudioTrack>>,
    /// Permissions recorded per participant identity; consulted before the
    /// token's grant when a participant publishes a track.
    participant_permissions: HashMap<Sid, proto::ParticipantPermission>,
}
436
/// Server-side record of a published video track.
#[derive(Debug)]
struct TestServerVideoTrack {
    /// Identifier generated when the track was published.
    sid: Sid,
    /// Identity of the participant that published the track.
    publisher_id: Sid,
    /// Receiver for the frames of the published local track.
    frames_rx: async_broadcast::Receiver<Frame>,
}
443
/// Server-side record of a published audio track.
#[derive(Debug)]
struct TestServerAudioTrack {
    /// Identifier generated when the track was published.
    sid: Sid,
    /// Identity of the participant that published the track.
    publisher_id: Sid,
    /// Whether the track is currently muted; starts out `false`.
    muted: AtomicBool,
}
450
// Currently defines no methods.
impl TestServerRoom {}
452
/// API client that resolves its `TestServer` by URL on every call.
pub struct TestApiClient {
    /// URL of the test server this client talks to.
    url: String,
}
456
457#[async_trait]
458impl livekit_api::Client for TestApiClient {
459 fn url(&self) -> &str {
460 &self.url
461 }
462
463 async fn create_room(&self, name: String) -> Result<()> {
464 let server = TestServer::get(&self.url)?;
465 server.create_room(name).await?;
466 Ok(())
467 }
468
469 async fn delete_room(&self, name: String) -> Result<()> {
470 let server = TestServer::get(&self.url)?;
471 server.delete_room(name).await?;
472 Ok(())
473 }
474
475 async fn remove_participant(&self, room: String, identity: String) -> Result<()> {
476 let server = TestServer::get(&self.url)?;
477 server.remove_participant(room, identity).await?;
478 Ok(())
479 }
480
481 async fn update_participant(
482 &self,
483 room: String,
484 identity: String,
485 permission: livekit_api::proto::ParticipantPermission,
486 ) -> Result<()> {
487 let server = TestServer::get(&self.url)?;
488 server
489 .update_participant(room, identity, permission)
490 .await?;
491 Ok(())
492 }
493
494 fn room_token(&self, room: &str, identity: &str) -> Result<String> {
495 let server = TestServer::get(&self.url)?;
496 token::create(
497 &server.api_key,
498 &server.secret_key,
499 Some(identity),
500 token::VideoGrant::to_join(room),
501 )
502 }
503
504 fn guest_token(&self, room: &str, identity: &str) -> Result<String> {
505 let server = TestServer::get(&self.url)?;
506 token::create(
507 &server.api_key,
508 &server.secret_key,
509 Some(identity),
510 token::VideoGrant::for_guest(room),
511 )
512 }
513}
514
/// Mutable state of a client-side `Room`, kept behind the room's mutex.
struct RoomState {
    /// Sender/receiver pair for the room's connection state.
    connection: (
        watch::Sender<ConnectionState>,
        watch::Receiver<ConnectionState>,
    ),
    /// Displays returned by `Room::display_sources`; set via `Room::set_display_sources`.
    display_sources: Vec<MacOSDisplay>,
    /// Sids of remote audio tracks that have been stopped and not restarted.
    paused_audio_tracks: HashSet<Sid>,
    /// Sender used by the server to push updates to this room.
    updates_tx: async_broadcast::Sender<RoomUpdate>,
    /// Receiver that `Room::updates` clones for each caller.
    updates_rx: async_broadcast::Receiver<RoomUpdate>,
}
525
/// Client-side room handle; all of its state lives behind a single mutex.
pub struct Room(Mutex<RoomState>);
527
528impl Room {
529 pub fn new() -> Arc<Self> {
530 let (updates_tx, updates_rx) = async_broadcast::broadcast(128);
531 Arc::new(Self(Mutex::new(RoomState {
532 connection: watch::channel_with(ConnectionState::Disconnected),
533 display_sources: Default::default(),
534 paused_audio_tracks: Default::default(),
535 updates_tx,
536 updates_rx,
537 })))
538 }
539
540 pub fn status(&self) -> watch::Receiver<ConnectionState> {
541 self.0.lock().connection.1.clone()
542 }
543
544 pub fn connect(self: &Arc<Self>, url: &str, token: &str) -> impl Future<Output = Result<()>> {
545 let this = self.clone();
546 let url = url.to_string();
547 let token = token.to_string();
548 async move {
549 let server = TestServer::get(&url)?;
550 server
551 .join_room(token.clone(), this.clone())
552 .await
553 .context("room join")?;
554 *this.0.lock().connection.0.borrow_mut() = ConnectionState::Connected { url, token };
555 Ok(())
556 }
557 }
558
559 pub fn display_sources(self: &Arc<Self>) -> impl Future<Output = Result<Vec<MacOSDisplay>>> {
560 let this = self.clone();
561 async move {
562 // todo(linux): Remove this once the cross-platform LiveKit implementation is merged
563 #[cfg(any(test, feature = "test-support"))]
564 {
565 let server = this.test_server();
566 server.executor.simulate_random_delay().await;
567 }
568
569 Ok(this.0.lock().display_sources.clone())
570 }
571 }
572
573 pub fn publish_video_track(
574 self: &Arc<Self>,
575 track: LocalVideoTrack,
576 ) -> impl Future<Output = Result<LocalTrackPublication>> {
577 let this = self.clone();
578 let track = track.clone();
579 async move {
580 let sid = this
581 .test_server()
582 .publish_video_track(this.token(), track)
583 .await?;
584 Ok(LocalTrackPublication {
585 room: Arc::downgrade(&this),
586 sid,
587 })
588 }
589 }
590
591 pub fn publish_audio_track(
592 self: &Arc<Self>,
593 track: LocalAudioTrack,
594 ) -> impl Future<Output = Result<LocalTrackPublication>> {
595 let this = self.clone();
596 let track = track.clone();
597 async move {
598 let sid = this
599 .test_server()
600 .publish_audio_track(this.token(), &track)
601 .await?;
602 Ok(LocalTrackPublication {
603 room: Arc::downgrade(&this),
604 sid,
605 })
606 }
607 }
608
    /// Accepts a publication and does nothing with it: no server-side track
    /// state is changed by this call.
    pub fn unpublish_track(&self, _publication: LocalTrackPublication) {}
610
611 pub fn remote_audio_tracks(&self, publisher_id: &str) -> Vec<Arc<RemoteAudioTrack>> {
612 if !self.is_connected() {
613 return Vec::new();
614 }
615
616 self.test_server()
617 .audio_tracks(self.token())
618 .unwrap()
619 .into_iter()
620 .filter(|track| track.publisher_id() == publisher_id)
621 .collect()
622 }
623
624 pub fn remote_audio_track_publications(
625 &self,
626 publisher_id: &str,
627 ) -> Vec<Arc<RemoteTrackPublication>> {
628 if !self.is_connected() {
629 return Vec::new();
630 }
631
632 self.test_server()
633 .audio_tracks(self.token())
634 .unwrap()
635 .into_iter()
636 .filter(|track| track.publisher_id() == publisher_id)
637 .map(|_track| Arc::new(RemoteTrackPublication {}))
638 .collect()
639 }
640
641 pub fn remote_video_tracks(&self, publisher_id: &str) -> Vec<Arc<RemoteVideoTrack>> {
642 if !self.is_connected() {
643 return Vec::new();
644 }
645
646 self.test_server()
647 .video_tracks(self.token())
648 .unwrap()
649 .into_iter()
650 .filter(|track| track.publisher_id() == publisher_id)
651 .collect()
652 }
653
654 pub fn updates(&self) -> impl Stream<Item = RoomUpdate> {
655 self.0.lock().updates_rx.clone()
656 }
657
658 pub fn set_display_sources(&self, sources: Vec<MacOSDisplay>) {
659 self.0.lock().display_sources = sources;
660 }
661
662 fn test_server(&self) -> Arc<TestServer> {
663 match self.0.lock().connection.1.borrow().clone() {
664 ConnectionState::Disconnected => panic!("must be connected to call this method"),
665 ConnectionState::Connected { url, .. } => TestServer::get(&url).unwrap(),
666 }
667 }
668
669 fn token(&self) -> String {
670 match self.0.lock().connection.1.borrow().clone() {
671 ConnectionState::Disconnected => panic!("must be connected to call this method"),
672 ConnectionState::Connected { token, .. } => token,
673 }
674 }
675
676 fn is_connected(&self) -> bool {
677 match *self.0.lock().connection.1.borrow() {
678 ConnectionState::Disconnected => false,
679 ConnectionState::Connected { .. } => true,
680 }
681 }
682}
683
684impl Drop for Room {
685 fn drop(&mut self) {
686 if let ConnectionState::Connected { token, .. } = mem::replace(
687 &mut *self.0.lock().connection.0.borrow_mut(),
688 ConnectionState::Disconnected,
689 ) {
690 if let Ok(server) = TestServer::get(&token) {
691 let executor = server.executor.clone();
692 executor
693 .spawn(async move { server.leave_room(token).await.unwrap() })
694 .detach();
695 }
696 }
697 }
698}
699
/// Handle to a track published by the local participant.
#[derive(Clone)]
pub struct LocalTrackPublication {
    /// Sid the server assigned to the published track.
    sid: String,
    /// Weak reference to the room the track was published from.
    room: Weak<Room>,
}
705
706impl LocalTrackPublication {
707 pub fn set_mute(&self, mute: bool) -> impl Future<Output = Result<()>> {
708 let sid = self.sid.clone();
709 let room = self.room.clone();
710 async move {
711 if let Some(room) = room.upgrade() {
712 room.test_server()
713 .set_track_muted(&room.token(), &sid, mute)
714 } else {
715 Err(anyhow!("no such room"))
716 }
717 }
718 }
719
720 pub fn is_muted(&self) -> bool {
721 if let Some(room) = self.room.upgrade() {
722 room.test_server()
723 .is_track_muted(&room.token(), &self.sid)
724 .unwrap_or(false)
725 } else {
726 false
727 }
728 }
729
730 pub fn sid(&self) -> String {
731 self.sid.clone()
732 }
733}
734
735pub struct RemoteTrackPublication;
736
737impl RemoteTrackPublication {
738 pub fn set_enabled(&self, _enabled: bool) -> impl Future<Output = Result<()>> {
739 async { Ok(()) }
740 }
741
742 pub fn is_muted(&self) -> bool {
743 false
744 }
745
746 pub fn sid(&self) -> String {
747 "".to_string()
748 }
749}
750
751#[derive(Clone)]
752pub struct LocalVideoTrack {
753 frames_rx: async_broadcast::Receiver<Frame>,
754}
755
756impl LocalVideoTrack {
757 pub fn screen_share_for_display(display: &MacOSDisplay) -> Self {
758 Self {
759 frames_rx: display.frames.1.clone(),
760 }
761 }
762}
763
/// Stateless stand-in for a local audio track.
#[derive(Clone)]
pub struct LocalAudioTrack;

impl LocalAudioTrack {
    /// Creates a new local audio track.
    pub fn create() -> Self {
        LocalAudioTrack
    }
}
772
773#[derive(Debug)]
774pub struct RemoteVideoTrack {
775 server_track: Arc<TestServerVideoTrack>,
776}
777
778impl RemoteVideoTrack {
779 pub fn sid(&self) -> &str {
780 &self.server_track.sid
781 }
782
783 pub fn publisher_id(&self) -> &str {
784 &self.server_track.publisher_id
785 }
786
787 pub fn frames(&self) -> async_broadcast::Receiver<Frame> {
788 self.server_track.frames_rx.clone()
789 }
790}
791
792#[derive(Debug)]
793pub struct RemoteAudioTrack {
794 server_track: Arc<TestServerAudioTrack>,
795 room: Weak<Room>,
796}
797
798impl RemoteAudioTrack {
799 pub fn sid(&self) -> &str {
800 &self.server_track.sid
801 }
802
803 pub fn publisher_id(&self) -> &str {
804 &self.server_track.publisher_id
805 }
806
807 pub fn start(&self) {
808 if let Some(room) = self.room.upgrade() {
809 room.0
810 .lock()
811 .paused_audio_tracks
812 .remove(&self.server_track.sid);
813 }
814 }
815
816 pub fn stop(&self) {
817 if let Some(room) = self.room.upgrade() {
818 room.0
819 .lock()
820 .paused_audio_tracks
821 .insert(self.server_track.sid.clone());
822 }
823 }
824
825 pub fn is_playing(&self) -> bool {
826 !self
827 .room
828 .upgrade()
829 .unwrap()
830 .0
831 .lock()
832 .paused_audio_tracks
833 .contains(&self.server_track.sid)
834 }
835}
836
/// Test display that owns both ends of a frame broadcast channel.
#[derive(Clone)]
pub struct MacOSDisplay {
    /// Sender used by `send_frame`, and the receiver that video tracks clone.
    frames: (
        async_broadcast::Sender<Frame>,
        async_broadcast::Receiver<Frame>,
    ),
}
844
845impl Default for MacOSDisplay {
846 fn default() -> Self {
847 Self::new()
848 }
849}
850
851impl MacOSDisplay {
852 pub fn new() -> Self {
853 Self {
854 frames: async_broadcast::broadcast(128),
855 }
856 }
857
858 pub fn send_frame(&self, frame: Frame) {
859 self.frames.0.try_broadcast(frame).unwrap();
860 }
861}
862
863#[derive(Clone, Debug, PartialEq, Eq)]
864pub struct Frame {
865 pub label: String,
866 pub width: usize,
867 pub height: usize,
868}
869
870impl Frame {
871 pub fn width(&self) -> usize {
872 self.width
873 }
874
875 pub fn height(&self) -> usize {
876 self.height
877 }
878
879 pub fn image(&self) -> SurfaceSource {
880 unimplemented!("you can't call this in test mode")
881 }
882}