1pub mod participant;
2pub mod publication;
3pub mod track;
4
5#[cfg(not(all(target_os = "windows", target_env = "gnu")))]
6pub mod webrtc;
7
8#[cfg(not(all(target_os = "windows", target_env = "gnu")))]
9use self::id::*;
10use self::{participant::*, publication::*, track::*};
11use anyhow::{anyhow, Context as _, Result};
12use async_trait::async_trait;
13use collections::{btree_map::Entry as BTreeEntry, hash_map::Entry, BTreeMap, HashMap, HashSet};
14use gpui::BackgroundExecutor;
15#[cfg(not(all(target_os = "windows", target_env = "gnu")))]
16use livekit::options::TrackPublishOptions;
17use livekit_api::{proto, token};
18use parking_lot::Mutex;
19use postage::{mpsc, sink::Sink};
20use std::sync::{
21 atomic::{AtomicBool, Ordering::SeqCst},
22 Arc, Weak,
23};
24
25#[cfg(not(all(target_os = "windows", target_env = "gnu")))]
26pub use livekit::{id, options, ConnectionState, DisconnectReason, RoomOptions};
27
/// Global registry of test servers, keyed by server URL.
///
/// Entries are inserted by `TestServer::create`, looked up by `TestServer::get`, and removed by
/// `TestServer::teardown`.
static SERVERS: Mutex<BTreeMap<String, Arc<TestServer>>> = Mutex::new(BTreeMap::new());
29
/// An in-memory test double for a LiveKit server.
///
/// Instances are registered in the global `SERVERS` map under their URL and hold the rooms that
/// clients have created or joined.
pub struct TestServer {
    /// URL this server is registered under in `SERVERS`.
    pub url: String,
    /// API key passed to `token::create` when tokens are minted for this server.
    pub api_key: String,
    /// Secret used to create tokens and to validate the tokens clients present.
    pub secret_key: String,
    /// Rooms hosted by this server, keyed by room name.
    #[cfg(not(all(target_os = "windows", target_env = "gnu")))]
    rooms: Mutex<HashMap<String, TestServerRoom>>,
    /// Executor used to simulate random delays and to spawn background work.
    executor: BackgroundExecutor,
}
38
39#[cfg(not(all(target_os = "windows", target_env = "gnu")))]
40impl TestServer {
41 pub fn create(
42 url: String,
43 api_key: String,
44 secret_key: String,
45 executor: BackgroundExecutor,
46 ) -> Result<Arc<TestServer>> {
47 let mut servers = SERVERS.lock();
48 if let BTreeEntry::Vacant(e) = servers.entry(url.clone()) {
49 let server = Arc::new(TestServer {
50 url,
51 api_key,
52 secret_key,
53 rooms: Default::default(),
54 executor,
55 });
56 e.insert(server.clone());
57 Ok(server)
58 } else {
59 Err(anyhow!("a server with url {:?} already exists", url))
60 }
61 }
62
63 fn get(url: &str) -> Result<Arc<TestServer>> {
64 Ok(SERVERS
65 .lock()
66 .get(url)
67 .ok_or_else(|| anyhow!("no server found for url"))?
68 .clone())
69 }
70
71 pub fn teardown(&self) -> Result<()> {
72 SERVERS
73 .lock()
74 .remove(&self.url)
75 .ok_or_else(|| anyhow!("server with url {:?} does not exist", self.url))?;
76 Ok(())
77 }
78
79 pub fn create_api_client(&self) -> TestApiClient {
80 TestApiClient {
81 url: self.url.clone(),
82 }
83 }
84
85 pub async fn create_room(&self, room: String) -> Result<()> {
86 self.executor.simulate_random_delay().await;
87
88 let mut server_rooms = self.rooms.lock();
89 if let Entry::Vacant(e) = server_rooms.entry(room.clone()) {
90 e.insert(Default::default());
91 Ok(())
92 } else {
93 Err(anyhow!("room {:?} already exists", room))
94 }
95 }
96
97 async fn delete_room(&self, room: String) -> Result<()> {
98 self.executor.simulate_random_delay().await;
99
100 let mut server_rooms = self.rooms.lock();
101 server_rooms
102 .remove(&room)
103 .ok_or_else(|| anyhow!("room {:?} does not exist", room))?;
104 Ok(())
105 }
106
107 async fn join_room(&self, token: String, client_room: Room) -> Result<ParticipantIdentity> {
108 self.executor.simulate_random_delay().await;
109
110 let claims = livekit_api::token::validate(&token, &self.secret_key)?;
111 let identity = ParticipantIdentity(claims.sub.unwrap().to_string());
112 let room_name = claims.video.room.unwrap();
113 let mut server_rooms = self.rooms.lock();
114 let room = (*server_rooms).entry(room_name.to_string()).or_default();
115
116 if let Entry::Vacant(e) = room.client_rooms.entry(identity.clone()) {
117 for server_track in &room.video_tracks {
118 let track = RemoteTrack::Video(RemoteVideoTrack {
119 server_track: server_track.clone(),
120 _room: client_room.downgrade(),
121 });
122 client_room
123 .0
124 .lock()
125 .updates_tx
126 .blocking_send(RoomEvent::TrackSubscribed {
127 track: track.clone(),
128 publication: RemoteTrackPublication {
129 sid: server_track.sid.clone(),
130 room: client_room.downgrade(),
131 track,
132 },
133 participant: RemoteParticipant {
134 room: client_room.downgrade(),
135 identity: server_track.publisher_id.clone(),
136 },
137 })
138 .unwrap();
139 }
140 for server_track in &room.audio_tracks {
141 let track = RemoteTrack::Audio(RemoteAudioTrack {
142 server_track: server_track.clone(),
143 room: client_room.downgrade(),
144 });
145 client_room
146 .0
147 .lock()
148 .updates_tx
149 .blocking_send(RoomEvent::TrackSubscribed {
150 track: track.clone(),
151 publication: RemoteTrackPublication {
152 sid: server_track.sid.clone(),
153 room: client_room.downgrade(),
154 track,
155 },
156 participant: RemoteParticipant {
157 room: client_room.downgrade(),
158 identity: server_track.publisher_id.clone(),
159 },
160 })
161 .unwrap();
162 }
163 e.insert(client_room);
164 Ok(identity)
165 } else {
166 Err(anyhow!(
167 "{:?} attempted to join room {:?} twice",
168 identity,
169 room_name
170 ))
171 }
172 }
173
174 async fn leave_room(&self, token: String) -> Result<()> {
175 self.executor.simulate_random_delay().await;
176
177 let claims = livekit_api::token::validate(&token, &self.secret_key)?;
178 let identity = ParticipantIdentity(claims.sub.unwrap().to_string());
179 let room_name = claims.video.room.unwrap();
180 let mut server_rooms = self.rooms.lock();
181 let room = server_rooms
182 .get_mut(&*room_name)
183 .ok_or_else(|| anyhow!("room {} does not exist", room_name))?;
184 room.client_rooms.remove(&identity).ok_or_else(|| {
185 anyhow!(
186 "{:?} attempted to leave room {:?} before joining it",
187 identity,
188 room_name
189 )
190 })?;
191 Ok(())
192 }
193
194 fn remote_participants(
195 &self,
196 token: String,
197 ) -> Result<HashMap<ParticipantIdentity, RemoteParticipant>> {
198 let claims = livekit_api::token::validate(&token, &self.secret_key)?;
199 let local_identity = ParticipantIdentity(claims.sub.unwrap().to_string());
200 let room_name = claims.video.room.unwrap().to_string();
201
202 if let Some(server_room) = self.rooms.lock().get(&room_name) {
203 let room = server_room
204 .client_rooms
205 .get(&local_identity)
206 .unwrap()
207 .downgrade();
208 Ok(server_room
209 .client_rooms
210 .iter()
211 .filter(|(identity, _)| *identity != &local_identity)
212 .map(|(identity, _)| {
213 (
214 identity.clone(),
215 RemoteParticipant {
216 room: room.clone(),
217 identity: identity.clone(),
218 },
219 )
220 })
221 .collect())
222 } else {
223 Ok(Default::default())
224 }
225 }
226
227 async fn remove_participant(
228 &self,
229 room_name: String,
230 identity: ParticipantIdentity,
231 ) -> Result<()> {
232 self.executor.simulate_random_delay().await;
233
234 let mut server_rooms = self.rooms.lock();
235 let room = server_rooms
236 .get_mut(&room_name)
237 .ok_or_else(|| anyhow!("room {} does not exist", room_name))?;
238 room.client_rooms.remove(&identity).ok_or_else(|| {
239 anyhow!(
240 "participant {:?} did not join room {:?}",
241 identity,
242 room_name
243 )
244 })?;
245 Ok(())
246 }
247
248 async fn update_participant(
249 &self,
250 room_name: String,
251 identity: String,
252 permission: proto::ParticipantPermission,
253 ) -> Result<()> {
254 self.executor.simulate_random_delay().await;
255
256 let mut server_rooms = self.rooms.lock();
257 let room = server_rooms
258 .get_mut(&room_name)
259 .ok_or_else(|| anyhow!("room {} does not exist", room_name))?;
260 room.participant_permissions
261 .insert(ParticipantIdentity(identity), permission);
262 Ok(())
263 }
264
265 pub async fn disconnect_client(&self, client_identity: String) {
266 let client_identity = ParticipantIdentity(client_identity);
267
268 self.executor.simulate_random_delay().await;
269
270 let mut server_rooms = self.rooms.lock();
271 for room in server_rooms.values_mut() {
272 if let Some(room) = room.client_rooms.remove(&client_identity) {
273 let mut room = room.0.lock();
274 room.connection_state = ConnectionState::Disconnected;
275 room.updates_tx
276 .blocking_send(RoomEvent::Disconnected {
277 reason: DisconnectReason::SignalClose,
278 })
279 .ok();
280 }
281 }
282 }
283
284 async fn publish_video_track(
285 &self,
286 token: String,
287 _local_track: LocalVideoTrack,
288 ) -> Result<TrackSid> {
289 self.executor.simulate_random_delay().await;
290
291 let claims = livekit_api::token::validate(&token, &self.secret_key)?;
292 let identity = ParticipantIdentity(claims.sub.unwrap().to_string());
293 let room_name = claims.video.room.unwrap();
294
295 let mut server_rooms = self.rooms.lock();
296 let room = server_rooms
297 .get_mut(&*room_name)
298 .ok_or_else(|| anyhow!("room {} does not exist", room_name))?;
299
300 let can_publish = room
301 .participant_permissions
302 .get(&identity)
303 .map(|permission| permission.can_publish)
304 .or(claims.video.can_publish)
305 .unwrap_or(true);
306
307 if !can_publish {
308 return Err(anyhow!("user is not allowed to publish"));
309 }
310
311 let sid: TrackSid = format!("TR_{}", nanoid::nanoid!(17)).try_into().unwrap();
312 let server_track = Arc::new(TestServerVideoTrack {
313 sid: sid.clone(),
314 publisher_id: identity.clone(),
315 });
316
317 room.video_tracks.push(server_track.clone());
318
319 for (room_identity, client_room) in &room.client_rooms {
320 if *room_identity != identity {
321 let track = RemoteTrack::Video(RemoteVideoTrack {
322 server_track: server_track.clone(),
323 _room: client_room.downgrade(),
324 });
325 let publication = RemoteTrackPublication {
326 sid: sid.clone(),
327 room: client_room.downgrade(),
328 track: track.clone(),
329 };
330 let participant = RemoteParticipant {
331 identity: identity.clone(),
332 room: client_room.downgrade(),
333 };
334 client_room
335 .0
336 .lock()
337 .updates_tx
338 .blocking_send(RoomEvent::TrackSubscribed {
339 track,
340 publication,
341 participant,
342 })
343 .unwrap();
344 }
345 }
346
347 Ok(sid)
348 }
349
350 async fn publish_audio_track(
351 &self,
352 token: String,
353 _local_track: &LocalAudioTrack,
354 ) -> Result<TrackSid> {
355 self.executor.simulate_random_delay().await;
356
357 let claims = livekit_api::token::validate(&token, &self.secret_key)?;
358 let identity = ParticipantIdentity(claims.sub.unwrap().to_string());
359 let room_name = claims.video.room.unwrap();
360
361 let mut server_rooms = self.rooms.lock();
362 let room = server_rooms
363 .get_mut(&*room_name)
364 .ok_or_else(|| anyhow!("room {} does not exist", room_name))?;
365
366 let can_publish = room
367 .participant_permissions
368 .get(&identity)
369 .map(|permission| permission.can_publish)
370 .or(claims.video.can_publish)
371 .unwrap_or(true);
372
373 if !can_publish {
374 return Err(anyhow!("user is not allowed to publish"));
375 }
376
377 let sid: TrackSid = format!("TR_{}", nanoid::nanoid!(17)).try_into().unwrap();
378 let server_track = Arc::new(TestServerAudioTrack {
379 sid: sid.clone(),
380 publisher_id: identity.clone(),
381 muted: AtomicBool::new(false),
382 });
383
384 room.audio_tracks.push(server_track.clone());
385
386 for (room_identity, client_room) in &room.client_rooms {
387 if *room_identity != identity {
388 let track = RemoteTrack::Audio(RemoteAudioTrack {
389 server_track: server_track.clone(),
390 room: client_room.downgrade(),
391 });
392 let publication = RemoteTrackPublication {
393 sid: sid.clone(),
394 room: client_room.downgrade(),
395 track: track.clone(),
396 };
397 let participant = RemoteParticipant {
398 identity: identity.clone(),
399 room: client_room.downgrade(),
400 };
401 client_room
402 .0
403 .lock()
404 .updates_tx
405 .blocking_send(RoomEvent::TrackSubscribed {
406 track,
407 publication,
408 participant,
409 })
410 .ok();
411 }
412 }
413
414 Ok(sid)
415 }
416
/// Placeholder for unpublishing a track.
///
/// Currently ignores `_token` and `_track` and always returns `Ok(())`; it does not remove the
/// track from the room's track lists and sends no events.
async fn unpublish_track(&self, _token: String, _track: &TrackSid) -> Result<()> {
    Ok(())
}
420
421 fn set_track_muted(&self, token: &str, track_sid: &TrackSid, muted: bool) -> Result<()> {
422 let claims = livekit_api::token::validate(&token, &self.secret_key)?;
423 let room_name = claims.video.room.unwrap();
424 let identity = ParticipantIdentity(claims.sub.unwrap().to_string());
425 let mut server_rooms = self.rooms.lock();
426 let room = server_rooms
427 .get_mut(&*room_name)
428 .ok_or_else(|| anyhow!("room {} does not exist", room_name))?;
429 if let Some(track) = room
430 .audio_tracks
431 .iter_mut()
432 .find(|track| track.sid == *track_sid)
433 {
434 track.muted.store(muted, SeqCst);
435 for (id, client_room) in room.client_rooms.iter() {
436 if *id != identity {
437 let participant = Participant::Remote(RemoteParticipant {
438 identity: identity.clone(),
439 room: client_room.downgrade(),
440 });
441 let track = RemoteTrack::Audio(RemoteAudioTrack {
442 server_track: track.clone(),
443 room: client_room.downgrade(),
444 });
445 let publication = TrackPublication::Remote(RemoteTrackPublication {
446 sid: track_sid.clone(),
447 room: client_room.downgrade(),
448 track,
449 });
450
451 let event = if muted {
452 RoomEvent::TrackMuted {
453 participant,
454 publication,
455 }
456 } else {
457 RoomEvent::TrackUnmuted {
458 participant,
459 publication,
460 }
461 };
462
463 client_room
464 .0
465 .lock()
466 .updates_tx
467 .blocking_send(event)
468 .unwrap();
469 }
470 }
471 }
472 Ok(())
473 }
474
475 fn is_track_muted(&self, token: &str, track_sid: &TrackSid) -> Option<bool> {
476 let claims = livekit_api::token::validate(&token, &self.secret_key).ok()?;
477 let room_name = claims.video.room.unwrap();
478
479 let mut server_rooms = self.rooms.lock();
480 let room = server_rooms.get_mut(&*room_name)?;
481 room.audio_tracks.iter().find_map(|track| {
482 if track.sid == *track_sid {
483 Some(track.muted.load(SeqCst))
484 } else {
485 None
486 }
487 })
488 }
489
490 fn video_tracks(&self, token: String) -> Result<Vec<RemoteVideoTrack>> {
491 let claims = livekit_api::token::validate(&token, &self.secret_key)?;
492 let room_name = claims.video.room.unwrap();
493 let identity = ParticipantIdentity(claims.sub.unwrap().to_string());
494
495 let mut server_rooms = self.rooms.lock();
496 let room = server_rooms
497 .get_mut(&*room_name)
498 .ok_or_else(|| anyhow!("room {} does not exist", room_name))?;
499 let client_room = room
500 .client_rooms
501 .get(&identity)
502 .ok_or_else(|| anyhow!("not a participant in room"))?;
503 Ok(room
504 .video_tracks
505 .iter()
506 .map(|track| RemoteVideoTrack {
507 server_track: track.clone(),
508 _room: client_room.downgrade(),
509 })
510 .collect())
511 }
512
513 fn audio_tracks(&self, token: String) -> Result<Vec<RemoteAudioTrack>> {
514 let claims = livekit_api::token::validate(&token, &self.secret_key)?;
515 let room_name = claims.video.room.unwrap();
516 let identity = ParticipantIdentity(claims.sub.unwrap().to_string());
517
518 let mut server_rooms = self.rooms.lock();
519 let room = server_rooms
520 .get_mut(&*room_name)
521 .ok_or_else(|| anyhow!("room {} does not exist", room_name))?;
522 let client_room = room
523 .client_rooms
524 .get(&identity)
525 .ok_or_else(|| anyhow!("not a participant in room"))?;
526 Ok(room
527 .audio_tracks
528 .iter()
529 .map(|track| RemoteAudioTrack {
530 server_track: track.clone(),
531 room: client_room.downgrade(),
532 })
533 .collect())
534 }
535}
536
/// Server-side state of a single room.
#[cfg(not(all(target_os = "windows", target_env = "gnu")))]
#[derive(Default, Debug)]
struct TestServerRoom {
    /// Client rooms of the participants currently in this room, keyed by identity.
    client_rooms: HashMap<ParticipantIdentity, Room>,
    /// Video tracks published in this room, in publication order.
    video_tracks: Vec<Arc<TestServerVideoTrack>>,
    /// Audio tracks published in this room, in publication order.
    audio_tracks: Vec<Arc<TestServerAudioTrack>>,
    /// Permissions set through `update_participant`; consulted before the token's own claims
    /// when a participant publishes a track.
    participant_permissions: HashMap<ParticipantIdentity, proto::ParticipantPermission>,
}
545
/// Server-side record of a published video track.
#[cfg(not(all(target_os = "windows", target_env = "gnu")))]
#[derive(Debug)]
struct TestServerVideoTrack {
    /// Track sid generated when the track was published.
    sid: TrackSid,
    /// Identity of the participant that published the track.
    publisher_id: ParticipantIdentity,
    // frames_rx: async_broadcast::Receiver<Frame>,
}
553
/// Server-side record of a published audio track.
#[cfg(not(all(target_os = "windows", target_env = "gnu")))]
#[derive(Debug)]
struct TestServerAudioTrack {
    /// Track sid generated when the track was published.
    sid: TrackSid,
    /// Identity of the participant that published the track.
    publisher_id: ParticipantIdentity,
    /// Whether the track is muted; starts as `false` and is changed by `set_track_muted`.
    muted: AtomicBool,
}
561
/// API client for a `TestServer`.
///
/// Holds only the server's URL; each call looks the server up in the global registry.
pub struct TestApiClient {
    /// URL of the server this client talks to.
    url: String,
}
565
/// Events delivered to a client through the receiver returned by `Room::connect`.
///
/// Within this file the test server only sends `TrackSubscribed`, `TrackMuted`,
/// `TrackUnmuted` and `Disconnected`; whether other code sends the remaining variants cannot
/// be told from here.
#[derive(Clone, Debug)]
#[non_exhaustive]
pub enum RoomEvent {
    ParticipantConnected(RemoteParticipant),
    ParticipantDisconnected(RemoteParticipant),
    LocalTrackPublished {
        publication: LocalTrackPublication,
        track: LocalTrack,
        participant: LocalParticipant,
    },
    LocalTrackUnpublished {
        publication: LocalTrackPublication,
        participant: LocalParticipant,
    },
    /// Sent to a client for each track already in a room when it joins, and to the other
    /// clients in a room when a new track is published.
    TrackSubscribed {
        track: RemoteTrack,
        publication: RemoteTrackPublication,
        participant: RemoteParticipant,
    },
    TrackUnsubscribed {
        track: RemoteTrack,
        publication: RemoteTrackPublication,
        participant: RemoteParticipant,
    },
    TrackSubscriptionFailed {
        participant: RemoteParticipant,
        error: String,
        #[cfg(not(all(target_os = "windows", target_env = "gnu")))]
        track_sid: TrackSid,
    },
    TrackPublished {
        publication: RemoteTrackPublication,
        participant: RemoteParticipant,
    },
    TrackUnpublished {
        publication: RemoteTrackPublication,
        participant: RemoteParticipant,
    },
    /// Sent to the other clients in a room when an audio track is muted.
    TrackMuted {
        participant: Participant,
        publication: TrackPublication,
    },
    /// Sent to the other clients in a room when an audio track is unmuted.
    TrackUnmuted {
        participant: Participant,
        publication: TrackPublication,
    },
    RoomMetadataChanged {
        old_metadata: String,
        metadata: String,
    },
    ParticipantMetadataChanged {
        participant: Participant,
        old_metadata: String,
        metadata: String,
    },
    ParticipantNameChanged {
        participant: Participant,
        old_name: String,
        name: String,
    },
    ActiveSpeakersChanged {
        speakers: Vec<Participant>,
    },
    #[cfg(not(all(target_os = "windows", target_env = "gnu")))]
    ConnectionStateChanged(ConnectionState),
    Connected {
        participants_with_tracks: Vec<(RemoteParticipant, Vec<RemoteTrackPublication>)>,
    },
    /// Sent to a client that the server disconnects via `TestServer::disconnect_client`.
    #[cfg(not(all(target_os = "windows", target_env = "gnu")))]
    Disconnected {
        reason: DisconnectReason,
    },
    Reconnecting,
    Reconnected,
}
641
642#[cfg(not(all(target_os = "windows", target_env = "gnu")))]
643#[async_trait]
644impl livekit_api::Client for TestApiClient {
645 fn url(&self) -> &str {
646 &self.url
647 }
648
649 async fn create_room(&self, name: String) -> Result<()> {
650 let server = TestServer::get(&self.url)?;
651 server.create_room(name).await?;
652 Ok(())
653 }
654
655 async fn delete_room(&self, name: String) -> Result<()> {
656 let server = TestServer::get(&self.url)?;
657 server.delete_room(name).await?;
658 Ok(())
659 }
660
661 async fn remove_participant(&self, room: String, identity: String) -> Result<()> {
662 let server = TestServer::get(&self.url)?;
663 server
664 .remove_participant(room, ParticipantIdentity(identity))
665 .await?;
666 Ok(())
667 }
668
669 async fn update_participant(
670 &self,
671 room: String,
672 identity: String,
673 permission: livekit_api::proto::ParticipantPermission,
674 ) -> Result<()> {
675 let server = TestServer::get(&self.url)?;
676 server
677 .update_participant(room, identity, permission)
678 .await?;
679 Ok(())
680 }
681
682 fn room_token(&self, room: &str, identity: &str) -> Result<String> {
683 let server = TestServer::get(&self.url)?;
684 token::create(
685 &server.api_key,
686 &server.secret_key,
687 Some(identity),
688 token::VideoGrant::to_join(room),
689 )
690 }
691
692 fn guest_token(&self, room: &str, identity: &str) -> Result<String> {
693 let server = TestServer::get(&self.url)?;
694 token::create(
695 &server.api_key,
696 &server.secret_key,
697 Some(identity),
698 token::VideoGrant::for_guest(room),
699 )
700 }
701}
702
/// Mutable client-side state shared by all handles to one `Room`.
struct RoomState {
    /// URL of the test server this room connected to.
    url: String,
    /// Token the room connected with; reused for later calls to the server.
    token: String,
    /// Identity assigned by the server on join; an empty string until the join completes.
    #[cfg(not(all(target_os = "windows", target_env = "gnu")))]
    local_identity: ParticipantIdentity,
    /// Current connection state; starts as `Disconnected` and becomes `Connected` after a
    /// successful join.
    #[cfg(not(all(target_os = "windows", target_env = "gnu")))]
    connection_state: ConnectionState,
    /// Sids of paused audio tracks; starts empty. NOTE(review): not read or written in this
    /// file, presumably maintained by the track module. Confirm.
    #[cfg(not(all(target_os = "windows", target_env = "gnu")))]
    paused_audio_tracks: HashSet<TrackSid>,
    /// Sending half of the channel on which `RoomEvent`s are delivered to the client.
    updates_tx: mpsc::Sender<RoomEvent>,
}
714
/// A client's handle to a room on a test server.
///
/// Cloning yields another handle to the same shared `RoomState`.
#[derive(Clone, Debug)]
pub struct Room(Arc<Mutex<RoomState>>);
717
/// A weak counterpart of `Room` that does not keep the shared `RoomState` alive.
#[derive(Clone, Debug)]
pub(crate) struct WeakRoom(Weak<Mutex<RoomState>>);
720
721#[cfg(not(all(target_os = "windows", target_env = "gnu")))]
722impl std::fmt::Debug for RoomState {
723 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
724 f.debug_struct("Room")
725 .field("url", &self.url)
726 .field("token", &self.token)
727 .field("local_identity", &self.local_identity)
728 .field("connection_state", &self.connection_state)
729 .field("paused_audio_tracks", &self.paused_audio_tracks)
730 .finish()
731 }
732}
733
/// Debug output for `RoomState` on Windows GNU targets, where only `url` and `token` are
/// printed; the output is labelled `Room`.
#[cfg(all(target_os = "windows", target_env = "gnu"))]
impl std::fmt::Debug for RoomState {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut out = f.debug_struct("Room");
        out.field("url", &self.url);
        out.field("token", &self.token);
        out.finish()
    }
}
743
744#[cfg(not(all(target_os = "windows", target_env = "gnu")))]
745impl Room {
746 fn downgrade(&self) -> WeakRoom {
747 WeakRoom(Arc::downgrade(&self.0))
748 }
749
750 pub fn connection_state(&self) -> ConnectionState {
751 self.0.lock().connection_state
752 }
753
754 pub fn local_participant(&self) -> LocalParticipant {
755 let identity = self.0.lock().local_identity.clone();
756 LocalParticipant {
757 identity,
758 room: self.clone(),
759 }
760 }
761
762 pub async fn connect(
763 url: &str,
764 token: &str,
765 _options: RoomOptions,
766 ) -> Result<(Self, mpsc::Receiver<RoomEvent>)> {
767 let server = TestServer::get(&url)?;
768 let (updates_tx, updates_rx) = mpsc::channel(1024);
769 let this = Self(Arc::new(Mutex::new(RoomState {
770 local_identity: ParticipantIdentity(String::new()),
771 url: url.to_string(),
772 token: token.to_string(),
773 connection_state: ConnectionState::Disconnected,
774 paused_audio_tracks: Default::default(),
775 updates_tx,
776 })));
777
778 let identity = server
779 .join_room(token.to_string(), this.clone())
780 .await
781 .context("room join")?;
782 {
783 let mut state = this.0.lock();
784 state.local_identity = identity;
785 state.connection_state = ConnectionState::Connected;
786 }
787
788 Ok((this, updates_rx))
789 }
790
791 pub fn remote_participants(&self) -> HashMap<ParticipantIdentity, RemoteParticipant> {
792 self.test_server()
793 .remote_participants(self.0.lock().token.clone())
794 .unwrap()
795 }
796
797 fn test_server(&self) -> Arc<TestServer> {
798 TestServer::get(&self.0.lock().url).unwrap()
799 }
800
801 fn token(&self) -> String {
802 self.0.lock().token.clone()
803 }
804}
805
806#[cfg(not(all(target_os = "windows", target_env = "gnu")))]
807impl Drop for RoomState {
808 fn drop(&mut self) {
809 if self.connection_state == ConnectionState::Connected {
810 if let Ok(server) = TestServer::get(&self.url) {
811 let executor = server.executor.clone();
812 let token = self.token.clone();
813 executor
814 .spawn(async move { server.leave_room(token).await.ok() })
815 .detach();
816 }
817 }
818 }
819}
820
821impl WeakRoom {
822 fn upgrade(&self) -> Option<Room> {
823 self.0.upgrade().map(Room)
824 }
825}