test_app.rs

  1use std::sync::Arc;
  2
  3use futures::StreamExt;
  4use gpui::{
  5    AppContext as _, AsyncApp, Bounds, Context, Entity, InteractiveElement, KeyBinding, Menu,
  6    MenuItem, ParentElement, Pixels, Render, ScreenCaptureStream, SharedString,
  7    StatefulInteractiveElement as _, Styled, Task, Window, WindowBounds, WindowHandle,
  8    WindowOptions, actions, bounds, div, point,
  9    prelude::{FluentBuilder as _, IntoElement},
 10    px, rgb, size,
 11};
 12use livekit_client::{
 13    AudioStream, LocalTrackPublication, Participant, ParticipantIdentity, RemoteParticipant,
 14    RemoteTrackPublication, RemoteVideoTrack, RemoteVideoTrackView, Room, RoomEvent,
 15};
 16
 17use livekit_api::token::{self, VideoGrant};
 18use log::LevelFilter;
 19use simplelog::SimpleLogger;
 20
 21actions!(livekit_client, [Quit]);
 22
 23fn main() {
 24    SimpleLogger::init(LevelFilter::Info, Default::default()).expect("could not initialize logger");
 25
 26    gpui::Application::new().run(|cx| {
 27        #[cfg(any(test, feature = "test-support"))]
 28        println!("USING TEST LIVEKIT");
 29
 30        #[cfg(not(any(test, feature = "test-support")))]
 31        println!("USING REAL LIVEKIT");
 32
 33        gpui_tokio::init(cx);
 34
 35        cx.activate(true);
 36        cx.on_action(quit);
 37        cx.bind_keys([KeyBinding::new("cmd-q", Quit, None)]);
 38        cx.set_menus(vec![Menu {
 39            name: "Zed".into(),
 40            items: vec![MenuItem::Action {
 41                name: "Quit".into(),
 42                action: Box::new(Quit),
 43                os_action: None,
 44            }],
 45        }]);
 46
 47        let livekit_url = std::env::var("LIVEKIT_URL").unwrap_or("http://localhost:7880".into());
 48        let livekit_key = std::env::var("LIVEKIT_KEY").unwrap_or("devkey".into());
 49        let livekit_secret = std::env::var("LIVEKIT_SECRET").unwrap_or("secret".into());
 50        let height = px(800.);
 51        let width = px(800.);
 52
 53        cx.spawn(async move |cx| {
 54            let mut windows = Vec::new();
 55            for i in 0..2 {
 56                let token = token::create(
 57                    &livekit_key,
 58                    &livekit_secret,
 59                    Some(&format!("test-participant-{i}")),
 60                    VideoGrant::to_join("wtej-trty"),
 61                )
 62                .unwrap();
 63
 64                let bounds = bounds(point(width * i, px(0.0)), size(width, height));
 65                let window = LivekitWindow::new(livekit_url.clone(), token, bounds, cx).await;
 66                windows.push(window);
 67            }
 68        })
 69        .detach();
 70    });
 71}
 72
 73fn quit(_: &Quit, cx: &mut gpui::App) {
 74    cx.quit();
 75}
 76
 77struct LivekitWindow {
 78    room: Arc<livekit_client::Room>,
 79    microphone_track: Option<LocalTrackPublication>,
 80    screen_share_track: Option<LocalTrackPublication>,
 81    microphone_stream: Option<livekit_client::AudioStream>,
 82    screen_share_stream: Option<Box<dyn ScreenCaptureStream>>,
 83    remote_participants: Vec<(ParticipantIdentity, ParticipantState)>,
 84    _events_task: Task<()>,
 85}
 86
 87#[derive(Default)]
 88struct ParticipantState {
 89    audio_output_stream: Option<(RemoteTrackPublication, AudioStream)>,
 90    muted: bool,
 91    screen_share_output_view: Option<(RemoteVideoTrack, Entity<RemoteVideoTrackView>)>,
 92    speaking: bool,
 93}
 94
 95impl LivekitWindow {
 96    async fn new(
 97        url: String,
 98        token: String,
 99        bounds: Bounds<Pixels>,
100        cx: &mut AsyncApp,
101    ) -> WindowHandle<Self> {
102        let (room, mut events) =
103            Room::connect(url.clone(), token, cx)
104                .await
105                .unwrap_or_else(|err| {
106                    eprintln!(
107                        "Failed to connect to {url}: {err}.\nTry `foreman start` to run the livekit server"
108                    );
109
110                    std::process::exit(1)
111                });
112
113        cx.update(|cx| {
114            cx.open_window(
115                WindowOptions {
116                    window_bounds: Some(WindowBounds::Windowed(bounds)),
117                    ..Default::default()
118                },
119                |window, cx| {
120                    cx.new(|cx| {
121                        let _events_task = cx.spawn_in(window, async move |this, cx| {
122                            while let Some(event) = events.next().await {
123                                cx.update(|window, cx| {
124                                    this.update(cx, |this: &mut LivekitWindow, cx| {
125                                        this.handle_room_event(event, window, cx)
126                                    })
127                                })
128                                .ok();
129                            }
130                        });
131
132                        Self {
133                            room: Arc::new(room),
134                            microphone_track: None,
135                            microphone_stream: None,
136                            screen_share_track: None,
137                            screen_share_stream: None,
138                            remote_participants: Vec::new(),
139                            _events_task,
140                        }
141                    })
142                },
143            )
144            .unwrap()
145        })
146        .unwrap()
147    }
148
149    fn handle_room_event(&mut self, event: RoomEvent, window: &mut Window, cx: &mut Context<Self>) {
150        eprintln!("event: {event:?}");
151
152        match event {
153            RoomEvent::TrackUnpublished {
154                publication,
155                participant,
156            } => {
157                let output = self.remote_participant(participant);
158                let unpublish_sid = publication.sid();
159                if output
160                    .audio_output_stream
161                    .as_ref()
162                    .is_some_and(|(track, _)| track.sid() == unpublish_sid)
163                {
164                    output.audio_output_stream.take();
165                }
166                if output
167                    .screen_share_output_view
168                    .as_ref()
169                    .is_some_and(|(track, _)| track.sid() == unpublish_sid)
170                {
171                    output.screen_share_output_view.take();
172                }
173                cx.notify();
174            }
175
176            RoomEvent::TrackSubscribed {
177                publication,
178                participant,
179                track,
180            } => {
181                let room = self.room.clone();
182                let output = self.remote_participant(participant);
183                match track {
184                    livekit_client::RemoteTrack::Audio(track) => {
185                        output.audio_output_stream = Some((
186                            publication,
187                            room.play_remote_audio_track(&track, cx).unwrap(),
188                        ));
189                    }
190                    livekit_client::RemoteTrack::Video(track) => {
191                        output.screen_share_output_view = Some((
192                            track.clone(),
193                            cx.new(|cx| RemoteVideoTrackView::new(track, window, cx)),
194                        ));
195                    }
196                }
197                cx.notify();
198            }
199
200            RoomEvent::TrackMuted { participant, .. } => {
201                if let Participant::Remote(participant) = participant {
202                    self.remote_participant(participant).muted = true;
203                    cx.notify();
204                }
205            }
206
207            RoomEvent::TrackUnmuted { participant, .. } => {
208                if let Participant::Remote(participant) = participant {
209                    self.remote_participant(participant).muted = false;
210                    cx.notify();
211                }
212            }
213
214            RoomEvent::ActiveSpeakersChanged { speakers } => {
215                for (identity, output) in &mut self.remote_participants {
216                    output.speaking = speakers.iter().any(|speaker| {
217                        if let Participant::Remote(speaker) = speaker {
218                            speaker.identity() == *identity
219                        } else {
220                            false
221                        }
222                    });
223                }
224                cx.notify();
225            }
226
227            _ => {}
228        }
229
230        cx.notify();
231    }
232
233    fn remote_participant(&mut self, participant: RemoteParticipant) -> &mut ParticipantState {
234        match self
235            .remote_participants
236            .binary_search_by_key(&&participant.identity(), |row| &row.0)
237        {
238            Ok(ix) => &mut self.remote_participants[ix].1,
239            Err(ix) => {
240                self.remote_participants
241                    .insert(ix, (participant.identity(), ParticipantState::default()));
242                &mut self.remote_participants[ix].1
243            }
244        }
245    }
246
247    fn toggle_mute(&mut self, window: &mut Window, cx: &mut Context<Self>) {
248        if let Some(track) = &self.microphone_track {
249            if track.is_muted() {
250                track.unmute(cx);
251            } else {
252                track.mute(cx);
253            }
254            cx.notify();
255        } else {
256            let room = self.room.clone();
257            cx.spawn_in(window, async move |this, cx| {
258                let (publication, stream) = room.publish_local_microphone_track(cx).await.unwrap();
259                this.update(cx, |this, cx| {
260                    this.microphone_track = Some(publication);
261                    this.microphone_stream = Some(stream);
262                    cx.notify();
263                })
264            })
265            .detach();
266        }
267    }
268
269    fn toggle_screen_share(&mut self, window: &mut Window, cx: &mut Context<Self>) {
270        if let Some(track) = self.screen_share_track.take() {
271            self.screen_share_stream.take();
272            let participant = self.room.local_participant();
273            cx.spawn(async move |_, cx| {
274                participant.unpublish_track(track.sid(), cx).await.unwrap();
275            })
276            .detach();
277            cx.notify();
278        } else {
279            let participant = self.room.local_participant();
280            let sources = cx.screen_capture_sources();
281            cx.spawn_in(window, async move |this, cx| {
282                let sources = sources.await.unwrap()?;
283                let source = sources.into_iter().next().unwrap();
284
285                let (publication, stream) = participant
286                    .publish_screenshare_track(&*source, cx)
287                    .await
288                    .unwrap();
289                this.update(cx, |this, cx| {
290                    this.screen_share_track = Some(publication);
291                    this.screen_share_stream = Some(stream);
292                    cx.notify();
293                })
294            })
295            .detach();
296        }
297    }
298
299    fn toggle_remote_audio_for_participant(
300        &mut self,
301        identity: &ParticipantIdentity,
302        cx: &mut Context<Self>,
303    ) -> Option<()> {
304        let participant = self.remote_participants.iter().find_map(|(id, state)| {
305            if id == identity { Some(state) } else { None }
306        })?;
307        let publication = &participant.audio_output_stream.as_ref()?.0;
308        publication.set_enabled(!publication.is_enabled(), cx);
309        cx.notify();
310        Some(())
311    }
312}
313
314impl Render for LivekitWindow {
315    fn render(&mut self, _window: &mut Window, cx: &mut Context<Self>) -> impl IntoElement {
316        fn button() -> gpui::Div {
317            div()
318                .w(px(180.0))
319                .h(px(30.0))
320                .px_2()
321                .m_2()
322                .bg(rgb(0x8888ff))
323        }
324
325        div()
326            .bg(rgb(0xffffff))
327            .size_full()
328            .flex()
329            .flex_col()
330            .child(
331                div().bg(rgb(0xffd4a8)).flex().flex_row().children([
332                    button()
333                        .id("toggle-mute")
334                        .child(if let Some(track) = &self.microphone_track {
335                            if track.is_muted() { "Unmute" } else { "Mute" }
336                        } else {
337                            "Publish mic"
338                        })
339                        .on_click(cx.listener(|this, _, window, cx| this.toggle_mute(window, cx))),
340                    button()
341                        .id("toggle-screen-share")
342                        .child(if self.screen_share_track.is_none() {
343                            "Share screen"
344                        } else {
345                            "Unshare screen"
346                        })
347                        .on_click(
348                            cx.listener(|this, _, window, cx| this.toggle_screen_share(window, cx)),
349                        ),
350                ]),
351            )
352            .child(
353                div()
354                    .id("remote-participants")
355                    .overflow_y_scroll()
356                    .flex()
357                    .flex_col()
358                    .flex_grow()
359                    .children(self.remote_participants.iter().map(|(identity, state)| {
360                        div()
361                            .h(px(1080.0))
362                            .flex()
363                            .flex_col()
364                            .m_2()
365                            .px_2()
366                            .bg(rgb(0x8888ff))
367                            .child(SharedString::from(if state.speaking {
368                                format!("{} (speaking)", &identity.0)
369                            } else if state.muted {
370                                format!("{} (muted)", &identity.0)
371                            } else {
372                                identity.0.clone()
373                            }))
374                            .when_some(state.audio_output_stream.as_ref(), |el, state| {
375                                el.child(
376                                    button()
377                                        .id(SharedString::from(identity.0.clone()))
378                                        .child(if state.0.is_enabled() {
379                                            "Deafen"
380                                        } else {
381                                            "Undeafen"
382                                        })
383                                        .on_click(cx.listener({
384                                            let identity = identity.clone();
385                                            move |this, _, _, cx| {
386                                                this.toggle_remote_audio_for_participant(
387                                                    &identity, cx,
388                                                );
389                                            }
390                                        })),
391                                )
392                            })
393                            .children(state.screen_share_output_view.as_ref().map(|e| e.1.clone()))
394                    })),
395            )
396    }
397}