1use crate::{AudioStream, Participant, RemoteTrack, RoomEvent, TrackPublication};
2
3use crate::mock_client::{participant::*, publication::*, track::*};
4use anyhow::{Context as _, Result, anyhow};
5use async_trait::async_trait;
6use collections::{BTreeMap, HashMap, HashSet, btree_map::Entry as BTreeEntry, hash_map::Entry};
7use gpui::{App, AsyncApp, BackgroundExecutor};
8use livekit_api::{proto, token};
9use parking_lot::Mutex;
10use postage::{mpsc, sink::Sink};
11use std::sync::{
12 Arc, Weak,
13 atomic::{AtomicBool, Ordering::SeqCst},
14};
15
16#[derive(Clone, Debug, Eq, Hash, PartialEq, PartialOrd, Ord)]
17pub struct ParticipantIdentity(pub String);
18
19#[derive(Clone, Debug, Eq, Hash, PartialEq, PartialOrd, Ord)]
20pub struct TrackSid(pub(crate) String);
21
22impl std::fmt::Display for TrackSid {
23 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
24 self.0.fmt(f)
25 }
26}
27
28impl TryFrom<String> for TrackSid {
29 type Error = anyhow::Error;
30
31 fn try_from(value: String) -> Result<Self, Self::Error> {
32 Ok(TrackSid(value))
33 }
34}
35
36#[derive(Copy, Clone, Debug, Eq, Hash, PartialEq, PartialOrd, Ord)]
37#[non_exhaustive]
38pub enum ConnectionState {
39 Connected,
40 Disconnected,
41}
42
43static SERVERS: Mutex<BTreeMap<String, Arc<TestServer>>> = Mutex::new(BTreeMap::new());
44
45pub struct TestServer {
46 pub url: String,
47 pub api_key: String,
48 pub secret_key: String,
49 rooms: Mutex<HashMap<String, TestServerRoom>>,
50 executor: BackgroundExecutor,
51}
52
53impl TestServer {
54 pub fn create(
55 url: String,
56 api_key: String,
57 secret_key: String,
58 executor: BackgroundExecutor,
59 ) -> Result<Arc<TestServer>> {
60 let mut servers = SERVERS.lock();
61 if let BTreeEntry::Vacant(e) = servers.entry(url.clone()) {
62 let server = Arc::new(TestServer {
63 url,
64 api_key,
65 secret_key,
66 rooms: Default::default(),
67 executor,
68 });
69 e.insert(server.clone());
70 Ok(server)
71 } else {
72 Err(anyhow!("a server with url {:?} already exists", url))
73 }
74 }
75
76 fn get(url: &str) -> Result<Arc<TestServer>> {
77 Ok(SERVERS
78 .lock()
79 .get(url)
80 .ok_or_else(|| anyhow!("no server found for url"))?
81 .clone())
82 }
83
84 pub fn teardown(&self) -> Result<()> {
85 SERVERS
86 .lock()
87 .remove(&self.url)
88 .ok_or_else(|| anyhow!("server with url {:?} does not exist", self.url))?;
89 Ok(())
90 }
91
92 pub fn create_api_client(&self) -> TestApiClient {
93 TestApiClient {
94 url: self.url.clone(),
95 }
96 }
97
98 pub async fn create_room(&self, room: String) -> Result<()> {
99 self.simulate_random_delay().await;
100
101 let mut server_rooms = self.rooms.lock();
102 if let Entry::Vacant(e) = server_rooms.entry(room.clone()) {
103 e.insert(Default::default());
104 Ok(())
105 } else {
106 Err(anyhow!("room {:?} already exists", room))
107 }
108 }
109
110 async fn delete_room(&self, room: String) -> Result<()> {
111 self.simulate_random_delay().await;
112
113 let mut server_rooms = self.rooms.lock();
114 server_rooms
115 .remove(&room)
116 .ok_or_else(|| anyhow!("room {:?} does not exist", room))?;
117 Ok(())
118 }
119
120 async fn join_room(&self, token: String, client_room: Room) -> Result<ParticipantIdentity> {
121 self.simulate_random_delay().await;
122
123 let claims = livekit_api::token::validate(&token, &self.secret_key)?;
124 let identity = ParticipantIdentity(claims.sub.unwrap().to_string());
125 let room_name = claims.video.room.unwrap();
126 let mut server_rooms = self.rooms.lock();
127 let room = (*server_rooms).entry(room_name.to_string()).or_default();
128
129 if let Entry::Vacant(e) = room.client_rooms.entry(identity.clone()) {
130 for server_track in &room.video_tracks {
131 let track = RemoteTrack::Video(RemoteVideoTrack {
132 server_track: server_track.clone(),
133 _room: client_room.downgrade(),
134 });
135 client_room
136 .0
137 .lock()
138 .updates_tx
139 .blocking_send(RoomEvent::TrackSubscribed {
140 track: track.clone(),
141 publication: RemoteTrackPublication {
142 sid: server_track.sid.clone(),
143 room: client_room.downgrade(),
144 track,
145 },
146 participant: RemoteParticipant {
147 room: client_room.downgrade(),
148 identity: server_track.publisher_id.clone(),
149 },
150 })
151 .unwrap();
152 }
153 for server_track in &room.audio_tracks {
154 let track = RemoteTrack::Audio(RemoteAudioTrack {
155 server_track: server_track.clone(),
156 room: client_room.downgrade(),
157 });
158 client_room
159 .0
160 .lock()
161 .updates_tx
162 .blocking_send(RoomEvent::TrackSubscribed {
163 track: track.clone(),
164 publication: RemoteTrackPublication {
165 sid: server_track.sid.clone(),
166 room: client_room.downgrade(),
167 track,
168 },
169 participant: RemoteParticipant {
170 room: client_room.downgrade(),
171 identity: server_track.publisher_id.clone(),
172 },
173 })
174 .unwrap();
175 }
176 e.insert(client_room);
177 Ok(identity)
178 } else {
179 Err(anyhow!(
180 "{:?} attempted to join room {:?} twice",
181 identity,
182 room_name
183 ))
184 }
185 }
186
187 async fn leave_room(&self, token: String) -> Result<()> {
188 self.simulate_random_delay().await;
189
190 let claims = livekit_api::token::validate(&token, &self.secret_key)?;
191 let identity = ParticipantIdentity(claims.sub.unwrap().to_string());
192 let room_name = claims.video.room.unwrap();
193 let mut server_rooms = self.rooms.lock();
194 let room = server_rooms
195 .get_mut(&*room_name)
196 .ok_or_else(|| anyhow!("room {} does not exist", room_name))?;
197 room.client_rooms.remove(&identity).ok_or_else(|| {
198 anyhow!(
199 "{:?} attempted to leave room {:?} before joining it",
200 identity,
201 room_name
202 )
203 })?;
204 Ok(())
205 }
206
207 fn remote_participants(
208 &self,
209 token: String,
210 ) -> Result<HashMap<ParticipantIdentity, RemoteParticipant>> {
211 let claims = livekit_api::token::validate(&token, &self.secret_key)?;
212 let local_identity = ParticipantIdentity(claims.sub.unwrap().to_string());
213 let room_name = claims.video.room.unwrap().to_string();
214
215 if let Some(server_room) = self.rooms.lock().get(&room_name) {
216 let room = server_room
217 .client_rooms
218 .get(&local_identity)
219 .unwrap()
220 .downgrade();
221 Ok(server_room
222 .client_rooms
223 .iter()
224 .filter(|(identity, _)| *identity != &local_identity)
225 .map(|(identity, _)| {
226 (
227 identity.clone(),
228 RemoteParticipant {
229 room: room.clone(),
230 identity: identity.clone(),
231 },
232 )
233 })
234 .collect())
235 } else {
236 Ok(Default::default())
237 }
238 }
239
240 async fn remove_participant(
241 &self,
242 room_name: String,
243 identity: ParticipantIdentity,
244 ) -> Result<()> {
245 self.simulate_random_delay().await;
246
247 let mut server_rooms = self.rooms.lock();
248 let room = server_rooms
249 .get_mut(&room_name)
250 .ok_or_else(|| anyhow!("room {} does not exist", room_name))?;
251 room.client_rooms.remove(&identity).ok_or_else(|| {
252 anyhow!(
253 "participant {:?} did not join room {:?}",
254 identity,
255 room_name
256 )
257 })?;
258 Ok(())
259 }
260
261 async fn update_participant(
262 &self,
263 room_name: String,
264 identity: String,
265 permission: proto::ParticipantPermission,
266 ) -> Result<()> {
267 self.simulate_random_delay().await;
268
269 let mut server_rooms = self.rooms.lock();
270 let room = server_rooms
271 .get_mut(&room_name)
272 .ok_or_else(|| anyhow!("room {} does not exist", room_name))?;
273 room.participant_permissions
274 .insert(ParticipantIdentity(identity), permission);
275 Ok(())
276 }
277
278 pub async fn disconnect_client(&self, client_identity: String) {
279 let client_identity = ParticipantIdentity(client_identity);
280
281 self.simulate_random_delay().await;
282
283 let mut server_rooms = self.rooms.lock();
284 for room in server_rooms.values_mut() {
285 if let Some(room) = room.client_rooms.remove(&client_identity) {
286 let mut room = room.0.lock();
287 room.connection_state = ConnectionState::Disconnected;
288 room.updates_tx
289 .blocking_send(RoomEvent::Disconnected {
290 reason: "SIGNAL_CLOSED",
291 })
292 .ok();
293 }
294 }
295 }
296
297 pub(crate) async fn publish_video_track(
298 &self,
299 token: String,
300 _local_track: LocalVideoTrack,
301 ) -> Result<TrackSid> {
302 self.simulate_random_delay().await;
303
304 let claims = livekit_api::token::validate(&token, &self.secret_key)?;
305 let identity = ParticipantIdentity(claims.sub.unwrap().to_string());
306 let room_name = claims.video.room.unwrap();
307
308 let mut server_rooms = self.rooms.lock();
309 let room = server_rooms
310 .get_mut(&*room_name)
311 .ok_or_else(|| anyhow!("room {} does not exist", room_name))?;
312
313 let can_publish = room
314 .participant_permissions
315 .get(&identity)
316 .map(|permission| permission.can_publish)
317 .or(claims.video.can_publish)
318 .unwrap_or(true);
319
320 if !can_publish {
321 return Err(anyhow!("user is not allowed to publish"));
322 }
323
324 let sid: TrackSid = format!("TR_{}", nanoid::nanoid!(17)).try_into().unwrap();
325 let server_track = Arc::new(TestServerVideoTrack {
326 sid: sid.clone(),
327 publisher_id: identity.clone(),
328 });
329
330 room.video_tracks.push(server_track.clone());
331
332 for (room_identity, client_room) in &room.client_rooms {
333 if *room_identity != identity {
334 let track = RemoteTrack::Video(RemoteVideoTrack {
335 server_track: server_track.clone(),
336 _room: client_room.downgrade(),
337 });
338 let publication = RemoteTrackPublication {
339 sid: sid.clone(),
340 room: client_room.downgrade(),
341 track: track.clone(),
342 };
343 let participant = RemoteParticipant {
344 identity: identity.clone(),
345 room: client_room.downgrade(),
346 };
347 client_room
348 .0
349 .lock()
350 .updates_tx
351 .blocking_send(RoomEvent::TrackSubscribed {
352 track,
353 publication,
354 participant,
355 })
356 .unwrap();
357 }
358 }
359
360 Ok(sid)
361 }
362
363 pub(crate) async fn publish_audio_track(
364 &self,
365 token: String,
366 _local_track: &LocalAudioTrack,
367 ) -> Result<TrackSid> {
368 self.simulate_random_delay().await;
369
370 let claims = livekit_api::token::validate(&token, &self.secret_key)?;
371 let identity = ParticipantIdentity(claims.sub.unwrap().to_string());
372 let room_name = claims.video.room.unwrap();
373
374 let mut server_rooms = self.rooms.lock();
375 let room = server_rooms
376 .get_mut(&*room_name)
377 .ok_or_else(|| anyhow!("room {} does not exist", room_name))?;
378
379 let can_publish = room
380 .participant_permissions
381 .get(&identity)
382 .map(|permission| permission.can_publish)
383 .or(claims.video.can_publish)
384 .unwrap_or(true);
385
386 if !can_publish {
387 return Err(anyhow!("user is not allowed to publish"));
388 }
389
390 let sid: TrackSid = format!("TR_{}", nanoid::nanoid!(17)).try_into().unwrap();
391 let server_track = Arc::new(TestServerAudioTrack {
392 sid: sid.clone(),
393 publisher_id: identity.clone(),
394 muted: AtomicBool::new(false),
395 });
396
397 room.audio_tracks.push(server_track.clone());
398
399 for (room_identity, client_room) in &room.client_rooms {
400 if *room_identity != identity {
401 let track = RemoteTrack::Audio(RemoteAudioTrack {
402 server_track: server_track.clone(),
403 room: client_room.downgrade(),
404 });
405 let publication = RemoteTrackPublication {
406 sid: sid.clone(),
407 room: client_room.downgrade(),
408 track: track.clone(),
409 };
410 let participant = RemoteParticipant {
411 identity: identity.clone(),
412 room: client_room.downgrade(),
413 };
414 client_room
415 .0
416 .lock()
417 .updates_tx
418 .blocking_send(RoomEvent::TrackSubscribed {
419 track,
420 publication,
421 participant,
422 })
423 .ok();
424 }
425 }
426
427 Ok(sid)
428 }
429
430 pub(crate) async fn unpublish_track(&self, _token: String, _track: &TrackSid) -> Result<()> {
431 Ok(())
432 }
433
434 pub(crate) fn set_track_muted(
435 &self,
436 token: &str,
437 track_sid: &TrackSid,
438 muted: bool,
439 ) -> Result<()> {
440 let claims = livekit_api::token::validate(&token, &self.secret_key)?;
441 let room_name = claims.video.room.unwrap();
442 let identity = ParticipantIdentity(claims.sub.unwrap().to_string());
443 let mut server_rooms = self.rooms.lock();
444 let room = server_rooms
445 .get_mut(&*room_name)
446 .ok_or_else(|| anyhow!("room {} does not exist", room_name))?;
447 if let Some(track) = room
448 .audio_tracks
449 .iter_mut()
450 .find(|track| track.sid == *track_sid)
451 {
452 track.muted.store(muted, SeqCst);
453 for (id, client_room) in room.client_rooms.iter() {
454 if *id != identity {
455 let participant = Participant::Remote(RemoteParticipant {
456 identity: identity.clone(),
457 room: client_room.downgrade(),
458 });
459 let track = RemoteTrack::Audio(RemoteAudioTrack {
460 server_track: track.clone(),
461 room: client_room.downgrade(),
462 });
463 let publication = TrackPublication::Remote(RemoteTrackPublication {
464 sid: track_sid.clone(),
465 room: client_room.downgrade(),
466 track,
467 });
468
469 let event = if muted {
470 RoomEvent::TrackMuted {
471 participant,
472 publication,
473 }
474 } else {
475 RoomEvent::TrackUnmuted {
476 participant,
477 publication,
478 }
479 };
480
481 client_room
482 .0
483 .lock()
484 .updates_tx
485 .blocking_send(event)
486 .unwrap();
487 }
488 }
489 }
490 Ok(())
491 }
492
493 pub(crate) fn is_track_muted(&self, token: &str, track_sid: &TrackSid) -> Option<bool> {
494 let claims = livekit_api::token::validate(&token, &self.secret_key).ok()?;
495 let room_name = claims.video.room.unwrap();
496
497 let mut server_rooms = self.rooms.lock();
498 let room = server_rooms.get_mut(&*room_name)?;
499 room.audio_tracks.iter().find_map(|track| {
500 if track.sid == *track_sid {
501 Some(track.muted.load(SeqCst))
502 } else {
503 None
504 }
505 })
506 }
507
508 pub(crate) fn video_tracks(&self, token: String) -> Result<Vec<RemoteVideoTrack>> {
509 let claims = livekit_api::token::validate(&token, &self.secret_key)?;
510 let room_name = claims.video.room.unwrap();
511 let identity = ParticipantIdentity(claims.sub.unwrap().to_string());
512
513 let mut server_rooms = self.rooms.lock();
514 let room = server_rooms
515 .get_mut(&*room_name)
516 .ok_or_else(|| anyhow!("room {} does not exist", room_name))?;
517 let client_room = room
518 .client_rooms
519 .get(&identity)
520 .ok_or_else(|| anyhow!("not a participant in room"))?;
521 Ok(room
522 .video_tracks
523 .iter()
524 .map(|track| RemoteVideoTrack {
525 server_track: track.clone(),
526 _room: client_room.downgrade(),
527 })
528 .collect())
529 }
530
531 pub(crate) fn audio_tracks(&self, token: String) -> Result<Vec<RemoteAudioTrack>> {
532 let claims = livekit_api::token::validate(&token, &self.secret_key)?;
533 let room_name = claims.video.room.unwrap();
534 let identity = ParticipantIdentity(claims.sub.unwrap().to_string());
535
536 let mut server_rooms = self.rooms.lock();
537 let room = server_rooms
538 .get_mut(&*room_name)
539 .ok_or_else(|| anyhow!("room {} does not exist", room_name))?;
540 let client_room = room
541 .client_rooms
542 .get(&identity)
543 .ok_or_else(|| anyhow!("not a participant in room"))?;
544 Ok(room
545 .audio_tracks
546 .iter()
547 .map(|track| RemoteAudioTrack {
548 server_track: track.clone(),
549 room: client_room.downgrade(),
550 })
551 .collect())
552 }
553
554 async fn simulate_random_delay(&self) {
555 #[cfg(any(test, feature = "test-support"))]
556 self.executor.simulate_random_delay().await;
557 }
558}
559
560#[derive(Default, Debug)]
561struct TestServerRoom {
562 client_rooms: HashMap<ParticipantIdentity, Room>,
563 video_tracks: Vec<Arc<TestServerVideoTrack>>,
564 audio_tracks: Vec<Arc<TestServerAudioTrack>>,
565 participant_permissions: HashMap<ParticipantIdentity, proto::ParticipantPermission>,
566}
567
568#[derive(Debug)]
569pub(crate) struct TestServerVideoTrack {
570 pub(crate) sid: TrackSid,
571 pub(crate) publisher_id: ParticipantIdentity,
572 // frames_rx: async_broadcast::Receiver<Frame>,
573}
574
575#[derive(Debug)]
576pub(crate) struct TestServerAudioTrack {
577 pub(crate) sid: TrackSid,
578 pub(crate) publisher_id: ParticipantIdentity,
579 pub(crate) muted: AtomicBool,
580}
581
582pub struct TestApiClient {
583 url: String,
584}
585
586#[async_trait]
587impl livekit_api::Client for TestApiClient {
588 fn url(&self) -> &str {
589 &self.url
590 }
591
592 async fn create_room(&self, name: String) -> Result<()> {
593 let server = TestServer::get(&self.url)?;
594 server.create_room(name).await?;
595 Ok(())
596 }
597
598 async fn delete_room(&self, name: String) -> Result<()> {
599 let server = TestServer::get(&self.url)?;
600 server.delete_room(name).await?;
601 Ok(())
602 }
603
604 async fn remove_participant(&self, room: String, identity: String) -> Result<()> {
605 let server = TestServer::get(&self.url)?;
606 server
607 .remove_participant(room, ParticipantIdentity(identity))
608 .await?;
609 Ok(())
610 }
611
612 async fn update_participant(
613 &self,
614 room: String,
615 identity: String,
616 permission: livekit_api::proto::ParticipantPermission,
617 ) -> Result<()> {
618 let server = TestServer::get(&self.url)?;
619 server
620 .update_participant(room, identity, permission)
621 .await?;
622 Ok(())
623 }
624
625 fn room_token(&self, room: &str, identity: &str) -> Result<String> {
626 let server = TestServer::get(&self.url)?;
627 token::create(
628 &server.api_key,
629 &server.secret_key,
630 Some(identity),
631 token::VideoGrant::to_join(room),
632 )
633 }
634
635 fn guest_token(&self, room: &str, identity: &str) -> Result<String> {
636 let server = TestServer::get(&self.url)?;
637 token::create(
638 &server.api_key,
639 &server.secret_key,
640 Some(identity),
641 token::VideoGrant::for_guest(room),
642 )
643 }
644}
645
646pub(crate) struct RoomState {
647 pub(crate) url: String,
648 pub(crate) token: String,
649 pub(crate) local_identity: ParticipantIdentity,
650 pub(crate) connection_state: ConnectionState,
651 pub(crate) paused_audio_tracks: HashSet<TrackSid>,
652 pub(crate) updates_tx: mpsc::Sender<RoomEvent>,
653}
654
655#[derive(Clone, Debug)]
656pub struct Room(pub(crate) Arc<Mutex<RoomState>>);
657
658#[derive(Clone, Debug)]
659pub(crate) struct WeakRoom(Weak<Mutex<RoomState>>);
660
661impl std::fmt::Debug for RoomState {
662 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
663 f.debug_struct("Room")
664 .field("url", &self.url)
665 .field("token", &self.token)
666 .field("local_identity", &self.local_identity)
667 .field("connection_state", &self.connection_state)
668 .field("paused_audio_tracks", &self.paused_audio_tracks)
669 .finish()
670 }
671}
672
673impl Room {
674 pub(crate) fn downgrade(&self) -> WeakRoom {
675 WeakRoom(Arc::downgrade(&self.0))
676 }
677
678 pub fn connection_state(&self) -> ConnectionState {
679 self.0.lock().connection_state
680 }
681
682 pub fn local_participant(&self) -> LocalParticipant {
683 let identity = self.0.lock().local_identity.clone();
684 LocalParticipant {
685 identity,
686 room: self.clone(),
687 }
688 }
689
690 pub async fn connect(
691 url: String,
692 token: String,
693 _cx: &mut AsyncApp,
694 ) -> Result<(Self, mpsc::Receiver<RoomEvent>)> {
695 let server = TestServer::get(&url)?;
696 let (updates_tx, updates_rx) = mpsc::channel(1024);
697 let this = Self(Arc::new(Mutex::new(RoomState {
698 local_identity: ParticipantIdentity(String::new()),
699 url: url.to_string(),
700 token: token.to_string(),
701 connection_state: ConnectionState::Disconnected,
702 paused_audio_tracks: Default::default(),
703 updates_tx,
704 })));
705
706 let identity = server
707 .join_room(token.to_string(), this.clone())
708 .await
709 .context("room join")?;
710 {
711 let mut state = this.0.lock();
712 state.local_identity = identity;
713 state.connection_state = ConnectionState::Connected;
714 }
715
716 Ok((this, updates_rx))
717 }
718
719 pub fn remote_participants(&self) -> HashMap<ParticipantIdentity, RemoteParticipant> {
720 self.test_server()
721 .remote_participants(self.0.lock().token.clone())
722 .unwrap()
723 }
724
725 pub(crate) fn test_server(&self) -> Arc<TestServer> {
726 TestServer::get(&self.0.lock().url).unwrap()
727 }
728
729 pub(crate) fn token(&self) -> String {
730 self.0.lock().token.clone()
731 }
732
733 pub fn play_remote_audio_track(
734 &self,
735 _track: &RemoteAudioTrack,
736 _cx: &App,
737 ) -> anyhow::Result<AudioStream> {
738 Ok(AudioStream {})
739 }
740
741 pub async fn unpublish_local_track(&self, sid: TrackSid, cx: &mut AsyncApp) -> Result<()> {
742 self.local_participant().unpublish_track(sid, cx).await
743 }
744
745 pub async fn publish_local_microphone_track(
746 &self,
747 cx: &mut AsyncApp,
748 ) -> Result<(LocalTrackPublication, AudioStream)> {
749 self.local_participant().publish_microphone_track(cx).await
750 }
751}
752
753impl Drop for RoomState {
754 fn drop(&mut self) {
755 if self.connection_state == ConnectionState::Connected {
756 if let Ok(server) = TestServer::get(&self.url) {
757 let executor = server.executor.clone();
758 let token = self.token.clone();
759 executor
760 .spawn(async move { server.leave_room(token).await.ok() })
761 .detach();
762 }
763 }
764 }
765}
766
767impl WeakRoom {
768 pub(crate) fn upgrade(&self) -> Option<Room> {
769 self.0.upgrade().map(Room)
770 }
771}