test.rs

  1use anyhow::{anyhow, Result};
  2use async_trait::async_trait;
  3use collections::HashMap;
  4use futures::Stream;
  5use gpui::executor::Background;
  6use lazy_static::lazy_static;
  7use live_kit_server::token;
  8use media::core_video::CVImageBuffer;
  9use parking_lot::Mutex;
 10use std::{future::Future, sync::Arc};
 11
 12lazy_static! {
 13    static ref SERVERS: Mutex<HashMap<String, Arc<TestServer>>> = Default::default();
 14}
 15
/// In-memory stand-in for a LiveKit server, used by tests.
///
/// Instances are created with `TestServer::create`, which also registers them
/// in the global `SERVERS` map under their URL.
pub struct TestServer {
    /// URL this server is registered under in `SERVERS`.
    pub url: String,
    /// API key handed to `token::create` when issuing room tokens.
    pub api_key: String,
    /// Secret used both to create and to validate room tokens.
    pub secret_key: String,
    /// Rooms that currently exist on this server, keyed by room name.
    rooms: Mutex<HashMap<String, TestServerRoom>>,
    /// Executor used for `simulate_random_delay` and for spawning the
    /// leave-room task when a connected `Room` is dropped.
    background: Arc<Background>,
}
 23
 24impl TestServer {
 25    pub fn create(
 26        url: String,
 27        api_key: String,
 28        secret_key: String,
 29        background: Arc<Background>,
 30    ) -> Result<Arc<TestServer>> {
 31        let mut servers = SERVERS.lock();
 32        if servers.contains_key(&url) {
 33            Err(anyhow!("a server with url {:?} already exists", url))
 34        } else {
 35            let server = Arc::new(TestServer {
 36                url: url.clone(),
 37                api_key,
 38                secret_key,
 39                rooms: Default::default(),
 40                background,
 41            });
 42            servers.insert(url, server.clone());
 43            Ok(server)
 44        }
 45    }
 46
 47    fn get(url: &str) -> Result<Arc<TestServer>> {
 48        Ok(SERVERS
 49            .lock()
 50            .get(url)
 51            .ok_or_else(|| anyhow!("no server found for url"))?
 52            .clone())
 53    }
 54
 55    pub fn teardown(&self) -> Result<()> {
 56        SERVERS
 57            .lock()
 58            .remove(&self.url)
 59            .ok_or_else(|| anyhow!("server with url {:?} does not exist", self.url))?;
 60        Ok(())
 61    }
 62
 63    pub fn create_api_client(&self) -> TestApiClient {
 64        TestApiClient {
 65            url: self.url.clone(),
 66        }
 67    }
 68
 69    async fn create_room(&self, room: String) -> Result<()> {
 70        self.background.simulate_random_delay().await;
 71        let mut server_rooms = self.rooms.lock();
 72        if server_rooms.contains_key(&room) {
 73            Err(anyhow!("room {:?} already exists", room))
 74        } else {
 75            server_rooms.insert(room, Default::default());
 76            Ok(())
 77        }
 78    }
 79
 80    async fn delete_room(&self, room: String) -> Result<()> {
 81        // TODO: clear state associated with all `Room`s.
 82        self.background.simulate_random_delay().await;
 83        let mut server_rooms = self.rooms.lock();
 84        server_rooms
 85            .remove(&room)
 86            .ok_or_else(|| anyhow!("room {:?} does not exist", room))?;
 87        Ok(())
 88    }
 89
 90    async fn join_room(&self, token: String, client_room: Arc<Room>) -> Result<()> {
 91        self.background.simulate_random_delay().await;
 92        let claims = live_kit_server::token::validate(&token, &self.secret_key)?;
 93        let identity = claims.sub.unwrap().to_string();
 94        let room_name = claims.video.room.unwrap();
 95        let mut server_rooms = self.rooms.lock();
 96        let room = server_rooms
 97            .get_mut(&*room_name)
 98            .ok_or_else(|| anyhow!("room {:?} does not exist", room_name))?;
 99        if room.client_rooms.contains_key(&identity) {
100            Err(anyhow!(
101                "{:?} attempted to join room {:?} twice",
102                identity,
103                room_name
104            ))
105        } else {
106            room.client_rooms.insert(identity, client_room);
107            Ok(())
108        }
109    }
110
111    async fn leave_room(&self, token: String) -> Result<()> {
112        self.background.simulate_random_delay().await;
113        let claims = live_kit_server::token::validate(&token, &self.secret_key)?;
114        let identity = claims.sub.unwrap().to_string();
115        let room_name = claims.video.room.unwrap();
116        let mut server_rooms = self.rooms.lock();
117        let room = server_rooms
118            .get_mut(&*room_name)
119            .ok_or_else(|| anyhow!("room {} does not exist", room_name))?;
120        room.client_rooms.remove(&identity).ok_or_else(|| {
121            anyhow!(
122                "{:?} attempted to leave room {:?} before joining it",
123                identity,
124                room_name
125            )
126        })?;
127        Ok(())
128    }
129
130    async fn remove_participant(&self, room_name: String, identity: String) -> Result<()> {
131        // TODO: clear state associated with the `Room`.
132
133        self.background.simulate_random_delay().await;
134        let mut server_rooms = self.rooms.lock();
135        let room = server_rooms
136            .get_mut(&room_name)
137            .ok_or_else(|| anyhow!("room {} does not exist", room_name))?;
138        room.client_rooms.remove(&identity).ok_or_else(|| {
139            anyhow!(
140                "participant {:?} did not join room {:?}",
141                identity,
142                room_name
143            )
144        })?;
145        Ok(())
146    }
147
148    async fn publish_video_track(&self, token: String, local_track: LocalVideoTrack) -> Result<()> {
149        self.background.simulate_random_delay().await;
150        let claims = live_kit_server::token::validate(&token, &self.secret_key)?;
151        let identity = claims.sub.unwrap().to_string();
152        let room_name = claims.video.room.unwrap();
153
154        let mut server_rooms = self.rooms.lock();
155        let room = server_rooms
156            .get_mut(&*room_name)
157            .ok_or_else(|| anyhow!("room {} does not exist", room_name))?;
158
159        let update = RemoteVideoTrackUpdate::Subscribed(Arc::new(RemoteVideoTrack {
160            sid: nanoid::nanoid!(17),
161            publisher_id: identity.clone(),
162            frames_rx: local_track.frames_rx.clone(),
163        }));
164
165        for (id, client_room) in &room.client_rooms {
166            if *id != identity {
167                let _ = client_room
168                    .0
169                    .lock()
170                    .video_track_updates
171                    .0
172                    .try_broadcast(update.clone())
173                    .unwrap();
174            }
175        }
176
177        Ok(())
178    }
179}
180
/// Server-side record of a single room.
#[derive(Default)]
struct TestServerRoom {
    /// Client `Room` handles of the participants currently in this room,
    /// keyed by participant identity.
    client_rooms: HashMap<Sid, Arc<Room>>,
}

