1pub mod participant;
2pub mod publication;
3pub mod track;
4pub mod webrtc;
5
6use self::id::*;
7use self::{participant::*, publication::*, track::*};
8use anyhow::{anyhow, Context, Result};
9use async_trait::async_trait;
10use collections::{btree_map::Entry as BTreeEntry, hash_map::Entry, BTreeMap, HashMap, HashSet};
11use gpui::BackgroundExecutor;
12use livekit::options::TrackPublishOptions;
13use livekit_server::{proto, token};
14use parking_lot::Mutex;
15use postage::{mpsc, sink::Sink};
16use std::sync::{
17 atomic::{AtomicBool, Ordering::SeqCst},
18 Arc, Weak,
19};
20
21pub use livekit::{id, options, ConnectionState, DisconnectReason, RoomOptions};
22
23static SERVERS: Mutex<BTreeMap<String, Arc<TestServer>>> = Mutex::new(BTreeMap::new());
24
25pub struct TestServer {
26 pub url: String,
27 pub api_key: String,
28 pub secret_key: String,
29 rooms: Mutex<HashMap<String, TestServerRoom>>,
30 executor: BackgroundExecutor,
31}
32
33impl TestServer {
34 pub fn create(
35 url: String,
36 api_key: String,
37 secret_key: String,
38 executor: BackgroundExecutor,
39 ) -> Result<Arc<TestServer>> {
40 let mut servers = SERVERS.lock();
41 if let BTreeEntry::Vacant(e) = servers.entry(url.clone()) {
42 let server = Arc::new(TestServer {
43 url,
44 api_key,
45 secret_key,
46 rooms: Default::default(),
47 executor,
48 });
49 e.insert(server.clone());
50 Ok(server)
51 } else {
52 Err(anyhow!("a server with url {:?} already exists", url))
53 }
54 }
55
56 fn get(url: &str) -> Result<Arc<TestServer>> {
57 Ok(SERVERS
58 .lock()
59 .get(url)
60 .ok_or_else(|| anyhow!("no server found for url"))?
61 .clone())
62 }
63
64 pub fn teardown(&self) -> Result<()> {
65 SERVERS
66 .lock()
67 .remove(&self.url)
68 .ok_or_else(|| anyhow!("server with url {:?} does not exist", self.url))?;
69 Ok(())
70 }
71
72 pub fn create_api_client(&self) -> TestApiClient {
73 TestApiClient {
74 url: self.url.clone(),
75 }
76 }
77
78 pub async fn create_room(&self, room: String) -> Result<()> {
79 self.executor.simulate_random_delay().await;
80
81 let mut server_rooms = self.rooms.lock();
82 if let Entry::Vacant(e) = server_rooms.entry(room.clone()) {
83 e.insert(Default::default());
84 Ok(())
85 } else {
86 Err(anyhow!("room {:?} already exists", room))
87 }
88 }
89
90 async fn delete_room(&self, room: String) -> Result<()> {
91 self.executor.simulate_random_delay().await;
92
93 let mut server_rooms = self.rooms.lock();
94 server_rooms
95 .remove(&room)
96 .ok_or_else(|| anyhow!("room {:?} does not exist", room))?;
97 Ok(())
98 }
99
100 async fn join_room(&self, token: String, client_room: Room) -> Result<ParticipantIdentity> {
101 self.executor.simulate_random_delay().await;
102
103 let claims = livekit_server::token::validate(&token, &self.secret_key)?;
104 let identity = ParticipantIdentity(claims.sub.unwrap().to_string());
105 let room_name = claims.video.room.unwrap();
106 let mut server_rooms = self.rooms.lock();
107 let room = (*server_rooms).entry(room_name.to_string()).or_default();
108
109 if let Entry::Vacant(e) = room.client_rooms.entry(identity.clone()) {
110 for server_track in &room.video_tracks {
111 let track = RemoteTrack::Video(RemoteVideoTrack {
112 server_track: server_track.clone(),
113 _room: client_room.downgrade(),
114 });
115 client_room
116 .0
117 .lock()
118 .updates_tx
119 .blocking_send(RoomEvent::TrackSubscribed {
120 track: track.clone(),
121 publication: RemoteTrackPublication {
122 sid: server_track.sid.clone(),
123 room: client_room.downgrade(),
124 track,
125 },
126 participant: RemoteParticipant {
127 room: client_room.downgrade(),
128 identity: server_track.publisher_id.clone(),
129 },
130 })
131 .unwrap();
132 }
133 for server_track in &room.audio_tracks {
134 let track = RemoteTrack::Audio(RemoteAudioTrack {
135 server_track: server_track.clone(),
136 room: client_room.downgrade(),
137 });
138 client_room
139 .0
140 .lock()
141 .updates_tx
142 .blocking_send(RoomEvent::TrackSubscribed {
143 track: track.clone(),
144 publication: RemoteTrackPublication {
145 sid: server_track.sid.clone(),
146 room: client_room.downgrade(),
147 track,
148 },
149 participant: RemoteParticipant {
150 room: client_room.downgrade(),
151 identity: server_track.publisher_id.clone(),
152 },
153 })
154 .unwrap();
155 }
156 e.insert(client_room);
157 Ok(identity)
158 } else {
159 Err(anyhow!(
160 "{:?} attempted to join room {:?} twice",
161 identity,
162 room_name
163 ))
164 }
165 }
166
167 async fn leave_room(&self, token: String) -> Result<()> {
168 self.executor.simulate_random_delay().await;
169
170 let claims = livekit_server::token::validate(&token, &self.secret_key)?;
171 let identity = ParticipantIdentity(claims.sub.unwrap().to_string());
172 let room_name = claims.video.room.unwrap();
173 let mut server_rooms = self.rooms.lock();
174 let room = server_rooms
175 .get_mut(&*room_name)
176 .ok_or_else(|| anyhow!("room {} does not exist", room_name))?;
177 room.client_rooms.remove(&identity).ok_or_else(|| {
178 anyhow!(
179 "{:?} attempted to leave room {:?} before joining it",
180 identity,
181 room_name
182 )
183 })?;
184 Ok(())
185 }
186
187 fn remote_participants(
188 &self,
189 token: String,
190 ) -> Result<HashMap<ParticipantIdentity, RemoteParticipant>> {
191 let claims = livekit_server::token::validate(&token, &self.secret_key)?;
192 let local_identity = ParticipantIdentity(claims.sub.unwrap().to_string());
193 let room_name = claims.video.room.unwrap().to_string();
194
195 if let Some(server_room) = self.rooms.lock().get(&room_name) {
196 let room = server_room
197 .client_rooms
198 .get(&local_identity)
199 .unwrap()
200 .downgrade();
201 Ok(server_room
202 .client_rooms
203 .iter()
204 .filter(|(identity, _)| *identity != &local_identity)
205 .map(|(identity, _)| {
206 (
207 identity.clone(),
208 RemoteParticipant {
209 room: room.clone(),
210 identity: identity.clone(),
211 },
212 )
213 })
214 .collect())
215 } else {
216 Ok(Default::default())
217 }
218 }
219
220 async fn remove_participant(
221 &self,
222 room_name: String,
223 identity: ParticipantIdentity,
224 ) -> Result<()> {
225 self.executor.simulate_random_delay().await;
226
227 let mut server_rooms = self.rooms.lock();
228 let room = server_rooms
229 .get_mut(&room_name)
230 .ok_or_else(|| anyhow!("room {} does not exist", room_name))?;
231 room.client_rooms.remove(&identity).ok_or_else(|| {
232 anyhow!(
233 "participant {:?} did not join room {:?}",
234 identity,
235 room_name
236 )
237 })?;
238 Ok(())
239 }
240
241 async fn update_participant(
242 &self,
243 room_name: String,
244 identity: String,
245 permission: proto::ParticipantPermission,
246 ) -> Result<()> {
247 self.executor.simulate_random_delay().await;
248
249 let mut server_rooms = self.rooms.lock();
250 let room = server_rooms
251 .get_mut(&room_name)
252 .ok_or_else(|| anyhow!("room {} does not exist", room_name))?;
253 room.participant_permissions
254 .insert(ParticipantIdentity(identity), permission);
255 Ok(())
256 }
257
258 pub async fn disconnect_client(&self, client_identity: String) {
259 let client_identity = ParticipantIdentity(client_identity);
260
261 self.executor.simulate_random_delay().await;
262
263 let mut server_rooms = self.rooms.lock();
264 for room in server_rooms.values_mut() {
265 if let Some(room) = room.client_rooms.remove(&client_identity) {
266 let mut room = room.0.lock();
267 room.connection_state = ConnectionState::Disconnected;
268 room.updates_tx
269 .blocking_send(RoomEvent::Disconnected {
270 reason: DisconnectReason::SignalClose,
271 })
272 .ok();
273 }
274 }
275 }
276
277 async fn publish_video_track(
278 &self,
279 token: String,
280 _local_track: LocalVideoTrack,
281 ) -> Result<TrackSid> {
282 self.executor.simulate_random_delay().await;
283
284 let claims = livekit_server::token::validate(&token, &self.secret_key)?;
285 let identity = ParticipantIdentity(claims.sub.unwrap().to_string());
286 let room_name = claims.video.room.unwrap();
287
288 let mut server_rooms = self.rooms.lock();
289 let room = server_rooms
290 .get_mut(&*room_name)
291 .ok_or_else(|| anyhow!("room {} does not exist", room_name))?;
292
293 let can_publish = room
294 .participant_permissions
295 .get(&identity)
296 .map(|permission| permission.can_publish)
297 .or(claims.video.can_publish)
298 .unwrap_or(true);
299
300 if !can_publish {
301 return Err(anyhow!("user is not allowed to publish"));
302 }
303
304 let sid: TrackSid = format!("TR_{}", nanoid::nanoid!(17)).try_into().unwrap();
305 let server_track = Arc::new(TestServerVideoTrack {
306 sid: sid.clone(),
307 publisher_id: identity.clone(),
308 });
309
310 room.video_tracks.push(server_track.clone());
311
312 for (room_identity, client_room) in &room.client_rooms {
313 if *room_identity != identity {
314 let track = RemoteTrack::Video(RemoteVideoTrack {
315 server_track: server_track.clone(),
316 _room: client_room.downgrade(),
317 });
318 let publication = RemoteTrackPublication {
319 sid: sid.clone(),
320 room: client_room.downgrade(),
321 track: track.clone(),
322 };
323 let participant = RemoteParticipant {
324 identity: identity.clone(),
325 room: client_room.downgrade(),
326 };
327 client_room
328 .0
329 .lock()
330 .updates_tx
331 .blocking_send(RoomEvent::TrackSubscribed {
332 track,
333 publication,
334 participant,
335 })
336 .unwrap();
337 }
338 }
339
340 Ok(sid)
341 }
342
343 async fn publish_audio_track(
344 &self,
345 token: String,
346 _local_track: &LocalAudioTrack,
347 ) -> Result<TrackSid> {
348 self.executor.simulate_random_delay().await;
349
350 let claims = livekit_server::token::validate(&token, &self.secret_key)?;
351 let identity = ParticipantIdentity(claims.sub.unwrap().to_string());
352 let room_name = claims.video.room.unwrap();
353
354 let mut server_rooms = self.rooms.lock();
355 let room = server_rooms
356 .get_mut(&*room_name)
357 .ok_or_else(|| anyhow!("room {} does not exist", room_name))?;
358
359 let can_publish = room
360 .participant_permissions
361 .get(&identity)
362 .map(|permission| permission.can_publish)
363 .or(claims.video.can_publish)
364 .unwrap_or(true);
365
366 if !can_publish {
367 return Err(anyhow!("user is not allowed to publish"));
368 }
369
370 let sid: TrackSid = format!("TR_{}", nanoid::nanoid!(17)).try_into().unwrap();
371 let server_track = Arc::new(TestServerAudioTrack {
372 sid: sid.clone(),
373 publisher_id: identity.clone(),
374 muted: AtomicBool::new(false),
375 });
376
377 room.audio_tracks.push(server_track.clone());
378
379 for (room_identity, client_room) in &room.client_rooms {
380 if *room_identity != identity {
381 let track = RemoteTrack::Audio(RemoteAudioTrack {
382 server_track: server_track.clone(),
383 room: client_room.downgrade(),
384 });
385 let publication = RemoteTrackPublication {
386 sid: sid.clone(),
387 room: client_room.downgrade(),
388 track: track.clone(),
389 };
390 let participant = RemoteParticipant {
391 identity: identity.clone(),
392 room: client_room.downgrade(),
393 };
394 client_room
395 .0
396 .lock()
397 .updates_tx
398 .blocking_send(RoomEvent::TrackSubscribed {
399 track,
400 publication,
401 participant,
402 })
403 .ok();
404 }
405 }
406
407 Ok(sid)
408 }
409
410 async fn unpublish_track(&self, _token: String, _track: &TrackSid) -> Result<()> {
411 Ok(())
412 }
413
414 fn set_track_muted(&self, token: &str, track_sid: &TrackSid, muted: bool) -> Result<()> {
415 let claims = livekit_server::token::validate(&token, &self.secret_key)?;
416 let room_name = claims.video.room.unwrap();
417 let identity = ParticipantIdentity(claims.sub.unwrap().to_string());
418 let mut server_rooms = self.rooms.lock();
419 let room = server_rooms
420 .get_mut(&*room_name)
421 .ok_or_else(|| anyhow!("room {} does not exist", room_name))?;
422 if let Some(track) = room
423 .audio_tracks
424 .iter_mut()
425 .find(|track| track.sid == *track_sid)
426 {
427 track.muted.store(muted, SeqCst);
428 for (id, client_room) in room.client_rooms.iter() {
429 if *id != identity {
430 let participant = Participant::Remote(RemoteParticipant {
431 identity: identity.clone(),
432 room: client_room.downgrade(),
433 });
434 let track = RemoteTrack::Audio(RemoteAudioTrack {
435 server_track: track.clone(),
436 room: client_room.downgrade(),
437 });
438 let publication = TrackPublication::Remote(RemoteTrackPublication {
439 sid: track_sid.clone(),
440 room: client_room.downgrade(),
441 track,
442 });
443
444 let event = if muted {
445 RoomEvent::TrackMuted {
446 participant,
447 publication,
448 }
449 } else {
450 RoomEvent::TrackUnmuted {
451 participant,
452 publication,
453 }
454 };
455
456 client_room
457 .0
458 .lock()
459 .updates_tx
460 .blocking_send(event)
461 .unwrap();
462 }
463 }
464 }
465 Ok(())
466 }
467
468 fn is_track_muted(&self, token: &str, track_sid: &TrackSid) -> Option<bool> {
469 let claims = livekit_server::token::validate(&token, &self.secret_key).ok()?;
470 let room_name = claims.video.room.unwrap();
471
472 let mut server_rooms = self.rooms.lock();
473 let room = server_rooms.get_mut(&*room_name)?;
474 room.audio_tracks.iter().find_map(|track| {
475 if track.sid == *track_sid {
476 Some(track.muted.load(SeqCst))
477 } else {
478 None
479 }
480 })
481 }
482
483 fn video_tracks(&self, token: String) -> Result<Vec<RemoteVideoTrack>> {
484 let claims = livekit_server::token::validate(&token, &self.secret_key)?;
485 let room_name = claims.video.room.unwrap();
486 let identity = ParticipantIdentity(claims.sub.unwrap().to_string());
487
488 let mut server_rooms = self.rooms.lock();
489 let room = server_rooms
490 .get_mut(&*room_name)
491 .ok_or_else(|| anyhow!("room {} does not exist", room_name))?;
492 let client_room = room
493 .client_rooms
494 .get(&identity)
495 .ok_or_else(|| anyhow!("not a participant in room"))?;
496 Ok(room
497 .video_tracks
498 .iter()
499 .map(|track| RemoteVideoTrack {
500 server_track: track.clone(),
501 _room: client_room.downgrade(),
502 })
503 .collect())
504 }
505
506 fn audio_tracks(&self, token: String) -> Result<Vec<RemoteAudioTrack>> {
507 let claims = livekit_server::token::validate(&token, &self.secret_key)?;
508 let room_name = claims.video.room.unwrap();
509 let identity = ParticipantIdentity(claims.sub.unwrap().to_string());
510
511 let mut server_rooms = self.rooms.lock();
512 let room = server_rooms
513 .get_mut(&*room_name)
514 .ok_or_else(|| anyhow!("room {} does not exist", room_name))?;
515 let client_room = room
516 .client_rooms
517 .get(&identity)
518 .ok_or_else(|| anyhow!("not a participant in room"))?;
519 Ok(room
520 .audio_tracks
521 .iter()
522 .map(|track| RemoteAudioTrack {
523 server_track: track.clone(),
524 room: client_room.downgrade(),
525 })
526 .collect())
527 }
528}
529
530#[derive(Default, Debug)]
531struct TestServerRoom {
532 client_rooms: HashMap<ParticipantIdentity, Room>,
533 video_tracks: Vec<Arc<TestServerVideoTrack>>,
534 audio_tracks: Vec<Arc<TestServerAudioTrack>>,
535 participant_permissions: HashMap<ParticipantIdentity, proto::ParticipantPermission>,
536}
537
538#[derive(Debug)]
539struct TestServerVideoTrack {
540 sid: TrackSid,
541 publisher_id: ParticipantIdentity,
542 // frames_rx: async_broadcast::Receiver<Frame>,
543}
544
545#[derive(Debug)]
546struct TestServerAudioTrack {
547 sid: TrackSid,
548 publisher_id: ParticipantIdentity,
549 muted: AtomicBool,
550}
551
/// API client for a `TestServer`.
///
/// Only the server URL is stored; the server itself is looked up in the global
/// registry each time a client method is called.
pub struct TestApiClient {
    /// URL of the server this client talks to.
    url: String,
}
555
556#[derive(Clone, Debug)]
557#[non_exhaustive]
558pub enum RoomEvent {
559 ParticipantConnected(RemoteParticipant),
560 ParticipantDisconnected(RemoteParticipant),
561 LocalTrackPublished {
562 publication: LocalTrackPublication,
563 track: LocalTrack,
564 participant: LocalParticipant,
565 },
566 LocalTrackUnpublished {
567 publication: LocalTrackPublication,
568 participant: LocalParticipant,
569 },
570 TrackSubscribed {
571 track: RemoteTrack,
572 publication: RemoteTrackPublication,
573 participant: RemoteParticipant,
574 },
575 TrackUnsubscribed {
576 track: RemoteTrack,
577 publication: RemoteTrackPublication,
578 participant: RemoteParticipant,
579 },
580 TrackSubscriptionFailed {
581 participant: RemoteParticipant,
582 error: String,
583 track_sid: TrackSid,
584 },
585 TrackPublished {
586 publication: RemoteTrackPublication,
587 participant: RemoteParticipant,
588 },
589 TrackUnpublished {
590 publication: RemoteTrackPublication,
591 participant: RemoteParticipant,
592 },
593 TrackMuted {
594 participant: Participant,
595 publication: TrackPublication,
596 },
597 TrackUnmuted {
598 participant: Participant,
599 publication: TrackPublication,
600 },
601 RoomMetadataChanged {
602 old_metadata: String,
603 metadata: String,
604 },
605 ParticipantMetadataChanged {
606 participant: Participant,
607 old_metadata: String,
608 metadata: String,
609 },
610 ParticipantNameChanged {
611 participant: Participant,
612 old_name: String,
613 name: String,
614 },
615 ActiveSpeakersChanged {
616 speakers: Vec<Participant>,
617 },
618 ConnectionStateChanged(ConnectionState),
619 Connected {
620 participants_with_tracks: Vec<(RemoteParticipant, Vec<RemoteTrackPublication>)>,
621 },
622 Disconnected {
623 reason: DisconnectReason,
624 },
625 Reconnecting,
626 Reconnected,
627}
628
629#[async_trait]
630impl livekit_server::api::Client for TestApiClient {
631 fn url(&self) -> &str {
632 &self.url
633 }
634
635 async fn create_room(&self, name: String) -> Result<()> {
636 let server = TestServer::get(&self.url)?;
637 server.create_room(name).await?;
638 Ok(())
639 }
640
641 async fn delete_room(&self, name: String) -> Result<()> {
642 let server = TestServer::get(&self.url)?;
643 server.delete_room(name).await?;
644 Ok(())
645 }
646
647 async fn remove_participant(&self, room: String, identity: String) -> Result<()> {
648 let server = TestServer::get(&self.url)?;
649 server
650 .remove_participant(room, ParticipantIdentity(identity))
651 .await?;
652 Ok(())
653 }
654
655 async fn update_participant(
656 &self,
657 room: String,
658 identity: String,
659 permission: livekit_server::proto::ParticipantPermission,
660 ) -> Result<()> {
661 let server = TestServer::get(&self.url)?;
662 server
663 .update_participant(room, identity, permission)
664 .await?;
665 Ok(())
666 }
667
668 fn room_token(&self, room: &str, identity: &str) -> Result<String> {
669 let server = TestServer::get(&self.url)?;
670 token::create(
671 &server.api_key,
672 &server.secret_key,
673 Some(identity),
674 token::VideoGrant::to_join(room),
675 )
676 }
677
678 fn guest_token(&self, room: &str, identity: &str) -> Result<String> {
679 let server = TestServer::get(&self.url)?;
680 token::create(
681 &server.api_key,
682 &server.secret_key,
683 Some(identity),
684 token::VideoGrant::for_guest(room),
685 )
686 }
687}
688
689struct RoomState {
690 url: String,
691 token: String,
692 local_identity: ParticipantIdentity,
693 connection_state: ConnectionState,
694 paused_audio_tracks: HashSet<TrackSid>,
695 updates_tx: mpsc::Sender<RoomEvent>,
696}
697
698#[derive(Clone, Debug)]
699pub struct Room(Arc<Mutex<RoomState>>);
700
701#[derive(Clone, Debug)]
702pub(crate) struct WeakRoom(Weak<Mutex<RoomState>>);
703
704impl std::fmt::Debug for RoomState {
705 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
706 f.debug_struct("Room")
707 .field("url", &self.url)
708 .field("token", &self.token)
709 .field("local_identity", &self.local_identity)
710 .field("connection_state", &self.connection_state)
711 .field("paused_audio_tracks", &self.paused_audio_tracks)
712 .finish()
713 }
714}
715
716impl Room {
717 fn downgrade(&self) -> WeakRoom {
718 WeakRoom(Arc::downgrade(&self.0))
719 }
720
721 pub fn connection_state(&self) -> ConnectionState {
722 self.0.lock().connection_state
723 }
724
725 pub fn local_participant(&self) -> LocalParticipant {
726 let identity = self.0.lock().local_identity.clone();
727 LocalParticipant {
728 identity,
729 room: self.clone(),
730 }
731 }
732
733 pub async fn connect(
734 url: &str,
735 token: &str,
736 _options: RoomOptions,
737 ) -> Result<(Self, mpsc::Receiver<RoomEvent>)> {
738 let server = TestServer::get(&url)?;
739 let (updates_tx, updates_rx) = mpsc::channel(1024);
740 let this = Self(Arc::new(Mutex::new(RoomState {
741 local_identity: ParticipantIdentity(String::new()),
742 url: url.to_string(),
743 token: token.to_string(),
744 connection_state: ConnectionState::Disconnected,
745 paused_audio_tracks: Default::default(),
746 updates_tx,
747 })));
748
749 let identity = server
750 .join_room(token.to_string(), this.clone())
751 .await
752 .context("room join")?;
753 {
754 let mut state = this.0.lock();
755 state.local_identity = identity;
756 state.connection_state = ConnectionState::Connected;
757 }
758
759 Ok((this, updates_rx))
760 }
761
762 pub fn remote_participants(&self) -> HashMap<ParticipantIdentity, RemoteParticipant> {
763 self.test_server()
764 .remote_participants(self.0.lock().token.clone())
765 .unwrap()
766 }
767
768 fn test_server(&self) -> Arc<TestServer> {
769 TestServer::get(&self.0.lock().url).unwrap()
770 }
771
772 fn token(&self) -> String {
773 self.0.lock().token.clone()
774 }
775}
776
777impl Drop for RoomState {
778 fn drop(&mut self) {
779 if self.connection_state == ConnectionState::Connected {
780 if let Ok(server) = TestServer::get(&self.url) {
781 let executor = server.executor.clone();
782 let token = self.token.clone();
783 executor
784 .spawn(async move { server.leave_room(token).await.ok() })
785 .detach();
786 }
787 }
788 }
789}
790
791impl WeakRoom {
792 fn upgrade(&self) -> Option<Room> {
793 self.0.upgrade().map(Room)
794 }
795}