1use crate::{AudioStream, Participant, RemoteTrack, RoomEvent, TrackPublication};
2
3use crate::mock_client::{participant::*, publication::*, track::*};
4use anyhow::{Context as _, Result};
5use async_trait::async_trait;
6use collections::{BTreeMap, HashMap, HashSet, btree_map::Entry as BTreeEntry, hash_map::Entry};
7use gpui::{App, AsyncApp, BackgroundExecutor};
8use livekit_api::{proto, token};
9use parking_lot::Mutex;
10use postage::{mpsc, sink::Sink};
11use std::sync::{
12 Arc, Weak,
13 atomic::{AtomicBool, Ordering::SeqCst},
14};
15
/// Identity string of a participant.
///
/// In this file it is built from the `sub` claim of a validated token and is
/// used as the key under which a client's [`Room`] is registered in a server room.
#[derive(Clone, Debug, Eq, Hash, PartialEq, PartialOrd, Ord)]
pub struct ParticipantIdentity(pub String);
18
/// Identifier of a published track.
///
/// The mock server mints these as `TR_` followed by a random suffix when a
/// track is published.
#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct TrackSid(pub(crate) String);

/// Displays the track id exactly as the wrapped string would be displayed,
/// including any width/padding flags set on the formatter.
impl std::fmt::Display for TrackSid {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let Self(raw) = self;
        std::fmt::Display::fmt(raw, f)
    }
}
27
28impl TryFrom<String> for TrackSid {
29 type Error = anyhow::Error;
30
31 fn try_from(value: String) -> Result<Self, Self::Error> {
32 Ok(TrackSid(value))
33 }
34}
35
/// Connection status of a [`Room`].
///
/// A room starts out `Disconnected`, becomes `Connected` once joining the
/// server succeeds, and is set back to `Disconnected` when the server
/// disconnects the client.
#[derive(Copy, Clone, Debug, Eq, Hash, PartialEq, PartialOrd, Ord)]
#[non_exhaustive]
pub enum ConnectionState {
    Connected,
    Disconnected,
}
42
/// Global registry of mock servers, keyed by server URL.
static SERVERS: Mutex<BTreeMap<String, Arc<TestServer>>> = Mutex::new(BTreeMap::new());
44
/// In-memory stand-in for a LiveKit server.
pub struct TestServer {
    /// URL under which this server is registered in the global registry.
    pub url: String,
    /// API key passed to `token::create` when minting tokens.
    pub api_key: String,
    /// Secret used both to create tokens and to validate incoming ones.
    pub secret_key: String,
    // Server-side rooms, keyed by room name.
    rooms: Mutex<HashMap<String, TestServerRoom>>,
    // Executor used to simulate random delays and to spawn the background
    // `leave_room` task when a connected room state is dropped.
    executor: BackgroundExecutor,
}
52
53impl TestServer {
54 pub fn create(
55 url: String,
56 api_key: String,
57 secret_key: String,
58 executor: BackgroundExecutor,
59 ) -> Result<Arc<TestServer>> {
60 let mut servers = SERVERS.lock();
61 if let BTreeEntry::Vacant(e) = servers.entry(url.clone()) {
62 let server = Arc::new(TestServer {
63 url,
64 api_key,
65 secret_key,
66 rooms: Default::default(),
67 executor,
68 });
69 e.insert(server.clone());
70 Ok(server)
71 } else {
72 anyhow::bail!("a server with url {url:?} already exists");
73 }
74 }
75
76 fn get(url: &str) -> Result<Arc<TestServer>> {
77 Ok(SERVERS
78 .lock()
79 .get(url)
80 .context("no server found for url")?
81 .clone())
82 }
83
84 pub fn teardown(&self) -> Result<()> {
85 SERVERS
86 .lock()
87 .remove(&self.url)
88 .with_context(|| format!("server with url {:?} does not exist", self.url))?;
89 Ok(())
90 }
91
92 pub fn create_api_client(&self) -> TestApiClient {
93 TestApiClient {
94 url: self.url.clone(),
95 }
96 }
97
98 pub async fn create_room(&self, room: String) -> Result<()> {
99 self.simulate_random_delay().await;
100
101 let mut server_rooms = self.rooms.lock();
102 if let Entry::Vacant(e) = server_rooms.entry(room.clone()) {
103 e.insert(Default::default());
104 Ok(())
105 } else {
106 anyhow::bail!("{room:?} already exists");
107 }
108 }
109
110 async fn delete_room(&self, room: String) -> Result<()> {
111 self.simulate_random_delay().await;
112
113 let mut server_rooms = self.rooms.lock();
114 server_rooms
115 .remove(&room)
116 .with_context(|| format!("room {room:?} does not exist"))?;
117 Ok(())
118 }
119
120 async fn join_room(&self, token: String, client_room: Room) -> Result<ParticipantIdentity> {
121 self.simulate_random_delay().await;
122
123 let claims = livekit_api::token::validate(&token, &self.secret_key)?;
124 let identity = ParticipantIdentity(claims.sub.unwrap().to_string());
125 let room_name = claims.video.room.unwrap();
126 let mut server_rooms = self.rooms.lock();
127 let room = (*server_rooms).entry(room_name.to_string()).or_default();
128
129 if let Entry::Vacant(e) = room.client_rooms.entry(identity.clone()) {
130 for server_track in &room.video_tracks {
131 let track = RemoteTrack::Video(RemoteVideoTrack {
132 server_track: server_track.clone(),
133 _room: client_room.downgrade(),
134 });
135 client_room
136 .0
137 .lock()
138 .updates_tx
139 .blocking_send(RoomEvent::TrackSubscribed {
140 track: track.clone(),
141 publication: RemoteTrackPublication {
142 sid: server_track.sid.clone(),
143 room: client_room.downgrade(),
144 track,
145 },
146 participant: RemoteParticipant {
147 room: client_room.downgrade(),
148 identity: server_track.publisher_id.clone(),
149 },
150 })
151 .unwrap();
152 }
153 for server_track in &room.audio_tracks {
154 let track = RemoteTrack::Audio(RemoteAudioTrack {
155 server_track: server_track.clone(),
156 room: client_room.downgrade(),
157 });
158 client_room
159 .0
160 .lock()
161 .updates_tx
162 .blocking_send(RoomEvent::TrackSubscribed {
163 track: track.clone(),
164 publication: RemoteTrackPublication {
165 sid: server_track.sid.clone(),
166 room: client_room.downgrade(),
167 track,
168 },
169 participant: RemoteParticipant {
170 room: client_room.downgrade(),
171 identity: server_track.publisher_id.clone(),
172 },
173 })
174 .unwrap();
175 }
176 e.insert(client_room);
177 Ok(identity)
178 } else {
179 anyhow::bail!("{identity:?} attempted to join room {room_name:?} twice");
180 }
181 }
182
183 async fn leave_room(&self, token: String) -> Result<()> {
184 self.simulate_random_delay().await;
185
186 let claims = livekit_api::token::validate(&token, &self.secret_key)?;
187 let identity = ParticipantIdentity(claims.sub.unwrap().to_string());
188 let room_name = claims.video.room.unwrap();
189 let mut server_rooms = self.rooms.lock();
190 let room = server_rooms
191 .get_mut(&*room_name)
192 .with_context(|| format!("room {room_name:?} does not exist"))?;
193 room.client_rooms.remove(&identity).with_context(|| {
194 format!("{identity:?} attempted to leave room {room_name:?} before joining it")
195 })?;
196 Ok(())
197 }
198
199 fn remote_participants(
200 &self,
201 token: String,
202 ) -> Result<HashMap<ParticipantIdentity, RemoteParticipant>> {
203 let claims = livekit_api::token::validate(&token, &self.secret_key)?;
204 let local_identity = ParticipantIdentity(claims.sub.unwrap().to_string());
205 let room_name = claims.video.room.unwrap().to_string();
206
207 if let Some(server_room) = self.rooms.lock().get(&room_name) {
208 let room = server_room
209 .client_rooms
210 .get(&local_identity)
211 .unwrap()
212 .downgrade();
213 Ok(server_room
214 .client_rooms
215 .iter()
216 .filter(|(identity, _)| *identity != &local_identity)
217 .map(|(identity, _)| {
218 (
219 identity.clone(),
220 RemoteParticipant {
221 room: room.clone(),
222 identity: identity.clone(),
223 },
224 )
225 })
226 .collect())
227 } else {
228 Ok(Default::default())
229 }
230 }
231
232 async fn remove_participant(
233 &self,
234 room_name: String,
235 identity: ParticipantIdentity,
236 ) -> Result<()> {
237 self.simulate_random_delay().await;
238
239 let mut server_rooms = self.rooms.lock();
240 let room = server_rooms
241 .get_mut(&room_name)
242 .with_context(|| format!("room {room_name} does not exist"))?;
243 room.client_rooms
244 .remove(&identity)
245 .with_context(|| format!("participant {identity:?} did not join room {room_name:?}"))?;
246 Ok(())
247 }
248
249 async fn update_participant(
250 &self,
251 room_name: String,
252 identity: String,
253 permission: proto::ParticipantPermission,
254 ) -> Result<()> {
255 self.simulate_random_delay().await;
256
257 let mut server_rooms = self.rooms.lock();
258 let room = server_rooms
259 .get_mut(&room_name)
260 .with_context(|| format!("room {room_name} does not exist"))?;
261 room.participant_permissions
262 .insert(ParticipantIdentity(identity), permission);
263 Ok(())
264 }
265
266 pub async fn disconnect_client(&self, client_identity: String) {
267 let client_identity = ParticipantIdentity(client_identity);
268
269 self.simulate_random_delay().await;
270
271 let mut server_rooms = self.rooms.lock();
272 for room in server_rooms.values_mut() {
273 if let Some(room) = room.client_rooms.remove(&client_identity) {
274 let mut room = room.0.lock();
275 room.connection_state = ConnectionState::Disconnected;
276 room.updates_tx
277 .blocking_send(RoomEvent::Disconnected {
278 reason: "SIGNAL_CLOSED",
279 })
280 .ok();
281 }
282 }
283 }
284
285 pub(crate) async fn publish_video_track(
286 &self,
287 token: String,
288 _local_track: LocalVideoTrack,
289 ) -> Result<TrackSid> {
290 self.simulate_random_delay().await;
291
292 let claims = livekit_api::token::validate(&token, &self.secret_key)?;
293 let identity = ParticipantIdentity(claims.sub.unwrap().to_string());
294 let room_name = claims.video.room.unwrap();
295
296 let mut server_rooms = self.rooms.lock();
297 let room = server_rooms
298 .get_mut(&*room_name)
299 .with_context(|| format!("room {room_name} does not exist"))?;
300
301 let can_publish = room
302 .participant_permissions
303 .get(&identity)
304 .map(|permission| permission.can_publish)
305 .or(claims.video.can_publish)
306 .unwrap_or(true);
307
308 anyhow::ensure!(can_publish, "user is not allowed to publish");
309
310 let sid: TrackSid = format!("TR_{}", nanoid::nanoid!(17)).try_into().unwrap();
311 let server_track = Arc::new(TestServerVideoTrack {
312 sid: sid.clone(),
313 publisher_id: identity.clone(),
314 });
315
316 room.video_tracks.push(server_track.clone());
317
318 for (room_identity, client_room) in &room.client_rooms {
319 if *room_identity != identity {
320 let track = RemoteTrack::Video(RemoteVideoTrack {
321 server_track: server_track.clone(),
322 _room: client_room.downgrade(),
323 });
324 let publication = RemoteTrackPublication {
325 sid: sid.clone(),
326 room: client_room.downgrade(),
327 track: track.clone(),
328 };
329 let participant = RemoteParticipant {
330 identity: identity.clone(),
331 room: client_room.downgrade(),
332 };
333 client_room
334 .0
335 .lock()
336 .updates_tx
337 .blocking_send(RoomEvent::TrackSubscribed {
338 track,
339 publication,
340 participant,
341 })
342 .unwrap();
343 }
344 }
345
346 Ok(sid)
347 }
348
349 pub(crate) async fn publish_audio_track(
350 &self,
351 token: String,
352 _local_track: &LocalAudioTrack,
353 ) -> Result<TrackSid> {
354 self.simulate_random_delay().await;
355
356 let claims = livekit_api::token::validate(&token, &self.secret_key)?;
357 let identity = ParticipantIdentity(claims.sub.unwrap().to_string());
358 let room_name = claims.video.room.unwrap();
359
360 let mut server_rooms = self.rooms.lock();
361 let room = server_rooms
362 .get_mut(&*room_name)
363 .with_context(|| format!("room {room_name} does not exist"))?;
364
365 let can_publish = room
366 .participant_permissions
367 .get(&identity)
368 .map(|permission| permission.can_publish)
369 .or(claims.video.can_publish)
370 .unwrap_or(true);
371
372 anyhow::ensure!(can_publish, "user is not allowed to publish");
373
374 let sid: TrackSid = format!("TR_{}", nanoid::nanoid!(17)).try_into().unwrap();
375 let server_track = Arc::new(TestServerAudioTrack {
376 sid: sid.clone(),
377 publisher_id: identity.clone(),
378 muted: AtomicBool::new(false),
379 });
380
381 room.audio_tracks.push(server_track.clone());
382
383 for (room_identity, client_room) in &room.client_rooms {
384 if *room_identity != identity {
385 let track = RemoteTrack::Audio(RemoteAudioTrack {
386 server_track: server_track.clone(),
387 room: client_room.downgrade(),
388 });
389 let publication = RemoteTrackPublication {
390 sid: sid.clone(),
391 room: client_room.downgrade(),
392 track: track.clone(),
393 };
394 let participant = RemoteParticipant {
395 identity: identity.clone(),
396 room: client_room.downgrade(),
397 };
398 client_room
399 .0
400 .lock()
401 .updates_tx
402 .blocking_send(RoomEvent::TrackSubscribed {
403 track,
404 publication,
405 participant,
406 })
407 .ok();
408 }
409 }
410
411 Ok(sid)
412 }
413
    /// Stub: ignores both arguments and always succeeds. No server state is
    /// changed, so the track stays listed in the room.
    pub(crate) async fn unpublish_track(&self, _token: String, _track: &TrackSid) -> Result<()> {
        Ok(())
    }
417
418 pub(crate) fn set_track_muted(
419 &self,
420 token: &str,
421 track_sid: &TrackSid,
422 muted: bool,
423 ) -> Result<()> {
424 let claims = livekit_api::token::validate(token, &self.secret_key)?;
425 let room_name = claims.video.room.unwrap();
426 let identity = ParticipantIdentity(claims.sub.unwrap().to_string());
427 let mut server_rooms = self.rooms.lock();
428 let room = server_rooms
429 .get_mut(&*room_name)
430 .with_context(|| format!("room {room_name} does not exist"))?;
431 if let Some(track) = room
432 .audio_tracks
433 .iter_mut()
434 .find(|track| track.sid == *track_sid)
435 {
436 track.muted.store(muted, SeqCst);
437 for (id, client_room) in room.client_rooms.iter() {
438 if *id != identity {
439 let participant = Participant::Remote(RemoteParticipant {
440 identity: identity.clone(),
441 room: client_room.downgrade(),
442 });
443 let track = RemoteTrack::Audio(RemoteAudioTrack {
444 server_track: track.clone(),
445 room: client_room.downgrade(),
446 });
447 let publication = TrackPublication::Remote(RemoteTrackPublication {
448 sid: track_sid.clone(),
449 room: client_room.downgrade(),
450 track,
451 });
452
453 let event = if muted {
454 RoomEvent::TrackMuted {
455 participant,
456 publication,
457 }
458 } else {
459 RoomEvent::TrackUnmuted {
460 participant,
461 publication,
462 }
463 };
464
465 client_room
466 .0
467 .lock()
468 .updates_tx
469 .blocking_send(event)
470 .unwrap();
471 }
472 }
473 }
474 Ok(())
475 }
476
477 pub(crate) fn is_track_muted(&self, token: &str, track_sid: &TrackSid) -> Option<bool> {
478 let claims = livekit_api::token::validate(token, &self.secret_key).ok()?;
479 let room_name = claims.video.room.unwrap();
480
481 let mut server_rooms = self.rooms.lock();
482 let room = server_rooms.get_mut(&*room_name)?;
483 room.audio_tracks.iter().find_map(|track| {
484 if track.sid == *track_sid {
485 Some(track.muted.load(SeqCst))
486 } else {
487 None
488 }
489 })
490 }
491
492 pub(crate) fn video_tracks(&self, token: String) -> Result<Vec<RemoteVideoTrack>> {
493 let claims = livekit_api::token::validate(&token, &self.secret_key)?;
494 let room_name = claims.video.room.unwrap();
495 let identity = ParticipantIdentity(claims.sub.unwrap().to_string());
496
497 let mut server_rooms = self.rooms.lock();
498 let room = server_rooms
499 .get_mut(&*room_name)
500 .with_context(|| format!("room {room_name} does not exist"))?;
501 let client_room = room
502 .client_rooms
503 .get(&identity)
504 .context("not a participant in room")?;
505 Ok(room
506 .video_tracks
507 .iter()
508 .map(|track| RemoteVideoTrack {
509 server_track: track.clone(),
510 _room: client_room.downgrade(),
511 })
512 .collect())
513 }
514
515 pub(crate) fn audio_tracks(&self, token: String) -> Result<Vec<RemoteAudioTrack>> {
516 let claims = livekit_api::token::validate(&token, &self.secret_key)?;
517 let room_name = claims.video.room.unwrap();
518 let identity = ParticipantIdentity(claims.sub.unwrap().to_string());
519
520 let mut server_rooms = self.rooms.lock();
521 let room = server_rooms
522 .get_mut(&*room_name)
523 .with_context(|| format!("room {room_name} does not exist"))?;
524 let client_room = room
525 .client_rooms
526 .get(&identity)
527 .context("not a participant in room")?;
528 Ok(room
529 .audio_tracks
530 .iter()
531 .map(|track| RemoteAudioTrack {
532 server_track: track.clone(),
533 room: client_room.downgrade(),
534 })
535 .collect())
536 }
537
    /// Awaits the executor's simulated random delay when built for tests or with
    /// the `test-support` feature; otherwise does nothing.
    async fn simulate_random_delay(&self) {
        // The call below is compiled only under `test` / `test-support`.
        #[cfg(any(test, feature = "test-support"))]
        self.executor.simulate_random_delay().await;
    }
542}
543
/// Server-side state of a single room.
#[derive(Default, Debug)]
struct TestServerRoom {
    // Client rooms currently joined, keyed by participant identity.
    client_rooms: HashMap<ParticipantIdentity, Room>,
    // Video tracks published in this room, in publication order.
    video_tracks: Vec<Arc<TestServerVideoTrack>>,
    // Audio tracks published in this room, in publication order.
    audio_tracks: Vec<Arc<TestServerAudioTrack>>,
    // Permissions set through `update_participant`; consulted when publishing.
    participant_permissions: HashMap<ParticipantIdentity, proto::ParticipantPermission>,
}
551
/// Server-side record of a published video track.
#[derive(Debug)]
pub(crate) struct TestServerVideoTrack {
    /// Id assigned to the track when it was published.
    pub(crate) sid: TrackSid,
    /// Identity of the participant that published the track.
    pub(crate) publisher_id: ParticipantIdentity,
    // frames_rx: async_broadcast::Receiver<Frame>,
}
558
/// Server-side record of a published audio track.
#[derive(Debug)]
pub(crate) struct TestServerAudioTrack {
    /// Id assigned to the track when it was published.
    pub(crate) sid: TrackSid,
    /// Identity of the participant that published the track.
    pub(crate) publisher_id: ParticipantIdentity,
    /// Whether the track is currently muted; starts out `false` on publish.
    pub(crate) muted: AtomicBool,
}
565
/// API client for a mock server; it holds only the server's URL and looks the
/// server up in the global registry on every call.
pub struct TestApiClient {
    // URL of the server this client talks to.
    url: String,
}
569
570#[async_trait]
571impl livekit_api::Client for TestApiClient {
572 fn url(&self) -> &str {
573 &self.url
574 }
575
576 async fn create_room(&self, name: String) -> Result<()> {
577 let server = TestServer::get(&self.url)?;
578 server.create_room(name).await?;
579 Ok(())
580 }
581
582 async fn delete_room(&self, name: String) -> Result<()> {
583 let server = TestServer::get(&self.url)?;
584 server.delete_room(name).await?;
585 Ok(())
586 }
587
588 async fn remove_participant(&self, room: String, identity: String) -> Result<()> {
589 let server = TestServer::get(&self.url)?;
590 server
591 .remove_participant(room, ParticipantIdentity(identity))
592 .await?;
593 Ok(())
594 }
595
596 async fn update_participant(
597 &self,
598 room: String,
599 identity: String,
600 permission: livekit_api::proto::ParticipantPermission,
601 ) -> Result<()> {
602 let server = TestServer::get(&self.url)?;
603 server
604 .update_participant(room, identity, permission)
605 .await?;
606 Ok(())
607 }
608
609 fn room_token(&self, room: &str, identity: &str) -> Result<String> {
610 let server = TestServer::get(&self.url)?;
611 token::create(
612 &server.api_key,
613 &server.secret_key,
614 Some(identity),
615 token::VideoGrant::to_join(room),
616 )
617 }
618
619 fn guest_token(&self, room: &str, identity: &str) -> Result<String> {
620 let server = TestServer::get(&self.url)?;
621 token::create(
622 &server.api_key,
623 &server.secret_key,
624 Some(identity),
625 token::VideoGrant::for_guest(room),
626 )
627 }
628}
629
/// Client-side state of a room connection.
pub(crate) struct RoomState {
    /// URL of the server this room is connected to.
    pub(crate) url: String,
    /// Token this room connected with; reused for later server calls.
    pub(crate) token: String,
    /// Identity returned by the server on join; empty until the join completes.
    pub(crate) local_identity: ParticipantIdentity,
    /// Current connection status.
    pub(crate) connection_state: ConnectionState,
    /// Set of track ids; not modified anywhere in this file.
    pub(crate) paused_audio_tracks: HashSet<TrackSid>,
    /// Sending half of the channel on which room events are delivered.
    pub(crate) updates_tx: mpsc::Sender<RoomEvent>,
}
638
/// Shared, cloneable handle to a client room's state.
#[derive(Clone, Debug)]
pub struct Room(pub(crate) Arc<Mutex<RoomState>>);
641
/// Weak counterpart of [`Room`]; it does not keep the room state alive.
#[derive(Clone, Debug)]
pub(crate) struct WeakRoom(Weak<Mutex<RoomState>>);
644
645impl std::fmt::Debug for RoomState {
646 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
647 f.debug_struct("Room")
648 .field("url", &self.url)
649 .field("token", &self.token)
650 .field("local_identity", &self.local_identity)
651 .field("connection_state", &self.connection_state)
652 .field("paused_audio_tracks", &self.paused_audio_tracks)
653 .finish()
654 }
655}
656
657impl Room {
658 pub(crate) fn downgrade(&self) -> WeakRoom {
659 WeakRoom(Arc::downgrade(&self.0))
660 }
661
662 pub fn connection_state(&self) -> ConnectionState {
663 self.0.lock().connection_state
664 }
665
666 pub fn local_participant(&self) -> LocalParticipant {
667 let identity = self.0.lock().local_identity.clone();
668 LocalParticipant {
669 identity,
670 room: self.clone(),
671 }
672 }
673
674 pub async fn connect(
675 url: String,
676 token: String,
677 _cx: &mut AsyncApp,
678 ) -> Result<(Self, mpsc::Receiver<RoomEvent>)> {
679 let server = TestServer::get(&url)?;
680 let (updates_tx, updates_rx) = mpsc::channel(1024);
681 let this = Self(Arc::new(Mutex::new(RoomState {
682 local_identity: ParticipantIdentity(String::new()),
683 url: url.to_string(),
684 token: token.to_string(),
685 connection_state: ConnectionState::Disconnected,
686 paused_audio_tracks: Default::default(),
687 updates_tx,
688 })));
689
690 let identity = server
691 .join_room(token.to_string(), this.clone())
692 .await
693 .context("room join")?;
694 {
695 let mut state = this.0.lock();
696 state.local_identity = identity;
697 state.connection_state = ConnectionState::Connected;
698 }
699
700 Ok((this, updates_rx))
701 }
702
703 pub fn remote_participants(&self) -> HashMap<ParticipantIdentity, RemoteParticipant> {
704 self.test_server()
705 .remote_participants(self.0.lock().token.clone())
706 .unwrap()
707 }
708
709 pub(crate) fn test_server(&self) -> Arc<TestServer> {
710 TestServer::get(&self.0.lock().url).unwrap()
711 }
712
713 pub(crate) fn token(&self) -> String {
714 self.0.lock().token.clone()
715 }
716
717 pub fn play_remote_audio_track(
718 &self,
719 _track: &RemoteAudioTrack,
720 _cx: &App,
721 ) -> anyhow::Result<AudioStream> {
722 Ok(AudioStream {})
723 }
724
725 pub async fn unpublish_local_track(&self, sid: TrackSid, cx: &mut AsyncApp) -> Result<()> {
726 self.local_participant().unpublish_track(sid, cx).await
727 }
728
729 pub async fn publish_local_microphone_track(
730 &self,
731 cx: &mut AsyncApp,
732 ) -> Result<(LocalTrackPublication, AudioStream)> {
733 self.local_participant().publish_microphone_track(cx).await
734 }
735}
736
737impl Drop for RoomState {
738 fn drop(&mut self) {
739 if self.connection_state == ConnectionState::Connected
740 && let Ok(server) = TestServer::get(&self.url)
741 {
742 let executor = server.executor.clone();
743 let token = self.token.clone();
744 executor
745 .spawn(async move { server.leave_room(token).await.ok() })
746 .detach();
747 }
748 }
749}
750
751impl WeakRoom {
752 pub(crate) fn upgrade(&self) -> Option<Room> {
753 self.0.upgrade().map(Room)
754 }
755}