test.rs

  1use crate::{AudioStream, Participant, RemoteTrack, RoomEvent, TrackPublication};
  2
  3use crate::mock_client::{participant::*, publication::*, track::*};
  4use anyhow::{Context as _, Result};
  5use async_trait::async_trait;
  6use collections::{BTreeMap, HashMap, HashSet, btree_map::Entry as BTreeEntry, hash_map::Entry};
  7use gpui::{App, AsyncApp, BackgroundExecutor};
  8use livekit_api::{proto, token};
  9use parking_lot::Mutex;
 10use postage::{mpsc, sink::Sink};
 11use std::sync::{
 12    Arc, Weak,
 13    atomic::{AtomicBool, AtomicU64, Ordering::SeqCst},
 14};
 15
 16#[derive(Clone, Debug, Eq, Hash, PartialEq, PartialOrd, Ord)]
 17pub struct ParticipantIdentity(pub String);
 18
 19#[derive(Clone, Debug, Eq, Hash, PartialEq, PartialOrd, Ord)]
 20pub struct TrackSid(pub(crate) String);
 21
 22impl std::fmt::Display for TrackSid {
 23    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
 24        self.0.fmt(f)
 25    }
 26}
 27
 28impl TryFrom<String> for TrackSid {
 29    type Error = anyhow::Error;
 30
 31    fn try_from(value: String) -> Result<Self, Self::Error> {
 32        Ok(TrackSid(value))
 33    }
 34}
 35
 36#[derive(Copy, Clone, Debug, Eq, Hash, PartialEq, PartialOrd, Ord)]
 37#[non_exhaustive]
 38pub enum ConnectionState {
 39    Connected,
 40    Disconnected,
 41}
 42
 43#[derive(Clone, Debug, Default)]
 44pub struct SessionStats {
 45    pub publisher_stats: Vec<RtcStats>,
 46    pub subscriber_stats: Vec<RtcStats>,
 47}
 48
 49#[derive(Clone, Debug)]
 50pub enum RtcStats {}
 51
/// Global registry of test servers, keyed by server url.
///
/// Populated by `TestServer::create`, read by `TestServer::get`, and pruned by
/// `TestServer::teardown`.
static SERVERS: Mutex<BTreeMap<String, Arc<TestServer>>> = Mutex::new(BTreeMap::new());
 53
