// test_app.rs

  1use std::sync::Arc;
  2
  3use futures::StreamExt;
  4use gpui::{
  5    AppContext as _, AsyncApp, Bounds, Context, Entity, InteractiveElement, KeyBinding, Menu,
  6    MenuItem, ParentElement, Pixels, Render, ScreenCaptureStream, SharedString,
  7    StatefulInteractiveElement as _, Styled, Task, Window, WindowBounds, WindowHandle,
  8    WindowOptions, actions, bounds, div, point,
  9    prelude::{FluentBuilder as _, IntoElement},
 10    px, rgb, size,
 11};
 12use livekit_client::{
 13    AudioStream, LocalTrackPublication, Participant, ParticipantIdentity, RemoteParticipant,
 14    RemoteTrackPublication, RemoteVideoTrack, RemoteVideoTrackView, Room, RoomEvent,
 15};
 16
 17use livekit_api::token::{self, VideoGrant};
 18use log::LevelFilter;
 19use simplelog::SimpleLogger;
 20
// Declares the `Quit` action in the `livekit_client` action namespace;
// bound to cmd-q and the app menu in `main`, handled by `quit` below.
actions!(livekit_client, [Quit]);
 22
 23fn main() {
 24    SimpleLogger::init(LevelFilter::Info, Default::default()).expect("could not initialize logger");
 25
 26    gpui_platform::application().run(|cx| {
 27        #[cfg(any(test, feature = "test-support"))]
 28        println!("USING TEST LIVEKIT");
 29
 30        #[cfg(not(any(test, feature = "test-support")))]
 31        println!("USING REAL LIVEKIT");
 32
 33        gpui_tokio::init(cx);
 34
 35        cx.activate(true);
 36        cx.on_action(quit);
 37        cx.bind_keys([KeyBinding::new("cmd-q", Quit, None)]);
 38        cx.set_menus([Menu::new("Zed").items([MenuItem::action("Quit", Quit)])]);
 39
 40        let livekit_url = std::env::var("LIVEKIT_URL").unwrap_or("http://localhost:7880".into());
 41        let livekit_key = std::env::var("LIVEKIT_KEY").unwrap_or("devkey".into());
 42        let livekit_secret = std::env::var("LIVEKIT_SECRET").unwrap_or("secret".into());
 43        let height = px(800.);
 44        let width = px(800.);
 45
 46        cx.spawn(async move |cx| {
 47            let mut windows = Vec::new();
 48            for i in 0..2 {
 49                let token = token::create(
 50                    &livekit_key,
 51                    &livekit_secret,
 52                    Some(&format!("test-participant-{i}")),
 53                    VideoGrant::to_join("wtej-trty"),
 54                )
 55                .unwrap();
 56
 57                let bounds = bounds(point(width * i, px(0.0)), size(width, height));
 58                let window = LivekitWindow::new(livekit_url.clone(), token, bounds, cx).await;
 59                windows.push(window);
 60            }
 61        })
 62        .detach();
 63    });
 64}
 65
/// Handler for the `Quit` action (cmd-q / menu item): terminates the app.
fn quit(_: &Quit, cx: &mut gpui::App) {
    cx.quit();
}
 69
/// One window's view of the livekit room: the local participant's published
/// tracks plus per-remote-participant playback state.
struct LivekitWindow {
    /// Shared room handle; cloned into spawned tasks that publish/unpublish.
    room: Arc<livekit_client::Room>,
    /// Local mic publication, `None` until first `toggle_mute`.
    microphone_track: Option<LocalTrackPublication>,
    /// Local screen-share publication, `None` when not sharing.
    screen_share_track: Option<LocalTrackPublication>,
    /// Kept alive alongside `microphone_track`; dropping it stops capture.
    microphone_stream: Option<livekit_client::AudioStream>,
    /// Kept alive alongside `screen_share_track`; dropping it stops capture.
    screen_share_stream: Option<Box<dyn ScreenCaptureStream>>,
    /// Sorted by identity (see `remote_participant`, which binary-searches).
    remote_participants: Vec<(ParticipantIdentity, ParticipantState)>,
    /// Drives the room-event loop; aborted when the window entity drops.
    _events_task: Task<()>,
}
 79
