test.rs

  1pub mod participant;
  2pub mod publication;
  3pub mod track;
  4
  5#[cfg(not(all(target_os = "windows", target_env = "gnu")))]
  6pub mod webrtc;
  7
  8#[cfg(not(all(target_os = "windows", target_env = "gnu")))]
  9use self::id::*;
 10use self::{participant::*, publication::*, track::*};
 11use anyhow::{anyhow, Context as _, Result};
 12use async_trait::async_trait;
 13use collections::{btree_map::Entry as BTreeEntry, hash_map::Entry, BTreeMap, HashMap, HashSet};
 14use gpui::BackgroundExecutor;
 15#[cfg(not(all(target_os = "windows", target_env = "gnu")))]
 16use livekit::options::TrackPublishOptions;
 17use livekit_api::{proto, token};
 18use parking_lot::Mutex;
 19use postage::{mpsc, sink::Sink};
 20use std::sync::{
 21    atomic::{AtomicBool, Ordering::SeqCst},
 22    Arc, Weak,
 23};
 24
 25#[cfg(not(all(target_os = "windows", target_env = "gnu")))]
 26pub use livekit::{id, options, ConnectionState, DisconnectReason, RoomOptions};
 27
 28static SERVERS: Mutex<BTreeMap<String, Arc<TestServer>>> = Mutex::new(BTreeMap::new());
 29
 30pub struct TestServer {
 31    pub url: String,
 32    pub api_key: String,
 33    pub secret_key: String,
 34    #[cfg(not(all(target_os = "windows", target_env = "gnu")))]
 35    rooms: Mutex<HashMap<String, TestServerRoom>>,
 36    executor: BackgroundExecutor,
 37}
 38
 39#[cfg(not(all(target_os = "windows", target_env = "gnu")))]
 40impl TestServer {
 41    pub fn create(
 42        url: String,
 43        api_key: String,
 44        secret_key: String,
 45        executor: BackgroundExecutor,
 46    ) -> Result<Arc<TestServer>> {
 47        let mut servers = SERVERS.lock();
 48        if let BTreeEntry::Vacant(e) = servers.entry(url.clone()) {
 49            let server = Arc::new(TestServer {
 50                url,
 51                api_key,
 52                secret_key,
 53                rooms: Default::default(),
 54                executor,
 55            });
 56            e.insert(server.clone());
 57            Ok(server)
 58        } else {
 59            Err(anyhow!("a server with url {:?} already exists", url))
 60        }
 61    }
 62
 63    fn get(url: &str) -> Result<Arc<TestServer>> {
 64        Ok(SERVERS
 65            .lock()
 66            .get(url)
 67            .ok_or_else(|| anyhow!("no server found for url"))?
 68            .clone())
 69    }
 70
 71    pub fn teardown(&self) -> Result<()> {
 72        SERVERS
 73            .lock()
 74            .remove(&self.url)
 75            .ok_or_else(|| anyhow!("server with url {:?} does not exist", self.url))?;
 76        Ok(())
 77    }
 78
 79    pub fn create_api_client(&self) -> TestApiClient {
 80        TestApiClient {
 81            url: self.url.clone(),
 82        }
 83    }
 84
 85    pub async fn create_room(&self, room: String) -> Result<()> {
 86        self.executor.simulate_random_delay().await;
 87
 88        let mut server_rooms = self.rooms.lock();
 89        if let Entry::Vacant(e) = server_rooms.entry(room.clone()) {
 90            e.insert(Default::default());
 91            Ok(())
 92        } else {
 93            Err(anyhow!("room {:?} already exists", room))
 94        }
 95    }
 96
 97    async fn delete_room(&self, room: String) -> Result<()> {
 98        self.executor.simulate_random_delay().await;
 99
100        let mut server_rooms = self.rooms.lock();
101        server_rooms
102            .remove(&room)
103            .ok_or_else(|| anyhow!("room {:?} does not exist", room))?;
104        Ok(())
105    }
106
107    async fn join_room(&self, token: String, client_room: Room) -> Result<ParticipantIdentity> {
108        self.executor.simulate_random_delay().await;
109
110        let claims = livekit_api::token::validate(&token, &self.secret_key)?;
111        let identity = ParticipantIdentity(claims.sub.unwrap().to_string());
112        let room_name = claims.video.room.unwrap();
113        let mut server_rooms = self.rooms.lock();
114        let room = (*server_rooms).entry(room_name.to_string()).or_default();
115
116        if let Entry::Vacant(e) = room.client_rooms.entry(identity.clone()) {
117            for server_track in &room.video_tracks {
118                let track = RemoteTrack::Video(RemoteVideoTrack {
119                    server_track: server_track.clone(),
120                    _room: client_room.downgrade(),
121                });
122                client_room
123                    .0
124                    .lock()
125                    .updates_tx
126                    .blocking_send(RoomEvent::TrackSubscribed {
127                        track: track.clone(),
128                        publication: RemoteTrackPublication {
129                            sid: server_track.sid.clone(),
130                            room: client_room.downgrade(),
131                            track,
132                        },
133                        participant: RemoteParticipant {
134                            room: client_room.downgrade(),
135                            identity: server_track.publisher_id.clone(),
136                        },
137                    })
138                    .unwrap();
139            }
140            for server_track in &room.audio_tracks {
141                let track = RemoteTrack::Audio(RemoteAudioTrack {
142                    server_track: server_track.clone(),
143                    room: client_room.downgrade(),
144                });
145                client_room
146                    .0
147                    .lock()
148                    .updates_tx
149                    .blocking_send(RoomEvent::TrackSubscribed {
150                        track: track.clone(),
151                        publication: RemoteTrackPublication {
152                            sid: server_track.sid.clone(),
153                            room: client_room.downgrade(),
154                            track,
155                        },
156                        participant: RemoteParticipant {
157                            room: client_room.downgrade(),
158                            identity: server_track.publisher_id.clone(),
159                        },
160                    })
161                    .unwrap();
162            }
163            e.insert(client_room);
164            Ok(identity)
165        } else {
166            Err(anyhow!(
167                "{:?} attempted to join room {:?} twice",
168                identity,
169                room_name
170            ))
171        }
172    }
173
174    async fn leave_room(&self, token: String) -> Result<()> {
175        self.executor.simulate_random_delay().await;
176
177        let claims = livekit_api::token::validate(&token, &self.secret_key)?;
178        let identity = ParticipantIdentity(claims.sub.unwrap().to_string());
179        let room_name = claims.video.room.unwrap();
180        let mut server_rooms = self.rooms.lock();
181        let room = server_rooms
182            .get_mut(&*room_name)
183            .ok_or_else(|| anyhow!("room {} does not exist", room_name))?;
184        room.client_rooms.remove(&identity).ok_or_else(|| {
185            anyhow!(
186                "{:?} attempted to leave room {:?} before joining it",
187                identity,
188                room_name
189            )
190        })?;
191        Ok(())
192    }
193
194    fn remote_participants(
195        &self,
196        token: String,
197    ) -> Result<HashMap<ParticipantIdentity, RemoteParticipant>> {
198        let claims = livekit_api::token::validate(&token, &self.secret_key)?;
199        let local_identity = ParticipantIdentity(claims.sub.unwrap().to_string());
200        let room_name = claims.video.room.unwrap().to_string();
201
202        if let Some(server_room) = self.rooms.lock().get(&room_name) {
203            let room = server_room
204                .client_rooms
205                .get(&local_identity)
206                .unwrap()
207                .downgrade();
208            Ok(server_room
209                .client_rooms
210                .iter()
211                .filter(|(identity, _)| *identity != &local_identity)
212                .map(|(identity, _)| {
213                    (
214                        identity.clone(),
215                        RemoteParticipant {
216                            room: room.clone(),
217                            identity: identity.clone(),
218                        },
219                    )
220                })
221                .collect())
222        } else {
223            Ok(Default::default())
224        }
225    }
226
227    async fn remove_participant(
228        &self,
229        room_name: String,
230        identity: ParticipantIdentity,
231    ) -> Result<()> {
232        self.executor.simulate_random_delay().await;
233
234        let mut server_rooms = self.rooms.lock();
235        let room = server_rooms
236            .get_mut(&room_name)
237            .ok_or_else(|| anyhow!("room {} does not exist", room_name))?;
238        room.client_rooms.remove(&identity).ok_or_else(|| {
239            anyhow!(
240                "participant {:?} did not join room {:?}",
241                identity,
242                room_name
243            )
244        })?;
245        Ok(())
246    }
247
248    async fn update_participant(
249        &self,
250        room_name: String,
251        identity: String,
252        permission: proto::ParticipantPermission,
253    ) -> Result<()> {
254        self.executor.simulate_random_delay().await;
255
256        let mut server_rooms = self.rooms.lock();
257        let room = server_rooms
258            .get_mut(&room_name)
259            .ok_or_else(|| anyhow!("room {} does not exist", room_name))?;
260        room.participant_permissions
261            .insert(ParticipantIdentity(identity), permission);
262        Ok(())
263    }
264
265    pub async fn disconnect_client(&self, client_identity: String) {
266        let client_identity = ParticipantIdentity(client_identity);
267
268        self.executor.simulate_random_delay().await;
269
270        let mut server_rooms = self.rooms.lock();
271        for room in server_rooms.values_mut() {
272            if let Some(room) = room.client_rooms.remove(&client_identity) {
273                let mut room = room.0.lock();
274                room.connection_state = ConnectionState::Disconnected;
275                room.updates_tx
276                    .blocking_send(RoomEvent::Disconnected {
277                        reason: DisconnectReason::SignalClose,
278                    })
279                    .ok();
280            }
281        }
282    }
283
284    async fn publish_video_track(
285        &self,
286        token: String,
287        _local_track: LocalVideoTrack,
288    ) -> Result<TrackSid> {
289        self.executor.simulate_random_delay().await;
290
291        let claims = livekit_api::token::validate(&token, &self.secret_key)?;
292        let identity = ParticipantIdentity(claims.sub.unwrap().to_string());
293        let room_name = claims.video.room.unwrap();
294
295        let mut server_rooms = self.rooms.lock();
296        let room = server_rooms
297            .get_mut(&*room_name)
298            .ok_or_else(|| anyhow!("room {} does not exist", room_name))?;
299
300        let can_publish = room
301            .participant_permissions
302            .get(&identity)
303            .map(|permission| permission.can_publish)
304            .or(claims.video.can_publish)
305            .unwrap_or(true);
306
307        if !can_publish {
308            return Err(anyhow!("user is not allowed to publish"));
309        }
310
311        let sid: TrackSid = format!("TR_{}", nanoid::nanoid!(17)).try_into().unwrap();
312        let server_track = Arc::new(TestServerVideoTrack {
313            sid: sid.clone(),
314            publisher_id: identity.clone(),
315        });
316
317        room.video_tracks.push(server_track.clone());
318
319        for (room_identity, client_room) in &room.client_rooms {
320            if *room_identity != identity {
321                let track = RemoteTrack::Video(RemoteVideoTrack {
322                    server_track: server_track.clone(),
323                    _room: client_room.downgrade(),
324                });
325                let publication = RemoteTrackPublication {
326                    sid: sid.clone(),
327                    room: client_room.downgrade(),
328                    track: track.clone(),
329                };
330                let participant = RemoteParticipant {
331                    identity: identity.clone(),
332                    room: client_room.downgrade(),
333                };
334                client_room
335                    .0
336                    .lock()
337                    .updates_tx
338                    .blocking_send(RoomEvent::TrackSubscribed {
339                        track,
340                        publication,
341                        participant,
342                    })
343                    .unwrap();
344            }
345        }
346
347        Ok(sid)
348    }
349
350    async fn publish_audio_track(
351        &self,
352        token: String,
353        _local_track: &LocalAudioTrack,
354    ) -> Result<TrackSid> {
355        self.executor.simulate_random_delay().await;
356
357        let claims = livekit_api::token::validate(&token, &self.secret_key)?;
358        let identity = ParticipantIdentity(claims.sub.unwrap().to_string());
359        let room_name = claims.video.room.unwrap();
360
361        let mut server_rooms = self.rooms.lock();
362        let room = server_rooms
363            .get_mut(&*room_name)
364            .ok_or_else(|| anyhow!("room {} does not exist", room_name))?;
365
366        let can_publish = room
367            .participant_permissions
368            .get(&identity)
369            .map(|permission| permission.can_publish)
370            .or(claims.video.can_publish)
371            .unwrap_or(true);
372
373        if !can_publish {
374            return Err(anyhow!("user is not allowed to publish"));
375        }
376
377        let sid: TrackSid = format!("TR_{}", nanoid::nanoid!(17)).try_into().unwrap();
378        let server_track = Arc::new(TestServerAudioTrack {
379            sid: sid.clone(),
380            publisher_id: identity.clone(),
381            muted: AtomicBool::new(false),
382        });
383
384        room.audio_tracks.push(server_track.clone());
385
386        for (room_identity, client_room) in &room.client_rooms {
387            if *room_identity != identity {
388                let track = RemoteTrack::Audio(RemoteAudioTrack {
389                    server_track: server_track.clone(),
390                    room: client_room.downgrade(),
391                });
392                let publication = RemoteTrackPublication {
393                    sid: sid.clone(),
394                    room: client_room.downgrade(),
395                    track: track.clone(),
396                };
397                let participant = RemoteParticipant {
398                    identity: identity.clone(),
399                    room: client_room.downgrade(),
400                };
401                client_room
402                    .0
403                    .lock()
404                    .updates_tx
405                    .blocking_send(RoomEvent::TrackSubscribed {
406                        track,
407                        publication,
408                        participant,
409                    })
410                    .ok();
411            }
412        }
413
414        Ok(sid)
415    }
416
417    async fn unpublish_track(&self, _token: String, _track: &TrackSid) -> Result<()> {
418        Ok(())
419    }
420
421    fn set_track_muted(&self, token: &str, track_sid: &TrackSid, muted: bool) -> Result<()> {
422        let claims = livekit_api::token::validate(&token, &self.secret_key)?;
423        let room_name = claims.video.room.unwrap();
424        let identity = ParticipantIdentity(claims.sub.unwrap().to_string());
425        let mut server_rooms = self.rooms.lock();
426        let room = server_rooms
427            .get_mut(&*room_name)
428            .ok_or_else(|| anyhow!("room {} does not exist", room_name))?;
429        if let Some(track) = room
430            .audio_tracks
431            .iter_mut()
432            .find(|track| track.sid == *track_sid)
433        {
434            track.muted.store(muted, SeqCst);
435            for (id, client_room) in room.client_rooms.iter() {
436                if *id != identity {
437                    let participant = Participant::Remote(RemoteParticipant {
438                        identity: identity.clone(),
439                        room: client_room.downgrade(),
440                    });
441                    let track = RemoteTrack::Audio(RemoteAudioTrack {
442                        server_track: track.clone(),
443                        room: client_room.downgrade(),
444                    });
445                    let publication = TrackPublication::Remote(RemoteTrackPublication {
446                        sid: track_sid.clone(),
447                        room: client_room.downgrade(),
448                        track,
449                    });
450
451                    let event = if muted {
452                        RoomEvent::TrackMuted {
453                            participant,
454                            publication,
455                        }
456                    } else {
457                        RoomEvent::TrackUnmuted {
458                            participant,
459                            publication,
460                        }
461                    };
462
463                    client_room
464                        .0
465                        .lock()
466                        .updates_tx
467                        .blocking_send(event)
468                        .unwrap();
469                }
470            }
471        }
472        Ok(())
473    }
474
475    fn is_track_muted(&self, token: &str, track_sid: &TrackSid) -> Option<bool> {
476        let claims = livekit_api::token::validate(&token, &self.secret_key).ok()?;
477        let room_name = claims.video.room.unwrap();
478
479        let mut server_rooms = self.rooms.lock();
480        let room = server_rooms.get_mut(&*room_name)?;
481        room.audio_tracks.iter().find_map(|track| {
482            if track.sid == *track_sid {
483                Some(track.muted.load(SeqCst))
484            } else {
485                None
486            }
487        })
488    }
489
490    fn video_tracks(&self, token: String) -> Result<Vec<RemoteVideoTrack>> {
491        let claims = livekit_api::token::validate(&token, &self.secret_key)?;
492        let room_name = claims.video.room.unwrap();
493        let identity = ParticipantIdentity(claims.sub.unwrap().to_string());
494
495        let mut server_rooms = self.rooms.lock();
496        let room = server_rooms
497            .get_mut(&*room_name)
498            .ok_or_else(|| anyhow!("room {} does not exist", room_name))?;
499        let client_room = room
500            .client_rooms
501            .get(&identity)
502            .ok_or_else(|| anyhow!("not a participant in room"))?;
503        Ok(room
504            .video_tracks
505            .iter()
506            .map(|track| RemoteVideoTrack {
507                server_track: track.clone(),
508                _room: client_room.downgrade(),
509            })
510            .collect())
511    }
512
513    fn audio_tracks(&self, token: String) -> Result<Vec<RemoteAudioTrack>> {
514        let claims = livekit_api::token::validate(&token, &self.secret_key)?;
515        let room_name = claims.video.room.unwrap();
516        let identity = ParticipantIdentity(claims.sub.unwrap().to_string());
517
518        let mut server_rooms = self.rooms.lock();
519        let room = server_rooms
520            .get_mut(&*room_name)
521            .ok_or_else(|| anyhow!("room {} does not exist", room_name))?;
522        let client_room = room
523            .client_rooms
524            .get(&identity)
525            .ok_or_else(|| anyhow!("not a participant in room"))?;
526        Ok(room
527            .audio_tracks
528            .iter()
529            .map(|track| RemoteAudioTrack {
530                server_track: track.clone(),
531                room: client_room.downgrade(),
532            })
533            .collect())
534    }
535}
536
537#[cfg(not(all(target_os = "windows", target_env = "gnu")))]
538#[derive(Default, Debug)]
539struct TestServerRoom {
540    client_rooms: HashMap<ParticipantIdentity, Room>,
541    video_tracks: Vec<Arc<TestServerVideoTrack>>,
542    audio_tracks: Vec<Arc<TestServerAudioTrack>>,
543    participant_permissions: HashMap<ParticipantIdentity, proto::ParticipantPermission>,
544}
545
546#[cfg(not(all(target_os = "windows", target_env = "gnu")))]
547#[derive(Debug)]
548struct TestServerVideoTrack {
549    sid: TrackSid,
550    publisher_id: ParticipantIdentity,
551    // frames_rx: async_broadcast::Receiver<Frame>,
552}
553
554#[cfg(not(all(target_os = "windows", target_env = "gnu")))]
555#[derive(Debug)]
556struct TestServerAudioTrack {
557    sid: TrackSid,
558    publisher_id: ParticipantIdentity,
559    muted: AtomicBool,
560}
561
/// API client handle that locates its `TestServer` by URL on every call.
pub struct TestApiClient {
    /// URL of the server this client talks to.
    url: String,
}
565
566#[derive(Clone, Debug)]
567#[non_exhaustive]
568pub enum RoomEvent {
569    ParticipantConnected(RemoteParticipant),
570    ParticipantDisconnected(RemoteParticipant),
571    LocalTrackPublished {
572        publication: LocalTrackPublication,
573        track: LocalTrack,
574        participant: LocalParticipant,
575    },
576    LocalTrackUnpublished {
577        publication: LocalTrackPublication,
578        participant: LocalParticipant,
579    },
580    TrackSubscribed {
581        track: RemoteTrack,
582        publication: RemoteTrackPublication,
583        participant: RemoteParticipant,
584    },
585    TrackUnsubscribed {
586        track: RemoteTrack,
587        publication: RemoteTrackPublication,
588        participant: RemoteParticipant,
589    },
590    TrackSubscriptionFailed {
591        participant: RemoteParticipant,
592        error: String,
593        #[cfg(not(all(target_os = "windows", target_env = "gnu")))]
594        track_sid: TrackSid,
595    },
596    TrackPublished {
597        publication: RemoteTrackPublication,
598        participant: RemoteParticipant,
599    },
600    TrackUnpublished {
601        publication: RemoteTrackPublication,
602        participant: RemoteParticipant,
603    },
604    TrackMuted {
605        participant: Participant,
606        publication: TrackPublication,
607    },
608    TrackUnmuted {
609        participant: Participant,
610        publication: TrackPublication,
611    },
612    RoomMetadataChanged {
613        old_metadata: String,
614        metadata: String,
615    },
616    ParticipantMetadataChanged {
617        participant: Participant,
618        old_metadata: String,
619        metadata: String,
620    },
621    ParticipantNameChanged {
622        participant: Participant,
623        old_name: String,
624        name: String,
625    },
626    ActiveSpeakersChanged {
627        speakers: Vec<Participant>,
628    },
629    #[cfg(not(all(target_os = "windows", target_env = "gnu")))]
630    ConnectionStateChanged(ConnectionState),
631    Connected {
632        participants_with_tracks: Vec<(RemoteParticipant, Vec<RemoteTrackPublication>)>,
633    },
634    #[cfg(not(all(target_os = "windows", target_env = "gnu")))]
635    Disconnected {
636        reason: DisconnectReason,
637    },
638    Reconnecting,
639    Reconnected,
640}
641
642#[cfg(not(all(target_os = "windows", target_env = "gnu")))]
643#[async_trait]
644impl livekit_api::Client for TestApiClient {
645    fn url(&self) -> &str {
646        &self.url
647    }
648
649    async fn create_room(&self, name: String) -> Result<()> {
650        let server = TestServer::get(&self.url)?;
651        server.create_room(name).await?;
652        Ok(())
653    }
654
655    async fn delete_room(&self, name: String) -> Result<()> {
656        let server = TestServer::get(&self.url)?;
657        server.delete_room(name).await?;
658        Ok(())
659    }
660
661    async fn remove_participant(&self, room: String, identity: String) -> Result<()> {
662        let server = TestServer::get(&self.url)?;
663        server
664            .remove_participant(room, ParticipantIdentity(identity))
665            .await?;
666        Ok(())
667    }
668
669    async fn update_participant(
670        &self,
671        room: String,
672        identity: String,
673        permission: livekit_api::proto::ParticipantPermission,
674    ) -> Result<()> {
675        let server = TestServer::get(&self.url)?;
676        server
677            .update_participant(room, identity, permission)
678            .await?;
679        Ok(())
680    }
681
682    fn room_token(&self, room: &str, identity: &str) -> Result<String> {
683        let server = TestServer::get(&self.url)?;
684        token::create(
685            &server.api_key,
686            &server.secret_key,
687            Some(identity),
688            token::VideoGrant::to_join(room),
689        )
690    }
691
692    fn guest_token(&self, room: &str, identity: &str) -> Result<String> {
693        let server = TestServer::get(&self.url)?;
694        token::create(
695            &server.api_key,
696            &server.secret_key,
697            Some(identity),
698            token::VideoGrant::for_guest(room),
699        )
700    }
701}
702
703struct RoomState {
704    url: String,
705    token: String,
706    #[cfg(not(all(target_os = "windows", target_env = "gnu")))]
707    local_identity: ParticipantIdentity,
708    #[cfg(not(all(target_os = "windows", target_env = "gnu")))]
709    connection_state: ConnectionState,
710    #[cfg(not(all(target_os = "windows", target_env = "gnu")))]
711    paused_audio_tracks: HashSet<TrackSid>,
712    updates_tx: mpsc::Sender<RoomEvent>,
713}
714
715#[derive(Clone, Debug)]
716pub struct Room(Arc<Mutex<RoomState>>);
717
718#[derive(Clone, Debug)]
719pub(crate) struct WeakRoom(Weak<Mutex<RoomState>>);
720
721#[cfg(not(all(target_os = "windows", target_env = "gnu")))]
722impl std::fmt::Debug for RoomState {
723    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
724        f.debug_struct("Room")
725            .field("url", &self.url)
726            .field("token", &self.token)
727            .field("local_identity", &self.local_identity)
728            .field("connection_state", &self.connection_state)
729            .field("paused_audio_tracks", &self.paused_audio_tracks)
730            .finish()
731    }
732}
733
/// `Debug` output for `RoomState` on windows-gnu targets, where only `url` and
/// `token` are printed.
#[cfg(all(target_os = "windows", target_env = "gnu"))]
impl std::fmt::Debug for RoomState {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut out = f.debug_struct("Room");
        out.field("url", &self.url);
        out.field("token", &self.token);
        out.finish()
    }
}
743
744#[cfg(not(all(target_os = "windows", target_env = "gnu")))]
745impl Room {
746    fn downgrade(&self) -> WeakRoom {
747        WeakRoom(Arc::downgrade(&self.0))
748    }
749
750    pub fn connection_state(&self) -> ConnectionState {
751        self.0.lock().connection_state
752    }
753
754    pub fn local_participant(&self) -> LocalParticipant {
755        let identity = self.0.lock().local_identity.clone();
756        LocalParticipant {
757            identity,
758            room: self.clone(),
759        }
760    }
761
762    pub async fn connect(
763        url: &str,
764        token: &str,
765        _options: RoomOptions,
766    ) -> Result<(Self, mpsc::Receiver<RoomEvent>)> {
767        let server = TestServer::get(&url)?;
768        let (updates_tx, updates_rx) = mpsc::channel(1024);
769        let this = Self(Arc::new(Mutex::new(RoomState {
770            local_identity: ParticipantIdentity(String::new()),
771            url: url.to_string(),
772            token: token.to_string(),
773            connection_state: ConnectionState::Disconnected,
774            paused_audio_tracks: Default::default(),
775            updates_tx,
776        })));
777
778        let identity = server
779            .join_room(token.to_string(), this.clone())
780            .await
781            .context("room join")?;
782        {
783            let mut state = this.0.lock();
784            state.local_identity = identity;
785            state.connection_state = ConnectionState::Connected;
786        }
787
788        Ok((this, updates_rx))
789    }
790
791    pub fn remote_participants(&self) -> HashMap<ParticipantIdentity, RemoteParticipant> {
792        self.test_server()
793            .remote_participants(self.0.lock().token.clone())
794            .unwrap()
795    }
796
797    fn test_server(&self) -> Arc<TestServer> {
798        TestServer::get(&self.0.lock().url).unwrap()
799    }
800
801    fn token(&self) -> String {
802        self.0.lock().token.clone()
803    }
804}
805
806#[cfg(not(all(target_os = "windows", target_env = "gnu")))]
807impl Drop for RoomState {
808    fn drop(&mut self) {
809        if self.connection_state == ConnectionState::Connected {
810            if let Ok(server) = TestServer::get(&self.url) {
811                let executor = server.executor.clone();
812                let token = self.token.clone();
813                executor
814                    .spawn(async move { server.leave_room(token).await.ok() })
815                    .detach();
816            }
817        }
818    }
819}
820
821impl WeakRoom {
822    fn upgrade(&self) -> Option<Room> {
823        self.0.upgrade().map(Room)
824    }
825}