1use crate::{AudioStream, Participant, RemoteTrack, RoomEvent, TrackPublication};
2
3use crate::mock_client::{participant::*, publication::*, track::*};
4use anyhow::{Context as _, Result};
5use async_trait::async_trait;
6use collections::{BTreeMap, HashMap, HashSet, btree_map::Entry as BTreeEntry, hash_map::Entry};
7use gpui::{App, AsyncApp, BackgroundExecutor};
8use livekit_api::{proto, token};
9use parking_lot::Mutex;
10use postage::{mpsc, sink::Sink};
11use std::sync::{
12 Arc, Weak,
13 atomic::{AtomicBool, AtomicU64, Ordering::SeqCst},
14};
15
/// Identity of a room participant: a newtype over the raw identity string.
///
/// Used as the key for per-room client and permission maps.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct ParticipantIdentity(pub String);
18
/// Identifier of a published track: a newtype over the raw SID string.
///
/// The inner string is only accessible within the crate.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct TrackSid(pub(crate) String);
21
22impl std::fmt::Display for TrackSid {
23 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
24 self.0.fmt(f)
25 }
26}
27
28impl TryFrom<String> for TrackSid {
29 type Error = anyhow::Error;
30
31 fn try_from(value: String) -> Result<Self, Self::Error> {
32 Ok(TrackSid(value))
33 }
34}
35
/// Connection status of a [`Room`].
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
#[non_exhaustive]
pub enum ConnectionState {
    /// The room has successfully joined a server-side room.
    Connected,
    /// The room has not joined yet, or has been disconnected.
    Disconnected,
}
42
43#[derive(Clone, Debug, Default)]
44pub struct SessionStats {
45 pub publisher_stats: Vec<RtcStats>,
46 pub subscriber_stats: Vec<RtcStats>,
47}
48
/// Placeholder stats entry type.
///
/// The enum has no variants, so no value of this type can be constructed.
#[derive(Debug, Clone)]
pub enum RtcStats {}
51
52static SERVERS: Mutex<BTreeMap<String, Arc<TestServer>>> = Mutex::new(BTreeMap::new());
53
54pub struct TestServer {
55 pub url: String,
56 pub api_key: String,
57 pub secret_key: String,
58 rooms: Mutex<HashMap<String, TestServerRoom>>,
59 executor: BackgroundExecutor,
60}
61
62impl TestServer {
63 pub fn create(
64 url: String,
65 api_key: String,
66 secret_key: String,
67 executor: BackgroundExecutor,
68 ) -> Result<Arc<TestServer>> {
69 let mut servers = SERVERS.lock();
70 if let BTreeEntry::Vacant(e) = servers.entry(url.clone()) {
71 let server = Arc::new(TestServer {
72 url,
73 api_key,
74 secret_key,
75 rooms: Default::default(),
76 executor,
77 });
78 e.insert(server.clone());
79 Ok(server)
80 } else {
81 anyhow::bail!("a server with url {url:?} already exists");
82 }
83 }
84
85 fn get(url: &str) -> Result<Arc<TestServer>> {
86 Ok(SERVERS
87 .lock()
88 .get(url)
89 .context("no server found for url")?
90 .clone())
91 }
92
93 pub fn teardown(&self) -> Result<()> {
94 SERVERS
95 .lock()
96 .remove(&self.url)
97 .with_context(|| format!("server with url {:?} does not exist", self.url))?;
98 Ok(())
99 }
100
101 pub fn create_api_client(&self) -> TestApiClient {
102 TestApiClient {
103 url: self.url.clone(),
104 }
105 }
106
107 pub async fn create_room(&self, room: String) -> Result<()> {
108 self.simulate_random_delay().await;
109
110 let mut server_rooms = self.rooms.lock();
111 if let Entry::Vacant(e) = server_rooms.entry(room.clone()) {
112 e.insert(Default::default());
113 Ok(())
114 } else {
115 anyhow::bail!("{room:?} already exists");
116 }
117 }
118
119 async fn delete_room(&self, room: String) -> Result<()> {
120 self.simulate_random_delay().await;
121
122 let mut server_rooms = self.rooms.lock();
123 server_rooms
124 .remove(&room)
125 .with_context(|| format!("room {room:?} does not exist"))?;
126 Ok(())
127 }
128
129 async fn join_room(&self, token: String, client_room: Room) -> Result<ParticipantIdentity> {
130 self.simulate_random_delay().await;
131
132 let claims = livekit_api::token::validate(&token, &self.secret_key)?;
133 let identity = ParticipantIdentity(claims.sub.unwrap().to_string());
134 let room_name = claims.video.room.unwrap();
135 let mut server_rooms = self.rooms.lock();
136 let room = (*server_rooms).entry(room_name.to_string()).or_default();
137
138 if let Entry::Vacant(e) = room.client_rooms.entry(identity.clone()) {
139 for server_track in &room.video_tracks {
140 let track = RemoteTrack::Video(RemoteVideoTrack {
141 server_track: server_track.clone(),
142 _room: client_room.downgrade(),
143 });
144 client_room
145 .0
146 .lock()
147 .updates_tx
148 .blocking_send(RoomEvent::TrackSubscribed {
149 track: track.clone(),
150 publication: RemoteTrackPublication {
151 sid: server_track.sid.clone(),
152 room: client_room.downgrade(),
153 track,
154 },
155 participant: RemoteParticipant {
156 room: client_room.downgrade(),
157 identity: server_track.publisher_id.clone(),
158 },
159 })
160 .unwrap();
161 }
162 for server_track in &room.audio_tracks {
163 let track = RemoteTrack::Audio(RemoteAudioTrack {
164 server_track: server_track.clone(),
165 room: client_room.downgrade(),
166 });
167 client_room
168 .0
169 .lock()
170 .updates_tx
171 .blocking_send(RoomEvent::TrackSubscribed {
172 track: track.clone(),
173 publication: RemoteTrackPublication {
174 sid: server_track.sid.clone(),
175 room: client_room.downgrade(),
176 track,
177 },
178 participant: RemoteParticipant {
179 room: client_room.downgrade(),
180 identity: server_track.publisher_id.clone(),
181 },
182 })
183 .unwrap();
184 }
185 e.insert(client_room);
186 Ok(identity)
187 } else {
188 anyhow::bail!("{identity:?} attempted to join room {room_name:?} twice");
189 }
190 }
191
192 async fn leave_room(&self, token: String) -> Result<()> {
193 self.simulate_random_delay().await;
194
195 let claims = livekit_api::token::validate(&token, &self.secret_key)?;
196 let identity = ParticipantIdentity(claims.sub.unwrap().to_string());
197 let room_name = claims.video.room.unwrap();
198 let mut server_rooms = self.rooms.lock();
199 let room = server_rooms
200 .get_mut(&*room_name)
201 .with_context(|| format!("room {room_name:?} does not exist"))?;
202 room.client_rooms.remove(&identity).with_context(|| {
203 format!("{identity:?} attempted to leave room {room_name:?} before joining it")
204 })?;
205 Ok(())
206 }
207
208 fn remote_participants(
209 &self,
210 token: String,
211 ) -> Result<HashMap<ParticipantIdentity, RemoteParticipant>> {
212 let claims = livekit_api::token::validate(&token, &self.secret_key)?;
213 let local_identity = ParticipantIdentity(claims.sub.unwrap().to_string());
214 let room_name = claims.video.room.unwrap().to_string();
215
216 if let Some(server_room) = self.rooms.lock().get(&room_name) {
217 let room = server_room
218 .client_rooms
219 .get(&local_identity)
220 .unwrap()
221 .downgrade();
222 Ok(server_room
223 .client_rooms
224 .iter()
225 .filter(|(identity, _)| *identity != &local_identity)
226 .map(|(identity, _)| {
227 (
228 identity.clone(),
229 RemoteParticipant {
230 room: room.clone(),
231 identity: identity.clone(),
232 },
233 )
234 })
235 .collect())
236 } else {
237 Ok(Default::default())
238 }
239 }
240
241 async fn remove_participant(
242 &self,
243 room_name: String,
244 identity: ParticipantIdentity,
245 ) -> Result<()> {
246 self.simulate_random_delay().await;
247
248 let mut server_rooms = self.rooms.lock();
249 let room = server_rooms
250 .get_mut(&room_name)
251 .with_context(|| format!("room {room_name} does not exist"))?;
252 room.client_rooms
253 .remove(&identity)
254 .with_context(|| format!("participant {identity:?} did not join room {room_name:?}"))?;
255 Ok(())
256 }
257
258 async fn update_participant(
259 &self,
260 room_name: String,
261 identity: String,
262 permission: proto::ParticipantPermission,
263 ) -> Result<()> {
264 self.simulate_random_delay().await;
265
266 let mut server_rooms = self.rooms.lock();
267 let room = server_rooms
268 .get_mut(&room_name)
269 .with_context(|| format!("room {room_name} does not exist"))?;
270 room.participant_permissions
271 .insert(ParticipantIdentity(identity), permission);
272 Ok(())
273 }
274
275 pub async fn disconnect_client(&self, client_identity: String) {
276 let client_identity = ParticipantIdentity(client_identity);
277
278 self.simulate_random_delay().await;
279
280 let mut server_rooms = self.rooms.lock();
281 for room in server_rooms.values_mut() {
282 if let Some(room) = room.client_rooms.remove(&client_identity) {
283 let mut room = room.0.lock();
284 room.connection_state = ConnectionState::Disconnected;
285 room.updates_tx
286 .blocking_send(RoomEvent::Disconnected {
287 reason: "SIGNAL_CLOSED",
288 })
289 .ok();
290 }
291 }
292 }
293
294 pub(crate) async fn publish_video_track(
295 &self,
296 token: String,
297 _local_track: LocalVideoTrack,
298 ) -> Result<TrackSid> {
299 self.simulate_random_delay().await;
300
301 let claims = livekit_api::token::validate(&token, &self.secret_key)?;
302 let identity = ParticipantIdentity(claims.sub.unwrap().to_string());
303 let room_name = claims.video.room.unwrap();
304
305 let mut server_rooms = self.rooms.lock();
306 let room = server_rooms
307 .get_mut(&*room_name)
308 .with_context(|| format!("room {room_name} does not exist"))?;
309
310 let can_publish = room
311 .participant_permissions
312 .get(&identity)
313 .map(|permission| permission.can_publish)
314 .or(claims.video.can_publish)
315 .unwrap_or(true);
316
317 anyhow::ensure!(can_publish, "user is not allowed to publish");
318
319 let sid: TrackSid = format!("TR_{}", nanoid::nanoid!(17)).try_into().unwrap();
320 let server_track = Arc::new(TestServerVideoTrack {
321 sid: sid.clone(),
322 publisher_id: identity.clone(),
323 });
324
325 room.video_tracks.push(server_track.clone());
326
327 for (room_identity, client_room) in &room.client_rooms {
328 if *room_identity != identity {
329 let track = RemoteTrack::Video(RemoteVideoTrack {
330 server_track: server_track.clone(),
331 _room: client_room.downgrade(),
332 });
333 let publication = RemoteTrackPublication {
334 sid: sid.clone(),
335 room: client_room.downgrade(),
336 track: track.clone(),
337 };
338 let participant = RemoteParticipant {
339 identity: identity.clone(),
340 room: client_room.downgrade(),
341 };
342 client_room
343 .0
344 .lock()
345 .updates_tx
346 .blocking_send(RoomEvent::TrackSubscribed {
347 track,
348 publication,
349 participant,
350 })
351 .unwrap();
352 }
353 }
354
355 Ok(sid)
356 }
357
358 pub(crate) async fn publish_audio_track(
359 &self,
360 token: String,
361 _local_track: &LocalAudioTrack,
362 ) -> Result<TrackSid> {
363 self.simulate_random_delay().await;
364
365 let claims = livekit_api::token::validate(&token, &self.secret_key)?;
366 let identity = ParticipantIdentity(claims.sub.unwrap().to_string());
367 let room_name = claims.video.room.unwrap();
368
369 let mut server_rooms = self.rooms.lock();
370 let room = server_rooms
371 .get_mut(&*room_name)
372 .with_context(|| format!("room {room_name} does not exist"))?;
373
374 let can_publish = room
375 .participant_permissions
376 .get(&identity)
377 .map(|permission| permission.can_publish)
378 .or(claims.video.can_publish)
379 .unwrap_or(true);
380
381 anyhow::ensure!(can_publish, "user is not allowed to publish");
382
383 let sid: TrackSid = format!("TR_{}", nanoid::nanoid!(17)).try_into().unwrap();
384 let server_track = Arc::new(TestServerAudioTrack {
385 sid: sid.clone(),
386 publisher_id: identity.clone(),
387 muted: AtomicBool::new(false),
388 });
389
390 room.audio_tracks.push(server_track.clone());
391
392 for (room_identity, client_room) in &room.client_rooms {
393 if *room_identity != identity {
394 let track = RemoteTrack::Audio(RemoteAudioTrack {
395 server_track: server_track.clone(),
396 room: client_room.downgrade(),
397 });
398 let publication = RemoteTrackPublication {
399 sid: sid.clone(),
400 room: client_room.downgrade(),
401 track: track.clone(),
402 };
403 let participant = RemoteParticipant {
404 identity: identity.clone(),
405 room: client_room.downgrade(),
406 };
407 client_room
408 .0
409 .lock()
410 .updates_tx
411 .blocking_send(RoomEvent::TrackSubscribed {
412 track,
413 publication,
414 participant,
415 })
416 .ok();
417 }
418 }
419
420 Ok(sid)
421 }
422
423 pub(crate) async fn unpublish_track(&self, _token: String, _track: &TrackSid) -> Result<()> {
424 Ok(())
425 }
426
427 pub(crate) fn set_track_muted(
428 &self,
429 token: &str,
430 track_sid: &TrackSid,
431 muted: bool,
432 ) -> Result<()> {
433 let claims = livekit_api::token::validate(token, &self.secret_key)?;
434 let room_name = claims.video.room.unwrap();
435 let identity = ParticipantIdentity(claims.sub.unwrap().to_string());
436 let mut server_rooms = self.rooms.lock();
437 let room = server_rooms
438 .get_mut(&*room_name)
439 .with_context(|| format!("room {room_name} does not exist"))?;
440 if let Some(track) = room
441 .audio_tracks
442 .iter_mut()
443 .find(|track| track.sid == *track_sid)
444 {
445 track.muted.store(muted, SeqCst);
446 for (id, client_room) in room.client_rooms.iter() {
447 if *id != identity {
448 let participant = Participant::Remote(RemoteParticipant {
449 identity: identity.clone(),
450 room: client_room.downgrade(),
451 });
452 let track = RemoteTrack::Audio(RemoteAudioTrack {
453 server_track: track.clone(),
454 room: client_room.downgrade(),
455 });
456 let publication = TrackPublication::Remote(RemoteTrackPublication {
457 sid: track_sid.clone(),
458 room: client_room.downgrade(),
459 track,
460 });
461
462 let event = if muted {
463 RoomEvent::TrackMuted {
464 participant,
465 publication,
466 }
467 } else {
468 RoomEvent::TrackUnmuted {
469 participant,
470 publication,
471 }
472 };
473
474 client_room
475 .0
476 .lock()
477 .updates_tx
478 .blocking_send(event)
479 .unwrap();
480 }
481 }
482 }
483 Ok(())
484 }
485
486 pub(crate) fn is_track_muted(&self, token: &str, track_sid: &TrackSid) -> Option<bool> {
487 let claims = livekit_api::token::validate(token, &self.secret_key).ok()?;
488 let room_name = claims.video.room.unwrap();
489
490 let mut server_rooms = self.rooms.lock();
491 let room = server_rooms.get_mut(&*room_name)?;
492 room.audio_tracks.iter().find_map(|track| {
493 if track.sid == *track_sid {
494 Some(track.muted.load(SeqCst))
495 } else {
496 None
497 }
498 })
499 }
500
501 pub(crate) fn video_tracks(&self, token: String) -> Result<Vec<RemoteVideoTrack>> {
502 let claims = livekit_api::token::validate(&token, &self.secret_key)?;
503 let room_name = claims.video.room.unwrap();
504 let identity = ParticipantIdentity(claims.sub.unwrap().to_string());
505
506 let mut server_rooms = self.rooms.lock();
507 let room = server_rooms
508 .get_mut(&*room_name)
509 .with_context(|| format!("room {room_name} does not exist"))?;
510 let client_room = room
511 .client_rooms
512 .get(&identity)
513 .context("not a participant in room")?;
514 Ok(room
515 .video_tracks
516 .iter()
517 .map(|track| RemoteVideoTrack {
518 server_track: track.clone(),
519 _room: client_room.downgrade(),
520 })
521 .collect())
522 }
523
524 pub(crate) fn audio_tracks(&self, token: String) -> Result<Vec<RemoteAudioTrack>> {
525 let claims = livekit_api::token::validate(&token, &self.secret_key)?;
526 let room_name = claims.video.room.unwrap();
527 let identity = ParticipantIdentity(claims.sub.unwrap().to_string());
528
529 let mut server_rooms = self.rooms.lock();
530 let room = server_rooms
531 .get_mut(&*room_name)
532 .with_context(|| format!("room {room_name} does not exist"))?;
533 let client_room = room
534 .client_rooms
535 .get(&identity)
536 .context("not a participant in room")?;
537 Ok(room
538 .audio_tracks
539 .iter()
540 .map(|track| RemoteAudioTrack {
541 server_track: track.clone(),
542 room: client_room.downgrade(),
543 })
544 .collect())
545 }
546
547 async fn simulate_random_delay(&self) {
548 #[cfg(any(test, feature = "test-support"))]
549 self.executor.simulate_random_delay().await;
550 }
551}
552
553#[derive(Default, Debug)]
554struct TestServerRoom {
555 client_rooms: HashMap<ParticipantIdentity, Room>,
556 video_tracks: Vec<Arc<TestServerVideoTrack>>,
557 audio_tracks: Vec<Arc<TestServerAudioTrack>>,
558 participant_permissions: HashMap<ParticipantIdentity, proto::ParticipantPermission>,
559}
560
561#[derive(Debug)]
562pub(crate) struct TestServerVideoTrack {
563 pub(crate) sid: TrackSid,
564 pub(crate) publisher_id: ParticipantIdentity,
565 // frames_rx: async_broadcast::Receiver<Frame>,
566}
567
568#[derive(Debug)]
569pub(crate) struct TestServerAudioTrack {
570 pub(crate) sid: TrackSid,
571 pub(crate) publisher_id: ParticipantIdentity,
572 pub(crate) muted: AtomicBool,
573}
574
/// API client bound to a test server by URL.
///
/// The server is looked up in the global registry on each call.
pub struct TestApiClient {
    /// URL of the test server this client talks to.
    url: String,
}
578
579#[async_trait]
580impl livekit_api::Client for TestApiClient {
581 fn url(&self) -> &str {
582 &self.url
583 }
584
585 async fn create_room(&self, name: String) -> Result<()> {
586 let server = TestServer::get(&self.url)?;
587 server.create_room(name).await?;
588 Ok(())
589 }
590
591 async fn delete_room(&self, name: String) -> Result<()> {
592 let server = TestServer::get(&self.url)?;
593 server.delete_room(name).await?;
594 Ok(())
595 }
596
597 async fn remove_participant(&self, room: String, identity: String) -> Result<()> {
598 let server = TestServer::get(&self.url)?;
599 server
600 .remove_participant(room, ParticipantIdentity(identity))
601 .await?;
602 Ok(())
603 }
604
605 async fn update_participant(
606 &self,
607 room: String,
608 identity: String,
609 permission: livekit_api::proto::ParticipantPermission,
610 ) -> Result<()> {
611 let server = TestServer::get(&self.url)?;
612 server
613 .update_participant(room, identity, permission)
614 .await?;
615 Ok(())
616 }
617
618 fn room_token(&self, room: &str, identity: &str) -> Result<String> {
619 let server = TestServer::get(&self.url)?;
620 token::create(
621 &server.api_key,
622 &server.secret_key,
623 Some(identity),
624 token::VideoGrant::to_join(room),
625 )
626 }
627
628 fn guest_token(&self, room: &str, identity: &str) -> Result<String> {
629 let server = TestServer::get(&self.url)?;
630 token::create(
631 &server.api_key,
632 &server.secret_key,
633 Some(identity),
634 token::VideoGrant::for_guest(room),
635 )
636 }
637}
638
639pub(crate) struct RoomState {
640 pub(crate) url: String,
641 pub(crate) token: String,
642 pub(crate) local_identity: ParticipantIdentity,
643 pub(crate) connection_state: ConnectionState,
644 pub(crate) paused_audio_tracks: HashSet<TrackSid>,
645 pub(crate) updates_tx: mpsc::Sender<RoomEvent>,
646}
647
648#[derive(Clone, Debug)]
649pub struct Room(pub(crate) Arc<Mutex<RoomState>>);
650
651#[derive(Clone, Debug)]
652pub(crate) struct WeakRoom(Weak<Mutex<RoomState>>);
653
654impl std::fmt::Debug for RoomState {
655 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
656 f.debug_struct("Room")
657 .field("url", &self.url)
658 .field("token", &self.token)
659 .field("local_identity", &self.local_identity)
660 .field("connection_state", &self.connection_state)
661 .field("paused_audio_tracks", &self.paused_audio_tracks)
662 .finish()
663 }
664}
665
666impl Room {
667 pub(crate) fn downgrade(&self) -> WeakRoom {
668 WeakRoom(Arc::downgrade(&self.0))
669 }
670
671 pub fn connection_state(&self) -> ConnectionState {
672 self.0.lock().connection_state
673 }
674
675 pub fn local_participant(&self) -> LocalParticipant {
676 let identity = self.0.lock().local_identity.clone();
677 LocalParticipant {
678 identity,
679 room: self.clone(),
680 }
681 }
682
683 pub async fn connect(
684 url: String,
685 token: String,
686 _cx: &mut AsyncApp,
687 ) -> Result<(Self, mpsc::Receiver<RoomEvent>)> {
688 let server = TestServer::get(&url)?;
689 let (updates_tx, updates_rx) = mpsc::channel(1024);
690 let this = Self(Arc::new(Mutex::new(RoomState {
691 local_identity: ParticipantIdentity(String::new()),
692 url: url.to_string(),
693 token: token.to_string(),
694 connection_state: ConnectionState::Disconnected,
695 paused_audio_tracks: Default::default(),
696 updates_tx,
697 })));
698
699 let identity = server
700 .join_room(token.to_string(), this.clone())
701 .await
702 .context("room join")?;
703 {
704 let mut state = this.0.lock();
705 state.local_identity = identity;
706 state.connection_state = ConnectionState::Connected;
707 }
708
709 Ok((this, updates_rx))
710 }
711
712 pub fn remote_participants(&self) -> HashMap<ParticipantIdentity, RemoteParticipant> {
713 self.test_server()
714 .remote_participants(self.0.lock().token.clone())
715 .unwrap()
716 }
717
718 pub(crate) fn test_server(&self) -> Arc<TestServer> {
719 TestServer::get(&self.0.lock().url).unwrap()
720 }
721
722 pub(crate) fn token(&self) -> String {
723 self.0.lock().token.clone()
724 }
725
726 pub fn name(&self) -> String {
727 "test_room".to_string()
728 }
729
730 pub async fn sid(&self) -> String {
731 "RM_test_session".to_string()
732 }
733
734 pub fn play_remote_audio_track(
735 &self,
736 _track: &RemoteAudioTrack,
737 _cx: &App,
738 ) -> anyhow::Result<AudioStream> {
739 Ok(AudioStream {})
740 }
741
742 pub async fn unpublish_local_track(&self, sid: TrackSid, cx: &mut AsyncApp) -> Result<()> {
743 self.local_participant().unpublish_track(sid, cx).await
744 }
745
746 pub async fn publish_local_microphone_track(
747 &self,
748 _track_name: String,
749 _is_staff: bool,
750 cx: &mut AsyncApp,
751 ) -> Result<(LocalTrackPublication, AudioStream, Arc<AtomicU64>)> {
752 self.local_participant().publish_microphone_track(cx).await
753 }
754
755 pub async fn get_stats(&self) -> Result<SessionStats> {
756 Ok(SessionStats::default())
757 }
758
759 pub fn stats_task(&self, _cx: &impl gpui::AppContext) -> gpui::Task<Result<SessionStats>> {
760 gpui::Task::ready(Ok(SessionStats::default()))
761 }
762}
763
764impl Drop for RoomState {
765 fn drop(&mut self) {
766 if self.connection_state == ConnectionState::Connected
767 && let Ok(server) = TestServer::get(&self.url)
768 {
769 let executor = server.executor.clone();
770 let token = self.token.clone();
771 executor
772 .spawn(async move { server.leave_room(token).await.ok() })
773 .detach();
774 }
775 }
776}
777
778impl WeakRoom {
779 pub(crate) fn upgrade(&self) -> Option<Room> {
780 self.0.upgrade().map(Room)
781 }
782}