/// Playback/UI state tracked for a single remote participant.
#[derive(Default)]
struct ParticipantState {
    /// Subscribed audio track and its playing stream; `None` when none.
    audio_output_stream: Option<(RemoteTrackPublication, AudioStream)>,
    /// Set from TrackMuted/TrackUnmuted room events.
    muted: bool,
    /// Subscribed video track and the view rendering it.
    screen_share_output_view: Option<(RemoteVideoTrack, Entity<RemoteVideoTrackView>)>,
    /// Set from ActiveSpeakersChanged room events.
    speaking: bool,
}
 87
 88impl LivekitWindow {
 89    async fn new(
 90        url: String,
 91        token: String,
 92        bounds: Bounds<Pixels>,
 93        cx: &mut AsyncApp,
 94    ) -> WindowHandle<Self> {
 95        let (room, mut events) =
 96            Room::connect(url.clone(), token, cx)
 97                .await
 98                .unwrap_or_else(|err| {
 99                    eprintln!(
100                        "Failed to connect to {url}: {err}.\nTry `foreman start` to run the livekit server"
101                    );
102
103                    std::process::exit(1)
104                });
105
106        cx.update(|cx| {
107            cx.open_window(
108                WindowOptions {
109                    window_bounds: Some(WindowBounds::Windowed(bounds)),
110                    ..Default::default()
111                },
112                |window, cx| {
113                    cx.new(|cx| {
114                        let _events_task = cx.spawn_in(window, async move |this, cx| {
115                            while let Some(event) = events.next().await {
116                                cx.update(|window, cx| {
117                                    this.update(cx, |this: &mut LivekitWindow, cx| {
118                                        this.handle_room_event(event, window, cx)
119                                    })
120                                })
121                                .ok();
122                            }
123                        });
124
125                        Self {
126                            room: Arc::new(room),
127                            microphone_track: None,
128                            microphone_stream: None,
129                            screen_share_track: None,
130                            screen_share_stream: None,
131                            remote_participants: Vec::new(),
132                            _events_task,
133                        }
134                    })
135                },
136            )
137            .unwrap()
138        })
139    }
140
141    fn handle_room_event(&mut self, event: RoomEvent, window: &mut Window, cx: &mut Context<Self>) {
142        eprintln!("event: {event:?}");
143
144        match event {
145            RoomEvent::TrackUnpublished {
146                publication,
147                participant,
148            } => {
149                let output = self.remote_participant(participant);
150                let unpublish_sid = publication.sid();
151                if output
152                    .audio_output_stream
153                    .as_ref()
154                    .is_some_and(|(track, _)| track.sid() == unpublish_sid)
155                {
156                    output.audio_output_stream.take();
157                }
158                if output
159                    .screen_share_output_view
160                    .as_ref()
161                    .is_some_and(|(track, _)| track.sid() == unpublish_sid)
162                {
163                    output.screen_share_output_view.take();
164                }
165                cx.notify();
166            }
167
168            RoomEvent::TrackSubscribed {
169                publication,
170                participant,
171                track,
172            } => {
173                let room = self.room.clone();
174                let output = self.remote_participant(participant);
175                match track {
176                    livekit_client::RemoteTrack::Audio(track) => {
177                        output.audio_output_stream = Some((
178                            publication,
179                            room.play_remote_audio_track(&track, cx).unwrap(),
180                        ));
181                    }
182                    livekit_client::RemoteTrack::Video(track) => {
183                        output.screen_share_output_view = Some((
184                            track.clone(),
185                            cx.new(|cx| RemoteVideoTrackView::new(track, window, cx)),
186                        ));
187                    }
188                }
189                cx.notify();
190            }
191
192            RoomEvent::TrackMuted { participant, .. } => {
193                if let Participant::Remote(participant) = participant {
194                    self.remote_participant(participant).muted = true;
195                    cx.notify();
196                }
197            }
198
199            RoomEvent::TrackUnmuted { participant, .. } => {
200                if let Participant::Remote(participant) = participant {
201                    self.remote_participant(participant).muted = false;
202                    cx.notify();
203                }
204            }
205
206            RoomEvent::ActiveSpeakersChanged { speakers } => {
207                for (identity, output) in &mut self.remote_participants {
208                    output.speaking = speakers.iter().any(|speaker| {
209                        if let Participant::Remote(speaker) = speaker {
210                            speaker.identity() == *identity
211                        } else {
212                            false
213                        }
214                    });
215                }
216                cx.notify();
217            }
218
219            _ => {}
220        }
221
222        cx.notify();
223    }
224
225    fn remote_participant(&mut self, participant: RemoteParticipant) -> &mut ParticipantState {
226        match self
227            .remote_participants
228            .binary_search_by_key(&&participant.identity(), |row| &row.0)
229        {
230            Ok(ix) => &mut self.remote_participants[ix].1,
231            Err(ix) => {
232                self.remote_participants
233                    .insert(ix, (participant.identity(), ParticipantState::default()));
234                &mut self.remote_participants[ix].1
235            }
236        }
237    }
238
239    fn toggle_mute(&mut self, window: &mut Window, cx: &mut Context<Self>) {
240        if let Some(track) = &self.microphone_track {
241            if track.is_muted() {
242                track.unmute(cx);
243            } else {
244                track.mute(cx);
245            }
246            cx.notify();
247        } else {
248            let room = self.room.clone();
249            cx.spawn_in(window, async move |this, cx| {
250                let (publication, stream, _input_lag_us) = room
251                    .publish_local_microphone_track("test_user".to_string(), false, cx)
252                    .await
253                    .unwrap();
254                this.update(cx, |this, cx| {
255                    this.microphone_track = Some(publication);
256                    this.microphone_stream = Some(stream);
257                    cx.notify();
258                })
259            })
260            .detach();
261        }
262    }
263
264    fn toggle_screen_share(&mut self, window: &mut Window, cx: &mut Context<Self>) {
265        if let Some(track) = self.screen_share_track.take() {
266            self.screen_share_stream.take();
267            let participant = self.room.local_participant();
268            cx.spawn(async move |_, cx| {
269                participant.unpublish_track(track.sid(), cx).await.unwrap();
270            })
271            .detach();
272            cx.notify();
273        } else {
274            let participant = self.room.local_participant();
275            let sources = cx.screen_capture_sources();
276            cx.spawn_in(window, async move |this, cx| {
277                let sources = sources.await.unwrap()?;
278                let source = sources.into_iter().next().unwrap();
279
280                let (publication, stream) = participant
281                    .publish_screenshare_track(&*source, cx)
282                    .await
283                    .unwrap();
284                this.update(cx, |this, cx| {
285                    this.screen_share_track = Some(publication);
286                    this.screen_share_stream = Some(stream);
287                    cx.notify();
288                })
289            })
290            .detach();
291        }
292    }
293
294    fn toggle_remote_audio_for_participant(
295        &mut self,
296        identity: &ParticipantIdentity,
297        cx: &mut Context<Self>,
298    ) -> Option<()> {
299        let participant = self.remote_participants.iter().find_map(|(id, state)| {
300            if id == identity { Some(state) } else { None }
301        })?;
302        let publication = &participant.audio_output_stream.as_ref()?.0;
303        publication.set_enabled(!publication.is_enabled(), cx);
304        cx.notify();
305        Some(())
306    }
307}
308
impl Render for LivekitWindow {
    /// Renders a toolbar (mic + screen-share toggles) above a scrollable list
    /// of remote participants, each showing name/mute/speaking status, a
    /// deafen toggle, and any screen-share video view.
    fn render(&mut self, _window: &mut Window, cx: &mut Context<Self>) -> impl IntoElement {
        // Shared styling for the clickable control buttons.
        fn button() -> gpui::Div {
            div()
                .w(px(180.0))
                .h(px(30.0))
                .px_2()
                .m_2()
                .bg(rgb(0x8888ff))
        }

        div()
            .bg(rgb(0xffffff))
            .size_full()
            .flex()
            .flex_col()
            .child(
                // Toolbar row: button labels reflect current publication state.
                div().bg(rgb(0xffd4a8)).flex().flex_row().children([
                    button()
                        .id("toggle-mute")
                        .child(if let Some(track) = &self.microphone_track {
                            if track.is_muted() { "Unmute" } else { "Mute" }
                        } else {
                            "Publish mic"
                        })
                        .on_click(cx.listener(|this, _, window, cx| this.toggle_mute(window, cx))),
                    button()
                        .id("toggle-screen-share")
                        .child(if self.screen_share_track.is_none() {
                            "Share screen"
                        } else {
                            "Unshare screen"
                        })
                        .on_click(
                            cx.listener(|this, _, window, cx| this.toggle_screen_share(window, cx)),
                        ),
                ]),
            )
            .child(
                // One tall tile per remote participant.
                div()
                    .id("remote-participants")
                    .overflow_y_scroll()
                    .flex()
                    .flex_col()
                    .flex_grow()
                    .children(self.remote_participants.iter().map(|(identity, state)| {
                        div()
                            .h(px(1080.0))
                            .flex()
                            .flex_col()
                            .m_2()
                            .px_2()
                            .bg(rgb(0x8888ff))
                            .child(SharedString::from(if state.speaking {
                                format!("{} (speaking)", &identity.0)
                            } else if state.muted {
                                format!("{} (muted)", &identity.0)
                            } else {
                                identity.0.clone()
                            }))
                            // Deafen toggle only exists once an audio stream
                            // is subscribed; note `state` here shadows the
                            // outer binding with the (publication, stream)
                            // tuple reference.
                            .when_some(state.audio_output_stream.as_ref(), |el, state| {
                                el.child(
                                    button()
                                        .id(identity.0.clone())
                                        .child(if state.0.is_enabled() {
                                            "Deafen"
                                        } else {
                                            "Undeafen"
                                        })
                                        .on_click(cx.listener({
                                            let identity = identity.clone();
                                            move |this, _, _, cx| {
                                                this.toggle_remote_audio_for_participant(
                                                    &identity, cx,
                                                );
                                            }
                                        })),
                                )
                            })
                            .children(state.screen_share_output_view.as_ref().map(|e| e.1.clone()))
                    })),
            )
    }
}