// Intentionally empty: no inherent methods are defined yet.
impl TestServerRoom {}
187
/// API client backed by a `TestServer`.
///
/// It stores only the server URL; the server itself is looked up in the
/// global registry on each call.
pub struct TestApiClient {
    /// URL of the `TestServer` this client talks to.
    url: String,
}
191
192#[async_trait]
193impl live_kit_server::api::Client for TestApiClient {
194    fn url(&self) -> &str {
195        &self.url
196    }
197
198    async fn create_room(&self, name: String) -> Result<()> {
199        let server = TestServer::get(&self.url)?;
200        server.create_room(name).await?;
201        Ok(())
202    }
203
204    async fn delete_room(&self, name: String) -> Result<()> {
205        let server = TestServer::get(&self.url)?;
206        server.delete_room(name).await?;
207        Ok(())
208    }
209
210    async fn remove_participant(&self, room: String, identity: String) -> Result<()> {
211        let server = TestServer::get(&self.url)?;
212        server.remove_participant(room, identity).await?;
213        Ok(())
214    }
215
216    fn room_token(&self, room: &str, identity: &str) -> Result<String> {
217        let server = TestServer::get(&self.url)?;
218        token::create(
219            &server.api_key,
220            &server.secret_key,
221            Some(identity),
222            token::VideoGrant::to_join(room),
223        )
224    }
225}
226
/// String identifier used in this file for participant identities and track ids.
pub type Sid = String;
228
/// Mutable state of a client-side `Room`, kept behind the room's mutex.
struct RoomState {
    /// `Some` once `Room::connect` has succeeded; taken again when the room is dropped.
    connection: Option<ConnectionState>,
    /// Displays returned by `Room::display_sources`; replaced by `Room::set_display_sources`.
    display_sources: Vec<MacOSDisplay>,
    /// Broadcast channel for remote track updates. The server pushes into the
    /// sender; `Room::remote_video_track_updates` hands out clones of the receiver.
    video_track_updates: (
        async_broadcast::Sender<RemoteVideoTrackUpdate>,
        async_broadcast::Receiver<RemoteVideoTrackUpdate>,
    ),
}
237
/// Details of an established connection, as passed to `Room::connect`.
struct ConnectionState {
    /// URL of the test server the room joined.
    url: String,
    /// Token the room joined with; reused for publishing and for leaving.
    token: String,
}
242
/// Client-side handle to a room on a `TestServer`; all state lives behind one mutex.
pub struct Room(Mutex<RoomState>);
244
245impl Room {
246    pub fn new() -> Arc<Self> {
247        Arc::new(Self(Mutex::new(RoomState {
248            connection: None,
249            display_sources: Default::default(),
250            video_track_updates: async_broadcast::broadcast(128),
251        })))
252    }
253
254    pub fn connect(self: &Arc<Self>, url: &str, token: &str) -> impl Future<Output = Result<()>> {
255        let this = self.clone();
256        let url = url.to_string();
257        let token = token.to_string();
258        async move {
259            let server = TestServer::get(&url)?;
260            server.join_room(token.clone(), this.clone()).await?;
261            this.0.lock().connection = Some(ConnectionState { url, token });
262            Ok(())
263        }
264    }
265
266    pub fn display_sources(self: &Arc<Self>) -> impl Future<Output = Result<Vec<MacOSDisplay>>> {
267        let this = self.clone();
268        async move {
269            let server = this.test_server();
270            server.background.simulate_random_delay().await;
271            Ok(this.0.lock().display_sources.clone())
272        }
273    }
274
275    pub fn publish_video_track(
276        self: &Arc<Self>,
277        track: &LocalVideoTrack,
278    ) -> impl Future<Output = Result<LocalTrackPublication>> {
279        let this = self.clone();
280        let track = track.clone();
281        async move {
282            this.test_server()
283                .publish_video_track(this.token(), track)
284                .await?;
285            Ok(LocalTrackPublication)
286        }
287    }
288
289    pub fn unpublish_track(&self, _: LocalTrackPublication) {}
290
291    pub fn remote_video_tracks(&self, _: &str) -> Vec<Arc<RemoteVideoTrack>> {
292        Default::default()
293    }
294
295    pub fn remote_video_track_updates(&self) -> impl Stream<Item = RemoteVideoTrackUpdate> {
296        self.0.lock().video_track_updates.1.clone()
297    }
298
299    pub fn set_display_sources(&self, sources: Vec<MacOSDisplay>) {
300        self.0.lock().display_sources = sources;
301    }
302
303    fn test_server(&self) -> Arc<TestServer> {
304        let this = self.0.lock();
305        let connection = this
306            .connection
307            .as_ref()
308            .expect("must be connected to call this method");
309        TestServer::get(&connection.url).unwrap()
310    }
311
312    fn token(&self) -> String {
313        self.0
314            .lock()
315            .connection
316            .as_ref()
317            .expect("must be connected to call this method")
318            .token
319            .clone()
320    }
321}
322
323impl Drop for Room {
324    fn drop(&mut self) {
325        if let Some(connection) = self.0.lock().connection.take() {
326            if let Ok(server) = TestServer::get(&connection.token) {
327                let background = server.background.clone();
328                background
329                    .spawn(async move { server.leave_room(connection.token).await.unwrap() })
330                    .detach();
331            }
332        }
333    }
334}
335
/// Marker returned by `Room::publish_video_track`; carries no data in the test implementation.
pub struct LocalTrackPublication;
337
/// A locally captured video track, represented by a receiver of frames.
#[derive(Clone)]
pub struct LocalVideoTrack {
    /// Receiver cloned from the frame channel of the display being shared.
    frames_rx: async_broadcast::Receiver<Frame>,
}
342
343impl LocalVideoTrack {
344    pub fn screen_share_for_display(display: &MacOSDisplay) -> Self {
345        Self {
346            frames_rx: display.frames.1.clone(),
347        }
348    }
349}
350
/// A video track published by another participant.
pub struct RemoteVideoTrack {
    /// Identifier of this track.
    sid: Sid,
    /// Identity of the participant that published the track.
    publisher_id: Sid,
    /// Receiver of the frames produced by the publisher's local track.
    frames_rx: async_broadcast::Receiver<Frame>,
}
356
357impl RemoteVideoTrack {
358    pub fn sid(&self) -> &str {
359        &self.sid
360    }
361
362    pub fn publisher_id(&self) -> &str {
363        &self.publisher_id
364    }
365
366    pub fn frames(&self) -> async_broadcast::Receiver<Frame> {
367        self.frames_rx.clone()
368    }
369}
370
/// Change notification delivered through `Room::remote_video_track_updates`.
#[derive(Clone)]
pub enum RemoteVideoTrackUpdate {
    /// A remote participant published the given track.
    Subscribed(Arc<RemoteVideoTrack>),
    /// A remote track went away. NOTE(review): nothing in this file constructs
    /// this variant — confirm whether callers elsewhere rely on it.
    Unsubscribed { publisher_id: Sid, track_id: Sid },
}
376
/// Test double for a display that can be screen-shared.
#[derive(Clone)]
pub struct MacOSDisplay {
    /// Frame channel: `send_frame` pushes into the sender, and
    /// `LocalVideoTrack::screen_share_for_display` clones the receiver.
    frames: (
        async_broadcast::Sender<Frame>,
        async_broadcast::Receiver<Frame>,
    ),
}
384
385impl MacOSDisplay {
386    pub fn new() -> Self {
387        Self {
388            frames: async_broadcast::broadcast(128),
389        }
390    }
391
392    pub fn send_frame(&self, frame: Frame) {
393        self.frames.0.try_broadcast(frame).unwrap();
394    }
395}
396
/// A fake video frame, identified by a label and its dimensions.
#[derive(Debug, Clone, Eq, PartialEq)]
pub struct Frame {
    /// Free-form label that lets tests tell frames apart.
    pub label: String,
    /// Frame width.
    pub width: usize,
    /// Frame height.
    pub height: usize,
}
403
impl Frame {
    /// Returns the frame's width.
    pub fn width(&self) -> usize {
        self.width
    }

    /// Returns the frame's height.
    pub fn height(&self) -> usize {
        self.height
    }

    /// Not available in test mode.
    ///
    /// # Panics
    ///
    /// Always panics: test frames carry no image data.
    pub fn image(&self) -> CVImageBuffer {
        unimplemented!("you can't call this in test mode")
    }
}