test.rs

  1use crate::{AudioStream, Participant, RemoteTrack, RoomEvent, TrackPublication};
  2
  3use crate::mock_client::{participant::*, publication::*, track::*};
  4use anyhow::{Context as _, Result};
  5use async_trait::async_trait;
  6use collections::{BTreeMap, HashMap, HashSet, btree_map::Entry as BTreeEntry, hash_map::Entry};
  7use gpui::{App, AsyncApp, BackgroundExecutor};
  8use livekit_api::{proto, token};
  9use parking_lot::Mutex;
 10use postage::{mpsc, sink::Sink};
 11use std::sync::{
 12    Arc, Weak,
 13    atomic::{AtomicBool, Ordering::SeqCst},
 14};
 15
 16#[derive(Clone, Debug, Eq, Hash, PartialEq, PartialOrd, Ord)]
 17pub struct ParticipantIdentity(pub String);
 18
 19#[derive(Clone, Debug, Eq, Hash, PartialEq, PartialOrd, Ord)]
 20pub struct TrackSid(pub(crate) String);
 21
 22impl std::fmt::Display for TrackSid {
 23    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
 24        self.0.fmt(f)
 25    }
 26}
 27
 28impl TryFrom<String> for TrackSid {
 29    type Error = anyhow::Error;
 30
 31    fn try_from(value: String) -> Result<Self, Self::Error> {
 32        Ok(TrackSid(value))
 33    }
 34}
 35
 36#[derive(Copy, Clone, Debug, Eq, Hash, PartialEq, PartialOrd, Ord)]
 37#[non_exhaustive]
 38pub enum ConnectionState {
 39    Connected,
 40    Disconnected,
 41}
 42
 43static SERVERS: Mutex<BTreeMap<String, Arc<TestServer>>> = Mutex::new(BTreeMap::new());
 44
 45pub struct TestServer {
 46    pub url: String,
 47    pub api_key: String,
 48    pub secret_key: String,
 49    rooms: Mutex<HashMap<String, TestServerRoom>>,
 50    executor: BackgroundExecutor,
 51}
 52
 53impl TestServer {
 54    pub fn create(
 55        url: String,
 56        api_key: String,
 57        secret_key: String,
 58        executor: BackgroundExecutor,
 59    ) -> Result<Arc<TestServer>> {
 60        let mut servers = SERVERS.lock();
 61        if let BTreeEntry::Vacant(e) = servers.entry(url.clone()) {
 62            let server = Arc::new(TestServer {
 63                url,
 64                api_key,
 65                secret_key,
 66                rooms: Default::default(),
 67                executor,
 68            });
 69            e.insert(server.clone());
 70            Ok(server)
 71        } else {
 72            anyhow::bail!("a server with url {url:?} already exists");
 73        }
 74    }
 75
 76    fn get(url: &str) -> Result<Arc<TestServer>> {
 77        Ok(SERVERS
 78            .lock()
 79            .get(url)
 80            .context("no server found for url")?
 81            .clone())
 82    }
 83
 84    pub fn teardown(&self) -> Result<()> {
 85        SERVERS
 86            .lock()
 87            .remove(&self.url)
 88            .with_context(|| format!("server with url {:?} does not exist", self.url))?;
 89        Ok(())
 90    }
 91
 92    pub fn create_api_client(&self) -> TestApiClient {
 93        TestApiClient {
 94            url: self.url.clone(),
 95        }
 96    }
 97
 98    pub async fn create_room(&self, room: String) -> Result<()> {
 99        self.simulate_random_delay().await;
100
101        let mut server_rooms = self.rooms.lock();
102        if let Entry::Vacant(e) = server_rooms.entry(room.clone()) {
103            e.insert(Default::default());
104            Ok(())
105        } else {
106            anyhow::bail!("{room:?} already exists");
107        }
108    }
109
110    async fn delete_room(&self, room: String) -> Result<()> {
111        self.simulate_random_delay().await;
112
113        let mut server_rooms = self.rooms.lock();
114        server_rooms
115            .remove(&room)
116            .with_context(|| format!("room {room:?} does not exist"))?;
117        Ok(())
118    }
119
120    async fn join_room(&self, token: String, client_room: Room) -> Result<ParticipantIdentity> {
121        self.simulate_random_delay().await;
122
123        let claims = livekit_api::token::validate(&token, &self.secret_key)?;
124        let identity = ParticipantIdentity(claims.sub.unwrap().to_string());
125        let room_name = claims.video.room.unwrap();
126        let mut server_rooms = self.rooms.lock();
127        let room = (*server_rooms).entry(room_name.to_string()).or_default();
128
129        if let Entry::Vacant(e) = room.client_rooms.entry(identity.clone()) {
130            for server_track in &room.video_tracks {
131                let track = RemoteTrack::Video(RemoteVideoTrack {
132                    server_track: server_track.clone(),
133                    _room: client_room.downgrade(),
134                });
135                client_room
136                    .0
137                    .lock()
138                    .updates_tx
139                    .blocking_send(RoomEvent::TrackSubscribed {
140                        track: track.clone(),
141                        publication: RemoteTrackPublication {
142                            sid: server_track.sid.clone(),
143                            room: client_room.downgrade(),
144                            track,
145                        },
146                        participant: RemoteParticipant {
147                            room: client_room.downgrade(),
148                            identity: server_track.publisher_id.clone(),
149                        },
150                    })
151                    .unwrap();
152            }
153            for server_track in &room.audio_tracks {
154                let track = RemoteTrack::Audio(RemoteAudioTrack {
155                    server_track: server_track.clone(),
156                    room: client_room.downgrade(),
157                });
158                client_room
159                    .0
160                    .lock()
161                    .updates_tx
162                    .blocking_send(RoomEvent::TrackSubscribed {
163                        track: track.clone(),
164                        publication: RemoteTrackPublication {
165                            sid: server_track.sid.clone(),
166                            room: client_room.downgrade(),
167                            track,
168                        },
169                        participant: RemoteParticipant {
170                            room: client_room.downgrade(),
171                            identity: server_track.publisher_id.clone(),
172                        },
173                    })
174                    .unwrap();
175            }
176            e.insert(client_room);
177            Ok(identity)
178        } else {
179            anyhow::bail!("{identity:?} attempted to join room {room_name:?} twice");
180        }
181    }
182
183    async fn leave_room(&self, token: String) -> Result<()> {
184        self.simulate_random_delay().await;
185
186        let claims = livekit_api::token::validate(&token, &self.secret_key)?;
187        let identity = ParticipantIdentity(claims.sub.unwrap().to_string());
188        let room_name = claims.video.room.unwrap();
189        let mut server_rooms = self.rooms.lock();
190        let room = server_rooms
191            .get_mut(&*room_name)
192            .with_context(|| format!("room {room_name:?} does not exist"))?;
193        room.client_rooms.remove(&identity).with_context(|| {
194            format!("{identity:?} attempted to leave room {room_name:?} before joining it")
195        })?;
196        Ok(())
197    }
198
199    fn remote_participants(
200        &self,
201        token: String,
202    ) -> Result<HashMap<ParticipantIdentity, RemoteParticipant>> {
203        let claims = livekit_api::token::validate(&token, &self.secret_key)?;
204        let local_identity = ParticipantIdentity(claims.sub.unwrap().to_string());
205        let room_name = claims.video.room.unwrap().to_string();
206
207        if let Some(server_room) = self.rooms.lock().get(&room_name) {
208            let room = server_room
209                .client_rooms
210                .get(&local_identity)
211                .unwrap()
212                .downgrade();
213            Ok(server_room
214                .client_rooms
215                .iter()
216                .filter(|(identity, _)| *identity != &local_identity)
217                .map(|(identity, _)| {
218                    (
219                        identity.clone(),
220                        RemoteParticipant {
221                            room: room.clone(),
222                            identity: identity.clone(),
223                        },
224                    )
225                })
226                .collect())
227        } else {
228            Ok(Default::default())
229        }
230    }
231
232    async fn remove_participant(
233        &self,
234        room_name: String,
235        identity: ParticipantIdentity,
236    ) -> Result<()> {
237        self.simulate_random_delay().await;
238
239        let mut server_rooms = self.rooms.lock();
240        let room = server_rooms
241            .get_mut(&room_name)
242            .with_context(|| format!("room {room_name} does not exist"))?;
243        room.client_rooms
244            .remove(&identity)
245            .with_context(|| format!("participant {identity:?} did not join room {room_name:?}"))?;
246        Ok(())
247    }
248
249    async fn update_participant(
250        &self,
251        room_name: String,
252        identity: String,
253        permission: proto::ParticipantPermission,
254    ) -> Result<()> {
255        self.simulate_random_delay().await;
256
257        let mut server_rooms = self.rooms.lock();
258        let room = server_rooms
259            .get_mut(&room_name)
260            .with_context(|| format!("room {room_name} does not exist"))?;
261        room.participant_permissions
262            .insert(ParticipantIdentity(identity), permission);
263        Ok(())
264    }
265
266    pub async fn disconnect_client(&self, client_identity: String) {
267        let client_identity = ParticipantIdentity(client_identity);
268
269        self.simulate_random_delay().await;
270
271        let mut server_rooms = self.rooms.lock();
272        for room in server_rooms.values_mut() {
273            if let Some(room) = room.client_rooms.remove(&client_identity) {
274                let mut room = room.0.lock();
275                room.connection_state = ConnectionState::Disconnected;
276                room.updates_tx
277                    .blocking_send(RoomEvent::Disconnected {
278                        reason: "SIGNAL_CLOSED",
279                    })
280                    .ok();
281            }
282        }
283    }
284
285    pub(crate) async fn publish_video_track(
286        &self,
287        token: String,
288        _local_track: LocalVideoTrack,
289    ) -> Result<TrackSid> {
290        self.simulate_random_delay().await;
291
292        let claims = livekit_api::token::validate(&token, &self.secret_key)?;
293        let identity = ParticipantIdentity(claims.sub.unwrap().to_string());
294        let room_name = claims.video.room.unwrap();
295
296        let mut server_rooms = self.rooms.lock();
297        let room = server_rooms
298            .get_mut(&*room_name)
299            .with_context(|| format!("room {room_name} does not exist"))?;
300
301        let can_publish = room
302            .participant_permissions
303            .get(&identity)
304            .map(|permission| permission.can_publish)
305            .or(claims.video.can_publish)
306            .unwrap_or(true);
307
308        anyhow::ensure!(can_publish, "user is not allowed to publish");
309
310        let sid: TrackSid = format!("TR_{}", nanoid::nanoid!(17)).try_into().unwrap();
311        let server_track = Arc::new(TestServerVideoTrack {
312            sid: sid.clone(),
313            publisher_id: identity.clone(),
314        });
315
316        room.video_tracks.push(server_track.clone());
317
318        for (room_identity, client_room) in &room.client_rooms {
319            if *room_identity != identity {
320                let track = RemoteTrack::Video(RemoteVideoTrack {
321                    server_track: server_track.clone(),
322                    _room: client_room.downgrade(),
323                });
324                let publication = RemoteTrackPublication {
325                    sid: sid.clone(),
326                    room: client_room.downgrade(),
327                    track: track.clone(),
328                };
329                let participant = RemoteParticipant {
330                    identity: identity.clone(),
331                    room: client_room.downgrade(),
332                };
333                client_room
334                    .0
335                    .lock()
336                    .updates_tx
337                    .blocking_send(RoomEvent::TrackSubscribed {
338                        track,
339                        publication,
340                        participant,
341                    })
342                    .unwrap();
343            }
344        }
345
346        Ok(sid)
347    }
348
349    pub(crate) async fn publish_audio_track(
350        &self,
351        token: String,
352        _local_track: &LocalAudioTrack,
353    ) -> Result<TrackSid> {
354        self.simulate_random_delay().await;
355
356        let claims = livekit_api::token::validate(&token, &self.secret_key)?;
357        let identity = ParticipantIdentity(claims.sub.unwrap().to_string());
358        let room_name = claims.video.room.unwrap();
359
360        let mut server_rooms = self.rooms.lock();
361        let room = server_rooms
362            .get_mut(&*room_name)
363            .with_context(|| format!("room {room_name} does not exist"))?;
364
365        let can_publish = room
366            .participant_permissions
367            .get(&identity)
368            .map(|permission| permission.can_publish)
369            .or(claims.video.can_publish)
370            .unwrap_or(true);
371
372        anyhow::ensure!(can_publish, "user is not allowed to publish");
373
374        let sid: TrackSid = format!("TR_{}", nanoid::nanoid!(17)).try_into().unwrap();
375        let server_track = Arc::new(TestServerAudioTrack {
376            sid: sid.clone(),
377            publisher_id: identity.clone(),
378            muted: AtomicBool::new(false),
379        });
380
381        room.audio_tracks.push(server_track.clone());
382
383        for (room_identity, client_room) in &room.client_rooms {
384            if *room_identity != identity {
385                let track = RemoteTrack::Audio(RemoteAudioTrack {
386                    server_track: server_track.clone(),
387                    room: client_room.downgrade(),
388                });
389                let publication = RemoteTrackPublication {
390                    sid: sid.clone(),
391                    room: client_room.downgrade(),
392                    track: track.clone(),
393                };
394                let participant = RemoteParticipant {
395                    identity: identity.clone(),
396                    room: client_room.downgrade(),
397                };
398                client_room
399                    .0
400                    .lock()
401                    .updates_tx
402                    .blocking_send(RoomEvent::TrackSubscribed {
403                        track,
404                        publication,
405                        participant,
406                    })
407                    .ok();
408            }
409        }
410
411        Ok(sid)
412    }
413
414    pub(crate) async fn unpublish_track(&self, _token: String, _track: &TrackSid) -> Result<()> {
415        Ok(())
416    }
417
418    pub(crate) fn set_track_muted(
419        &self,
420        token: &str,
421        track_sid: &TrackSid,
422        muted: bool,
423    ) -> Result<()> {
424        let claims = livekit_api::token::validate(token, &self.secret_key)?;
425        let room_name = claims.video.room.unwrap();
426        let identity = ParticipantIdentity(claims.sub.unwrap().to_string());
427        let mut server_rooms = self.rooms.lock();
428        let room = server_rooms
429            .get_mut(&*room_name)
430            .with_context(|| format!("room {room_name} does not exist"))?;
431        if let Some(track) = room
432            .audio_tracks
433            .iter_mut()
434            .find(|track| track.sid == *track_sid)
435        {
436            track.muted.store(muted, SeqCst);
437            for (id, client_room) in room.client_rooms.iter() {
438                if *id != identity {
439                    let participant = Participant::Remote(RemoteParticipant {
440                        identity: identity.clone(),
441                        room: client_room.downgrade(),
442                    });
443                    let track = RemoteTrack::Audio(RemoteAudioTrack {
444                        server_track: track.clone(),
445                        room: client_room.downgrade(),
446                    });
447                    let publication = TrackPublication::Remote(RemoteTrackPublication {
448                        sid: track_sid.clone(),
449                        room: client_room.downgrade(),
450                        track,
451                    });
452
453                    let event = if muted {
454                        RoomEvent::TrackMuted {
455                            participant,
456                            publication,
457                        }
458                    } else {
459                        RoomEvent::TrackUnmuted {
460                            participant,
461                            publication,
462                        }
463                    };
464
465                    client_room
466                        .0
467                        .lock()
468                        .updates_tx
469                        .blocking_send(event)
470                        .unwrap();
471                }
472            }
473        }
474        Ok(())
475    }
476
477    pub(crate) fn is_track_muted(&self, token: &str, track_sid: &TrackSid) -> Option<bool> {
478        let claims = livekit_api::token::validate(token, &self.secret_key).ok()?;
479        let room_name = claims.video.room.unwrap();
480
481        let mut server_rooms = self.rooms.lock();
482        let room = server_rooms.get_mut(&*room_name)?;
483        room.audio_tracks.iter().find_map(|track| {
484            if track.sid == *track_sid {
485                Some(track.muted.load(SeqCst))
486            } else {
487                None
488            }
489        })
490    }
491
492    pub(crate) fn video_tracks(&self, token: String) -> Result<Vec<RemoteVideoTrack>> {
493        let claims = livekit_api::token::validate(&token, &self.secret_key)?;
494        let room_name = claims.video.room.unwrap();
495        let identity = ParticipantIdentity(claims.sub.unwrap().to_string());
496
497        let mut server_rooms = self.rooms.lock();
498        let room = server_rooms
499            .get_mut(&*room_name)
500            .with_context(|| format!("room {room_name} does not exist"))?;
501        let client_room = room
502            .client_rooms
503            .get(&identity)
504            .context("not a participant in room")?;
505        Ok(room
506            .video_tracks
507            .iter()
508            .map(|track| RemoteVideoTrack {
509                server_track: track.clone(),
510                _room: client_room.downgrade(),
511            })
512            .collect())
513    }
514
515    pub(crate) fn audio_tracks(&self, token: String) -> Result<Vec<RemoteAudioTrack>> {
516        let claims = livekit_api::token::validate(&token, &self.secret_key)?;
517        let room_name = claims.video.room.unwrap();
518        let identity = ParticipantIdentity(claims.sub.unwrap().to_string());
519
520        let mut server_rooms = self.rooms.lock();
521        let room = server_rooms
522            .get_mut(&*room_name)
523            .with_context(|| format!("room {room_name} does not exist"))?;
524        let client_room = room
525            .client_rooms
526            .get(&identity)
527            .context("not a participant in room")?;
528        Ok(room
529            .audio_tracks
530            .iter()
531            .map(|track| RemoteAudioTrack {
532                server_track: track.clone(),
533                room: client_room.downgrade(),
534            })
535            .collect())
536    }
537
538    async fn simulate_random_delay(&self) {
539        #[cfg(any(test, feature = "test-support"))]
540        self.executor.simulate_random_delay().await;
541    }
542}
543
544#[derive(Default, Debug)]
545struct TestServerRoom {
546    client_rooms: HashMap<ParticipantIdentity, Room>,
547    video_tracks: Vec<Arc<TestServerVideoTrack>>,
548    audio_tracks: Vec<Arc<TestServerAudioTrack>>,
549    participant_permissions: HashMap<ParticipantIdentity, proto::ParticipantPermission>,
550}
551
552#[derive(Debug)]
553pub(crate) struct TestServerVideoTrack {
554    pub(crate) sid: TrackSid,
555    pub(crate) publisher_id: ParticipantIdentity,
556    // frames_rx: async_broadcast::Receiver<Frame>,
557}
558
559#[derive(Debug)]
560pub(crate) struct TestServerAudioTrack {
561    pub(crate) sid: TrackSid,
562    pub(crate) publisher_id: ParticipantIdentity,
563    pub(crate) muted: AtomicBool,
564}
565
/// API client backed by a [`TestServer`], which it looks up by URL on every call.
pub struct TestApiClient {
    /// URL of the test server this client talks to.
    url: String,
}
569
570#[async_trait]
571impl livekit_api::Client for TestApiClient {
572    fn url(&self) -> &str {
573        &self.url
574    }
575
576    async fn create_room(&self, name: String) -> Result<()> {
577        let server = TestServer::get(&self.url)?;
578        server.create_room(name).await?;
579        Ok(())
580    }
581
582    async fn delete_room(&self, name: String) -> Result<()> {
583        let server = TestServer::get(&self.url)?;
584        server.delete_room(name).await?;
585        Ok(())
586    }
587
588    async fn remove_participant(&self, room: String, identity: String) -> Result<()> {
589        let server = TestServer::get(&self.url)?;
590        server
591            .remove_participant(room, ParticipantIdentity(identity))
592            .await?;
593        Ok(())
594    }
595
596    async fn update_participant(
597        &self,
598        room: String,
599        identity: String,
600        permission: livekit_api::proto::ParticipantPermission,
601    ) -> Result<()> {
602        let server = TestServer::get(&self.url)?;
603        server
604            .update_participant(room, identity, permission)
605            .await?;
606        Ok(())
607    }
608
609    fn room_token(&self, room: &str, identity: &str) -> Result<String> {
610        let server = TestServer::get(&self.url)?;
611        token::create(
612            &server.api_key,
613            &server.secret_key,
614            Some(identity),
615            token::VideoGrant::to_join(room),
616        )
617    }
618
619    fn guest_token(&self, room: &str, identity: &str) -> Result<String> {
620        let server = TestServer::get(&self.url)?;
621        token::create(
622            &server.api_key,
623            &server.secret_key,
624            Some(identity),
625            token::VideoGrant::for_guest(room),
626        )
627    }
628}
629
630pub(crate) struct RoomState {
631    pub(crate) url: String,
632    pub(crate) token: String,
633    pub(crate) local_identity: ParticipantIdentity,
634    pub(crate) connection_state: ConnectionState,
635    pub(crate) paused_audio_tracks: HashSet<TrackSid>,
636    pub(crate) updates_tx: mpsc::Sender<RoomEvent>,
637}
638
639#[derive(Clone, Debug)]
640pub struct Room(pub(crate) Arc<Mutex<RoomState>>);
641
642#[derive(Clone, Debug)]
643pub(crate) struct WeakRoom(Weak<Mutex<RoomState>>);
644
645impl std::fmt::Debug for RoomState {
646    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
647        f.debug_struct("Room")
648            .field("url", &self.url)
649            .field("token", &self.token)
650            .field("local_identity", &self.local_identity)
651            .field("connection_state", &self.connection_state)
652            .field("paused_audio_tracks", &self.paused_audio_tracks)
653            .finish()
654    }
655}
656
657impl Room {
658    pub(crate) fn downgrade(&self) -> WeakRoom {
659        WeakRoom(Arc::downgrade(&self.0))
660    }
661
662    pub fn connection_state(&self) -> ConnectionState {
663        self.0.lock().connection_state
664    }
665
666    pub fn local_participant(&self) -> LocalParticipant {
667        let identity = self.0.lock().local_identity.clone();
668        LocalParticipant {
669            identity,
670            room: self.clone(),
671        }
672    }
673
674    pub async fn connect(
675        url: String,
676        token: String,
677        _cx: &mut AsyncApp,
678    ) -> Result<(Self, mpsc::Receiver<RoomEvent>)> {
679        let server = TestServer::get(&url)?;
680        let (updates_tx, updates_rx) = mpsc::channel(1024);
681        let this = Self(Arc::new(Mutex::new(RoomState {
682            local_identity: ParticipantIdentity(String::new()),
683            url: url.to_string(),
684            token: token.to_string(),
685            connection_state: ConnectionState::Disconnected,
686            paused_audio_tracks: Default::default(),
687            updates_tx,
688        })));
689
690        let identity = server
691            .join_room(token.to_string(), this.clone())
692            .await
693            .context("room join")?;
694        {
695            let mut state = this.0.lock();
696            state.local_identity = identity;
697            state.connection_state = ConnectionState::Connected;
698        }
699
700        Ok((this, updates_rx))
701    }
702
703    pub fn remote_participants(&self) -> HashMap<ParticipantIdentity, RemoteParticipant> {
704        self.test_server()
705            .remote_participants(self.0.lock().token.clone())
706            .unwrap()
707    }
708
709    pub(crate) fn test_server(&self) -> Arc<TestServer> {
710        TestServer::get(&self.0.lock().url).unwrap()
711    }
712
713    pub(crate) fn token(&self) -> String {
714        self.0.lock().token.clone()
715    }
716
717    pub fn play_remote_audio_track(
718        &self,
719        _track: &RemoteAudioTrack,
720        _cx: &App,
721    ) -> anyhow::Result<AudioStream> {
722        Ok(AudioStream {})
723    }
724
725    pub async fn unpublish_local_track(&self, sid: TrackSid, cx: &mut AsyncApp) -> Result<()> {
726        self.local_participant().unpublish_track(sid, cx).await
727    }
728
729    pub async fn publish_local_microphone_track(
730        &self,
731        cx: &mut AsyncApp,
732    ) -> Result<(LocalTrackPublication, AudioStream)> {
733        self.local_participant().publish_microphone_track(cx).await
734    }
735}
736
737impl Drop for RoomState {
738    fn drop(&mut self) {
739        if self.connection_state == ConnectionState::Connected
740            && let Ok(server) = TestServer::get(&self.url)
741        {
742            let executor = server.executor.clone();
743            let token = self.token.clone();
744            executor
745                .spawn(async move { server.leave_room(token).await.ok() })
746                .detach();
747        }
748    }
749}
750
751impl WeakRoom {
752    pub(crate) fn upgrade(&self) -> Option<Room> {
753        self.0.upgrade().map(Room)
754    }
755}