/// In-process test server that keeps its rooms in memory.
pub struct TestServer {
    /// Url the server is registered under in the global registry.
    pub url: String,
    /// API key passed to `token::create` when issuing tokens.
    pub api_key: String,
    /// Secret used both to create tokens and to validate incoming ones.
    pub secret_key: String,
    /// Rooms hosted by this server, keyed by room name.
    rooms: Mutex<HashMap<String, TestServerRoom>>,
    /// Executor used for simulated delays and for spawning the leave-room task
    /// when a connected room state is dropped.
    executor: BackgroundExecutor,
}
 61
 62impl TestServer {
 63    pub fn create(
 64        url: String,
 65        api_key: String,
 66        secret_key: String,
 67        executor: BackgroundExecutor,
 68    ) -> Result<Arc<TestServer>> {
 69        let mut servers = SERVERS.lock();
 70        if let BTreeEntry::Vacant(e) = servers.entry(url.clone()) {
 71            let server = Arc::new(TestServer {
 72                url,
 73                api_key,
 74                secret_key,
 75                rooms: Default::default(),
 76                executor,
 77            });
 78            e.insert(server.clone());
 79            Ok(server)
 80        } else {
 81            anyhow::bail!("a server with url {url:?} already exists");
 82        }
 83    }
 84
 85    fn get(url: &str) -> Result<Arc<TestServer>> {
 86        Ok(SERVERS
 87            .lock()
 88            .get(url)
 89            .context("no server found for url")?
 90            .clone())
 91    }
 92
 93    pub fn teardown(&self) -> Result<()> {
 94        SERVERS
 95            .lock()
 96            .remove(&self.url)
 97            .with_context(|| format!("server with url {:?} does not exist", self.url))?;
 98        Ok(())
 99    }
100
101    pub fn create_api_client(&self) -> TestApiClient {
102        TestApiClient {
103            url: self.url.clone(),
104        }
105    }
106
107    pub async fn create_room(&self, room: String) -> Result<()> {
108        self.simulate_random_delay().await;
109
110        let mut server_rooms = self.rooms.lock();
111        if let Entry::Vacant(e) = server_rooms.entry(room.clone()) {
112            e.insert(Default::default());
113            Ok(())
114        } else {
115            anyhow::bail!("{room:?} already exists");
116        }
117    }
118
119    async fn delete_room(&self, room: String) -> Result<()> {
120        self.simulate_random_delay().await;
121
122        let mut server_rooms = self.rooms.lock();
123        server_rooms
124            .remove(&room)
125            .with_context(|| format!("room {room:?} does not exist"))?;
126        Ok(())
127    }
128
129    async fn join_room(&self, token: String, client_room: Room) -> Result<ParticipantIdentity> {
130        self.simulate_random_delay().await;
131
132        let claims = livekit_api::token::validate(&token, &self.secret_key)?;
133        let identity = ParticipantIdentity(claims.sub.unwrap().to_string());
134        let room_name = claims.video.room.unwrap();
135        let mut server_rooms = self.rooms.lock();
136        let room = (*server_rooms).entry(room_name.to_string()).or_default();
137
138        if let Entry::Vacant(e) = room.client_rooms.entry(identity.clone()) {
139            for server_track in &room.video_tracks {
140                let track = RemoteTrack::Video(RemoteVideoTrack {
141                    server_track: server_track.clone(),
142                    _room: client_room.downgrade(),
143                });
144                client_room
145                    .0
146                    .lock()
147                    .updates_tx
148                    .blocking_send(RoomEvent::TrackSubscribed {
149                        track: track.clone(),
150                        publication: RemoteTrackPublication {
151                            sid: server_track.sid.clone(),
152                            room: client_room.downgrade(),
153                            track,
154                        },
155                        participant: RemoteParticipant {
156                            room: client_room.downgrade(),
157                            identity: server_track.publisher_id.clone(),
158                        },
159                    })
160                    .unwrap();
161            }
162            for server_track in &room.audio_tracks {
163                let track = RemoteTrack::Audio(RemoteAudioTrack {
164                    server_track: server_track.clone(),
165                    room: client_room.downgrade(),
166                });
167                client_room
168                    .0
169                    .lock()
170                    .updates_tx
171                    .blocking_send(RoomEvent::TrackSubscribed {
172                        track: track.clone(),
173                        publication: RemoteTrackPublication {
174                            sid: server_track.sid.clone(),
175                            room: client_room.downgrade(),
176                            track,
177                        },
178                        participant: RemoteParticipant {
179                            room: client_room.downgrade(),
180                            identity: server_track.publisher_id.clone(),
181                        },
182                    })
183                    .unwrap();
184            }
185            e.insert(client_room);
186            Ok(identity)
187        } else {
188            anyhow::bail!("{identity:?} attempted to join room {room_name:?} twice");
189        }
190    }
191
192    async fn leave_room(&self, token: String) -> Result<()> {
193        self.simulate_random_delay().await;
194
195        let claims = livekit_api::token::validate(&token, &self.secret_key)?;
196        let identity = ParticipantIdentity(claims.sub.unwrap().to_string());
197        let room_name = claims.video.room.unwrap();
198        let mut server_rooms = self.rooms.lock();
199        let room = server_rooms
200            .get_mut(&*room_name)
201            .with_context(|| format!("room {room_name:?} does not exist"))?;
202        room.client_rooms.remove(&identity).with_context(|| {
203            format!("{identity:?} attempted to leave room {room_name:?} before joining it")
204        })?;
205        Ok(())
206    }
207
208    fn remote_participants(
209        &self,
210        token: String,
211    ) -> Result<HashMap<ParticipantIdentity, RemoteParticipant>> {
212        let claims = livekit_api::token::validate(&token, &self.secret_key)?;
213        let local_identity = ParticipantIdentity(claims.sub.unwrap().to_string());
214        let room_name = claims.video.room.unwrap().to_string();
215
216        if let Some(server_room) = self.rooms.lock().get(&room_name) {
217            let room = server_room
218                .client_rooms
219                .get(&local_identity)
220                .unwrap()
221                .downgrade();
222            Ok(server_room
223                .client_rooms
224                .iter()
225                .filter(|(identity, _)| *identity != &local_identity)
226                .map(|(identity, _)| {
227                    (
228                        identity.clone(),
229                        RemoteParticipant {
230                            room: room.clone(),
231                            identity: identity.clone(),
232                        },
233                    )
234                })
235                .collect())
236        } else {
237            Ok(Default::default())
238        }
239    }
240
241    async fn remove_participant(
242        &self,
243        room_name: String,
244        identity: ParticipantIdentity,
245    ) -> Result<()> {
246        self.simulate_random_delay().await;
247
248        let mut server_rooms = self.rooms.lock();
249        let room = server_rooms
250            .get_mut(&room_name)
251            .with_context(|| format!("room {room_name} does not exist"))?;
252        room.client_rooms
253            .remove(&identity)
254            .with_context(|| format!("participant {identity:?} did not join room {room_name:?}"))?;
255        Ok(())
256    }
257
258    async fn update_participant(
259        &self,
260        room_name: String,
261        identity: String,
262        permission: proto::ParticipantPermission,
263    ) -> Result<()> {
264        self.simulate_random_delay().await;
265
266        let mut server_rooms = self.rooms.lock();
267        let room = server_rooms
268            .get_mut(&room_name)
269            .with_context(|| format!("room {room_name} does not exist"))?;
270        room.participant_permissions
271            .insert(ParticipantIdentity(identity), permission);
272        Ok(())
273    }
274
275    pub async fn disconnect_client(&self, client_identity: String) {
276        let client_identity = ParticipantIdentity(client_identity);
277
278        self.simulate_random_delay().await;
279
280        let mut server_rooms = self.rooms.lock();
281        for room in server_rooms.values_mut() {
282            if let Some(room) = room.client_rooms.remove(&client_identity) {
283                let mut room = room.0.lock();
284                room.connection_state = ConnectionState::Disconnected;
285                room.updates_tx
286                    .blocking_send(RoomEvent::Disconnected {
287                        reason: "SIGNAL_CLOSED",
288                    })
289                    .ok();
290            }
291        }
292    }
293
294    pub(crate) async fn publish_video_track(
295        &self,
296        token: String,
297        _local_track: LocalVideoTrack,
298    ) -> Result<TrackSid> {
299        self.simulate_random_delay().await;
300
301        let claims = livekit_api::token::validate(&token, &self.secret_key)?;
302        let identity = ParticipantIdentity(claims.sub.unwrap().to_string());
303        let room_name = claims.video.room.unwrap();
304
305        let mut server_rooms = self.rooms.lock();
306        let room = server_rooms
307            .get_mut(&*room_name)
308            .with_context(|| format!("room {room_name} does not exist"))?;
309
310        let can_publish = room
311            .participant_permissions
312            .get(&identity)
313            .map(|permission| permission.can_publish)
314            .or(claims.video.can_publish)
315            .unwrap_or(true);
316
317        anyhow::ensure!(can_publish, "user is not allowed to publish");
318
319        let sid: TrackSid = format!("TR_{}", nanoid::nanoid!(17)).try_into().unwrap();
320        let server_track = Arc::new(TestServerVideoTrack {
321            sid: sid.clone(),
322            publisher_id: identity.clone(),
323        });
324
325        room.video_tracks.push(server_track.clone());
326
327        for (room_identity, client_room) in &room.client_rooms {
328            if *room_identity != identity {
329                let track = RemoteTrack::Video(RemoteVideoTrack {
330                    server_track: server_track.clone(),
331                    _room: client_room.downgrade(),
332                });
333                let publication = RemoteTrackPublication {
334                    sid: sid.clone(),
335                    room: client_room.downgrade(),
336                    track: track.clone(),
337                };
338                let participant = RemoteParticipant {
339                    identity: identity.clone(),
340                    room: client_room.downgrade(),
341                };
342                client_room
343                    .0
344                    .lock()
345                    .updates_tx
346                    .blocking_send(RoomEvent::TrackSubscribed {
347                        track,
348                        publication,
349                        participant,
350                    })
351                    .unwrap();
352            }
353        }
354
355        Ok(sid)
356    }
357
358    pub(crate) async fn publish_audio_track(
359        &self,
360        token: String,
361        _local_track: &LocalAudioTrack,
362    ) -> Result<TrackSid> {
363        self.simulate_random_delay().await;
364
365        let claims = livekit_api::token::validate(&token, &self.secret_key)?;
366        let identity = ParticipantIdentity(claims.sub.unwrap().to_string());
367        let room_name = claims.video.room.unwrap();
368
369        let mut server_rooms = self.rooms.lock();
370        let room = server_rooms
371            .get_mut(&*room_name)
372            .with_context(|| format!("room {room_name} does not exist"))?;
373
374        let can_publish = room
375            .participant_permissions
376            .get(&identity)
377            .map(|permission| permission.can_publish)
378            .or(claims.video.can_publish)
379            .unwrap_or(true);
380
381        anyhow::ensure!(can_publish, "user is not allowed to publish");
382
383        let sid: TrackSid = format!("TR_{}", nanoid::nanoid!(17)).try_into().unwrap();
384        let server_track = Arc::new(TestServerAudioTrack {
385            sid: sid.clone(),
386            publisher_id: identity.clone(),
387            muted: AtomicBool::new(false),
388        });
389
390        room.audio_tracks.push(server_track.clone());
391
392        for (room_identity, client_room) in &room.client_rooms {
393            if *room_identity != identity {
394                let track = RemoteTrack::Audio(RemoteAudioTrack {
395                    server_track: server_track.clone(),
396                    room: client_room.downgrade(),
397                });
398                let publication = RemoteTrackPublication {
399                    sid: sid.clone(),
400                    room: client_room.downgrade(),
401                    track: track.clone(),
402                };
403                let participant = RemoteParticipant {
404                    identity: identity.clone(),
405                    room: client_room.downgrade(),
406                };
407                client_room
408                    .0
409                    .lock()
410                    .updates_tx
411                    .blocking_send(RoomEvent::TrackSubscribed {
412                        track,
413                        publication,
414                        participant,
415                    })
416                    .ok();
417            }
418        }
419
420        Ok(sid)
421    }
422
    /// No-op: ignores both arguments and always returns `Ok(())`.
    ///
    /// NOTE(review): the track is not removed from the room's track lists and no
    /// event is sent to other participants — confirm this stub is intentional.
    pub(crate) async fn unpublish_track(&self, _token: String, _track: &TrackSid) -> Result<()> {
        Ok(())
    }
426
427    pub(crate) fn set_track_muted(
428        &self,
429        token: &str,
430        track_sid: &TrackSid,
431        muted: bool,
432    ) -> Result<()> {
433        let claims = livekit_api::token::validate(token, &self.secret_key)?;
434        let room_name = claims.video.room.unwrap();
435        let identity = ParticipantIdentity(claims.sub.unwrap().to_string());
436        let mut server_rooms = self.rooms.lock();
437        let room = server_rooms
438            .get_mut(&*room_name)
439            .with_context(|| format!("room {room_name} does not exist"))?;
440        if let Some(track) = room
441            .audio_tracks
442            .iter_mut()
443            .find(|track| track.sid == *track_sid)
444        {
445            track.muted.store(muted, SeqCst);
446            for (id, client_room) in room.client_rooms.iter() {
447                if *id != identity {
448                    let participant = Participant::Remote(RemoteParticipant {
449                        identity: identity.clone(),
450                        room: client_room.downgrade(),
451                    });
452                    let track = RemoteTrack::Audio(RemoteAudioTrack {
453                        server_track: track.clone(),
454                        room: client_room.downgrade(),
455                    });
456                    let publication = TrackPublication::Remote(RemoteTrackPublication {
457                        sid: track_sid.clone(),
458                        room: client_room.downgrade(),
459                        track,
460                    });
461
462                    let event = if muted {
463                        RoomEvent::TrackMuted {
464                            participant,
465                            publication,
466                        }
467                    } else {
468                        RoomEvent::TrackUnmuted {
469                            participant,
470                            publication,
471                        }
472                    };
473
474                    client_room
475                        .0
476                        .lock()
477                        .updates_tx
478                        .blocking_send(event)
479                        .unwrap();
480                }
481            }
482        }
483        Ok(())
484    }
485
486    pub(crate) fn is_track_muted(&self, token: &str, track_sid: &TrackSid) -> Option<bool> {
487        let claims = livekit_api::token::validate(token, &self.secret_key).ok()?;
488        let room_name = claims.video.room.unwrap();
489
490        let mut server_rooms = self.rooms.lock();
491        let room = server_rooms.get_mut(&*room_name)?;
492        room.audio_tracks.iter().find_map(|track| {
493            if track.sid == *track_sid {
494                Some(track.muted.load(SeqCst))
495            } else {
496                None
497            }
498        })
499    }
500
501    pub(crate) fn video_tracks(&self, token: String) -> Result<Vec<RemoteVideoTrack>> {
502        let claims = livekit_api::token::validate(&token, &self.secret_key)?;
503        let room_name = claims.video.room.unwrap();
504        let identity = ParticipantIdentity(claims.sub.unwrap().to_string());
505
506        let mut server_rooms = self.rooms.lock();
507        let room = server_rooms
508            .get_mut(&*room_name)
509            .with_context(|| format!("room {room_name} does not exist"))?;
510        let client_room = room
511            .client_rooms
512            .get(&identity)
513            .context("not a participant in room")?;
514        Ok(room
515            .video_tracks
516            .iter()
517            .map(|track| RemoteVideoTrack {
518                server_track: track.clone(),
519                _room: client_room.downgrade(),
520            })
521            .collect())
522    }
523
524    pub(crate) fn audio_tracks(&self, token: String) -> Result<Vec<RemoteAudioTrack>> {
525        let claims = livekit_api::token::validate(&token, &self.secret_key)?;
526        let room_name = claims.video.room.unwrap();
527        let identity = ParticipantIdentity(claims.sub.unwrap().to_string());
528
529        let mut server_rooms = self.rooms.lock();
530        let room = server_rooms
531            .get_mut(&*room_name)
532            .with_context(|| format!("room {room_name} does not exist"))?;
533        let client_room = room
534            .client_rooms
535            .get(&identity)
536            .context("not a participant in room")?;
537        Ok(room
538            .audio_tracks
539            .iter()
540            .map(|track| RemoteAudioTrack {
541                server_track: track.clone(),
542                room: client_room.downgrade(),
543            })
544            .collect())
545    }
546
    /// Awaits the executor's `simulate_random_delay` when compiled for tests or
    /// with the `test-support` feature; otherwise this does nothing.
    async fn simulate_random_delay(&self) {
        // The call is compiled out entirely unless one of these cfgs is active.
        #[cfg(any(test, feature = "test-support"))]
        self.executor.simulate_random_delay().await;
    }
551}
552
/// Server-side state of a single room.
#[derive(Default, Debug)]
struct TestServerRoom {
    /// Client rooms of the participants currently joined, keyed by identity.
    client_rooms: HashMap<ParticipantIdentity, Room>,
    /// Video tracks published in this room, in publication order.
    video_tracks: Vec<Arc<TestServerVideoTrack>>,
    /// Audio tracks published in this room, in publication order.
    audio_tracks: Vec<Arc<TestServerAudioTrack>>,
    /// Per-identity permissions set through `update_participant`; consulted
    /// before the token grant when deciding whether publishing is allowed.
    participant_permissions: HashMap<ParticipantIdentity, proto::ParticipantPermission>,
}
560
/// Server-side record of a published video track.
#[derive(Debug)]
pub(crate) struct TestServerVideoTrack {
    /// Sid assigned to the track when it was published.
    pub(crate) sid: TrackSid,
    /// Identity of the participant that published the track.
    pub(crate) publisher_id: ParticipantIdentity,
    // frames_rx: async_broadcast::Receiver<Frame>,
}
567
/// Server-side record of a published audio track.
#[derive(Debug)]
pub(crate) struct TestServerAudioTrack {
    /// Sid assigned to the track when it was published.
    pub(crate) sid: TrackSid,
    /// Identity of the participant that published the track.
    pub(crate) publisher_id: ParticipantIdentity,
    /// Current muted flag; starts out `false` and is updated by `set_track_muted`.
    pub(crate) muted: AtomicBool,
}
574
/// API client for a test server; it stores only the server's url and looks the
/// server up in the global registry on each call.
pub struct TestApiClient {
    /// Url of the server this client talks to.
    url: String,
}
578
579#[async_trait]
580impl livekit_api::Client for TestApiClient {
581    fn url(&self) -> &str {
582        &self.url
583    }
584
585    async fn create_room(&self, name: String) -> Result<()> {
586        let server = TestServer::get(&self.url)?;
587        server.create_room(name).await?;
588        Ok(())
589    }
590
591    async fn delete_room(&self, name: String) -> Result<()> {
592        let server = TestServer::get(&self.url)?;
593        server.delete_room(name).await?;
594        Ok(())
595    }
596
597    async fn remove_participant(&self, room: String, identity: String) -> Result<()> {
598        let server = TestServer::get(&self.url)?;
599        server
600            .remove_participant(room, ParticipantIdentity(identity))
601            .await?;
602        Ok(())
603    }
604
605    async fn update_participant(
606        &self,
607        room: String,
608        identity: String,
609        permission: livekit_api::proto::ParticipantPermission,
610    ) -> Result<()> {
611        let server = TestServer::get(&self.url)?;
612        server
613            .update_participant(room, identity, permission)
614            .await?;
615        Ok(())
616    }
617
618    fn room_token(&self, room: &str, identity: &str) -> Result<String> {
619        let server = TestServer::get(&self.url)?;
620        token::create(
621            &server.api_key,
622            &server.secret_key,
623            Some(identity),
624            token::VideoGrant::to_join(room),
625        )
626    }
627
628    fn guest_token(&self, room: &str, identity: &str) -> Result<String> {
629        let server = TestServer::get(&self.url)?;
630        token::create(
631            &server.api_key,
632            &server.secret_key,
633            Some(identity),
634            token::VideoGrant::for_guest(room),
635        )
636    }
637}
638
/// Client-side state of a room connection.
pub(crate) struct RoomState {
    /// Url of the server this room is connected to.
    pub(crate) url: String,
    /// Token the room was connected with; reused for later server calls.
    pub(crate) token: String,
    /// Identity of the local participant; empty until the join completes.
    pub(crate) local_identity: ParticipantIdentity,
    /// Current connection status.
    pub(crate) connection_state: ConnectionState,
    /// Set of audio track sids recorded as paused.
    pub(crate) paused_audio_tracks: HashSet<TrackSid>,
    /// Sending half of the channel that delivers `RoomEvent`s to the client.
    pub(crate) updates_tx: mpsc::Sender<RoomEvent>,
}
647
/// Cloneable strong handle to a shared, mutex-protected `RoomState`.
#[derive(Clone, Debug)]
pub struct Room(pub(crate) Arc<Mutex<RoomState>>);
650
/// Weak counterpart of `Room`; it does not keep the `RoomState` alive.
#[derive(Clone, Debug)]
pub(crate) struct WeakRoom(Weak<Mutex<RoomState>>);
653
654impl std::fmt::Debug for RoomState {
655    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
656        f.debug_struct("Room")
657            .field("url", &self.url)
658            .field("token", &self.token)
659            .field("local_identity", &self.local_identity)
660            .field("connection_state", &self.connection_state)
661            .field("paused_audio_tracks", &self.paused_audio_tracks)
662            .finish()
663    }
664}
665
666impl Room {
667    pub(crate) fn downgrade(&self) -> WeakRoom {
668        WeakRoom(Arc::downgrade(&self.0))
669    }
670
671    pub fn connection_state(&self) -> ConnectionState {
672        self.0.lock().connection_state
673    }
674
675    pub fn local_participant(&self) -> LocalParticipant {
676        let identity = self.0.lock().local_identity.clone();
677        LocalParticipant {
678            identity,
679            room: self.clone(),
680        }
681    }
682
683    pub async fn connect(
684        url: String,
685        token: String,
686        _cx: &mut AsyncApp,
687    ) -> Result<(Self, mpsc::Receiver<RoomEvent>)> {
688        let server = TestServer::get(&url)?;
689        let (updates_tx, updates_rx) = mpsc::channel(1024);
690        let this = Self(Arc::new(Mutex::new(RoomState {
691            local_identity: ParticipantIdentity(String::new()),
692            url: url.to_string(),
693            token: token.to_string(),
694            connection_state: ConnectionState::Disconnected,
695            paused_audio_tracks: Default::default(),
696            updates_tx,
697        })));
698
699        let identity = server
700            .join_room(token.to_string(), this.clone())
701            .await
702            .context("room join")?;
703        {
704            let mut state = this.0.lock();
705            state.local_identity = identity;
706            state.connection_state = ConnectionState::Connected;
707        }
708
709        Ok((this, updates_rx))
710    }
711
712    pub fn remote_participants(&self) -> HashMap<ParticipantIdentity, RemoteParticipant> {
713        self.test_server()
714            .remote_participants(self.0.lock().token.clone())
715            .unwrap()
716    }
717
718    pub(crate) fn test_server(&self) -> Arc<TestServer> {
719        TestServer::get(&self.0.lock().url).unwrap()
720    }
721
722    pub(crate) fn token(&self) -> String {
723        self.0.lock().token.clone()
724    }
725
726    pub fn name(&self) -> String {
727        "test_room".to_string()
728    }
729
730    pub async fn sid(&self) -> String {
731        "RM_test_session".to_string()
732    }
733
734    pub fn play_remote_audio_track(
735        &self,
736        _track: &RemoteAudioTrack,
737        _cx: &App,
738    ) -> anyhow::Result<AudioStream> {
739        Ok(AudioStream {})
740    }
741
742    pub async fn unpublish_local_track(&self, sid: TrackSid, cx: &mut AsyncApp) -> Result<()> {
743        self.local_participant().unpublish_track(sid, cx).await
744    }
745
746    pub async fn publish_local_microphone_track(
747        &self,
748        _track_name: String,
749        _is_staff: bool,
750        cx: &mut AsyncApp,
751    ) -> Result<(LocalTrackPublication, AudioStream, Arc<AtomicU64>)> {
752        self.local_participant().publish_microphone_track(cx).await
753    }
754
755    pub async fn get_stats(&self) -> Result<SessionStats> {
756        Ok(SessionStats::default())
757    }
758
759    pub fn stats_task(&self, _cx: &impl gpui::AppContext) -> gpui::Task<Result<SessionStats>> {
760        gpui::Task::ready(Ok(SessionStats::default()))
761    }
762}
763
764impl Drop for RoomState {
765    fn drop(&mut self) {
766        if self.connection_state == ConnectionState::Connected
767            && let Ok(server) = TestServer::get(&self.url)
768        {
769            let executor = server.executor.clone();
770            let token = self.token.clone();
771            executor
772                .spawn(async move { server.leave_room(token).await.ok() })
773                .detach();
774        }
775    }
776}
777
778impl WeakRoom {
779    pub(crate) fn upgrade(&self) -> Option<Room> {
780        self.0.upgrade().map(Room)
781    }
782}