test_app.rs

  1use std::sync::Arc;
  2
  3use futures::StreamExt;
  4use gpui::{
  5    AppContext as _, AsyncApp, Bounds, Context, Entity, InteractiveElement, KeyBinding, Menu,
  6    MenuItem, ParentElement, Pixels, Render, ScreenCaptureStream, SharedString,
  7    StatefulInteractiveElement as _, Styled, Task, Window, WindowBounds, WindowHandle,
  8    WindowOptions, actions, bounds, div, point,
  9    prelude::{FluentBuilder as _, IntoElement},
 10    px, rgb, size,
 11};
 12use livekit_client::{
 13    AudioStream, LocalTrackPublication, Participant, ParticipantIdentity, RemoteParticipant,
 14    RemoteTrackPublication, RemoteVideoTrack, RemoteVideoTrackView, Room, RoomEvent,
 15};
 16
 17use livekit_api::token::{self, VideoGrant};
 18use log::LevelFilter;
 19use simplelog::SimpleLogger;
 20
 21actions!(livekit_client, [Quit]);
 22
 23fn main() {
 24    SimpleLogger::init(LevelFilter::Info, Default::default()).expect("could not initialize logger");
 25
 26    gpui::Application::new().run(|cx| {
 27        #[cfg(any(test, feature = "test-support"))]
 28        println!("USING TEST LIVEKIT");
 29
 30        #[cfg(not(any(test, feature = "test-support")))]
 31        println!("USING REAL LIVEKIT");
 32
 33        gpui_tokio::init(cx);
 34
 35        cx.activate(true);
 36        cx.on_action(quit);
 37        cx.bind_keys([KeyBinding::new("cmd-q", Quit, None)]);
 38        cx.set_menus(vec![Menu {
 39            name: "Zed".into(),
 40            items: vec![MenuItem::Action {
 41                name: "Quit".into(),
 42                action: Box::new(Quit),
 43                os_action: None,
 44                checked: false,
 45            }],
 46        }]);
 47
 48        let livekit_url = std::env::var("LIVEKIT_URL").unwrap_or("http://localhost:7880".into());
 49        let livekit_key = std::env::var("LIVEKIT_KEY").unwrap_or("devkey".into());
 50        let livekit_secret = std::env::var("LIVEKIT_SECRET").unwrap_or("secret".into());
 51        let height = px(800.);
 52        let width = px(800.);
 53
 54        cx.spawn(async move |cx| {
 55            let mut windows = Vec::new();
 56            for i in 0..2 {
 57                let token = token::create(
 58                    &livekit_key,
 59                    &livekit_secret,
 60                    Some(&format!("test-participant-{i}")),
 61                    VideoGrant::to_join("wtej-trty"),
 62                )
 63                .unwrap();
 64
 65                let bounds = bounds(point(width * i, px(0.0)), size(width, height));
 66                let window = LivekitWindow::new(livekit_url.clone(), token, bounds, cx).await;
 67                windows.push(window);
 68            }
 69        })
 70        .detach();
 71    });
 72}
 73
 74fn quit(_: &Quit, cx: &mut gpui::App) {
 75    cx.quit();
 76}
 77
 78struct LivekitWindow {
 79    room: Arc<livekit_client::Room>,
 80    microphone_track: Option<LocalTrackPublication>,
 81    screen_share_track: Option<LocalTrackPublication>,
 82    microphone_stream: Option<livekit_client::AudioStream>,
 83    screen_share_stream: Option<Box<dyn ScreenCaptureStream>>,
 84    remote_participants: Vec<(ParticipantIdentity, ParticipantState)>,
 85    _events_task: Task<()>,
 86}
 87
 88#[derive(Default)]
 89struct ParticipantState {
 90    audio_output_stream: Option<(RemoteTrackPublication, AudioStream)>,
 91    muted: bool,
 92    screen_share_output_view: Option<(RemoteVideoTrack, Entity<RemoteVideoTrackView>)>,
 93    speaking: bool,
 94}
 95
 96impl LivekitWindow {
 97    async fn new(
 98        url: String,
 99        token: String,
100        bounds: Bounds<Pixels>,
101        cx: &mut AsyncApp,
102    ) -> WindowHandle<Self> {
103        let (room, mut events) =
104            Room::connect(url.clone(), token, cx)
105                .await
106                .unwrap_or_else(|err| {
107                    eprintln!(
108                        "Failed to connect to {url}: {err}.\nTry `foreman start` to run the livekit server"
109                    );
110
111                    std::process::exit(1)
112                });
113
114        cx.update(|cx| {
115            cx.open_window(
116                WindowOptions {
117                    window_bounds: Some(WindowBounds::Windowed(bounds)),
118                    ..Default::default()
119                },
120                |window, cx| {
121                    cx.new(|cx| {
122                        let _events_task = cx.spawn_in(window, async move |this, cx| {
123                            while let Some(event) = events.next().await {
124                                cx.update(|window, cx| {
125                                    this.update(cx, |this: &mut LivekitWindow, cx| {
126                                        this.handle_room_event(event, window, cx)
127                                    })
128                                })
129                                .ok();
130                            }
131                        });
132
133                        Self {
134                            room: Arc::new(room),
135                            microphone_track: None,
136                            microphone_stream: None,
137                            screen_share_track: None,
138                            screen_share_stream: None,
139                            remote_participants: Vec::new(),
140                            _events_task,
141                        }
142                    })
143                },
144            )
145            .unwrap()
146        })
147        .unwrap()
148    }
149
150    fn handle_room_event(&mut self, event: RoomEvent, window: &mut Window, cx: &mut Context<Self>) {
151        eprintln!("event: {event:?}");
152
153        match event {
154            RoomEvent::TrackUnpublished {
155                publication,
156                participant,
157            } => {
158                let output = self.remote_participant(participant);
159                let unpublish_sid = publication.sid();
160                if output
161                    .audio_output_stream
162                    .as_ref()
163                    .is_some_and(|(track, _)| track.sid() == unpublish_sid)
164                {
165                    output.audio_output_stream.take();
166                }
167                if output
168                    .screen_share_output_view
169                    .as_ref()
170                    .is_some_and(|(track, _)| track.sid() == unpublish_sid)
171                {
172                    output.screen_share_output_view.take();
173                }
174                cx.notify();
175            }
176
177            RoomEvent::TrackSubscribed {
178                publication,
179                participant,
180                track,
181            } => {
182                let room = self.room.clone();
183                let output = self.remote_participant(participant);
184                match track {
185                    livekit_client::RemoteTrack::Audio(track) => {
186                        output.audio_output_stream = Some((
187                            publication,
188                            room.play_remote_audio_track(&track, cx).unwrap(),
189                        ));
190                    }
191                    livekit_client::RemoteTrack::Video(track) => {
192                        output.screen_share_output_view = Some((
193                            track.clone(),
194                            cx.new(|cx| RemoteVideoTrackView::new(track, window, cx)),
195                        ));
196                    }
197                }
198                cx.notify();
199            }
200
201            RoomEvent::TrackMuted { participant, .. } => {
202                if let Participant::Remote(participant) = participant {
203                    self.remote_participant(participant).muted = true;
204                    cx.notify();
205                }
206            }
207
208            RoomEvent::TrackUnmuted { participant, .. } => {
209                if let Participant::Remote(participant) = participant {
210                    self.remote_participant(participant).muted = false;
211                    cx.notify();
212                }
213            }
214
215            RoomEvent::ActiveSpeakersChanged { speakers } => {
216                for (identity, output) in &mut self.remote_participants {
217                    output.speaking = speakers.iter().any(|speaker| {
218                        if let Participant::Remote(speaker) = speaker {
219                            speaker.identity() == *identity
220                        } else {
221                            false
222                        }
223                    });
224                }
225                cx.notify();
226            }
227
228            _ => {}
229        }
230
231        cx.notify();
232    }
233
234    fn remote_participant(&mut self, participant: RemoteParticipant) -> &mut ParticipantState {
235        match self
236            .remote_participants
237            .binary_search_by_key(&&participant.identity(), |row| &row.0)
238        {
239            Ok(ix) => &mut self.remote_participants[ix].1,
240            Err(ix) => {
241                self.remote_participants
242                    .insert(ix, (participant.identity(), ParticipantState::default()));
243                &mut self.remote_participants[ix].1
244            }
245        }
246    }
247
248    fn toggle_mute(&mut self, window: &mut Window, cx: &mut Context<Self>) {
249        if let Some(track) = &self.microphone_track {
250            if track.is_muted() {
251                track.unmute(cx);
252            } else {
253                track.mute(cx);
254            }
255            cx.notify();
256        } else {
257            let room = self.room.clone();
258            cx.spawn_in(window, async move |this, cx| {
259                let (publication, stream) = room
260                    .publish_local_microphone_track("test_user".to_string(), false, cx)
261                    .await
262                    .unwrap();
263                this.update(cx, |this, cx| {
264                    this.microphone_track = Some(publication);
265                    this.microphone_stream = Some(stream);
266                    cx.notify();
267                })
268            })
269            .detach();
270        }
271    }
272
273    fn toggle_screen_share(&mut self, window: &mut Window, cx: &mut Context<Self>) {
274        if let Some(track) = self.screen_share_track.take() {
275            self.screen_share_stream.take();
276            let participant = self.room.local_participant();
277            cx.spawn(async move |_, cx| {
278                participant.unpublish_track(track.sid(), cx).await.unwrap();
279            })
280            .detach();
281            cx.notify();
282        } else {
283            let participant = self.room.local_participant();
284            let sources = cx.screen_capture_sources();
285            cx.spawn_in(window, async move |this, cx| {
286                let sources = sources.await.unwrap()?;
287                let source = sources.into_iter().next().unwrap();
288
289                let (publication, stream) = participant
290                    .publish_screenshare_track(&*source, cx)
291                    .await
292                    .unwrap();
293                this.update(cx, |this, cx| {
294                    this.screen_share_track = Some(publication);
295                    this.screen_share_stream = Some(stream);
296                    cx.notify();
297                })
298            })
299            .detach();
300        }
301    }
302
303    fn toggle_remote_audio_for_participant(
304        &mut self,
305        identity: &ParticipantIdentity,
306        cx: &mut Context<Self>,
307    ) -> Option<()> {
308        let participant = self.remote_participants.iter().find_map(|(id, state)| {
309            if id == identity { Some(state) } else { None }
310        })?;
311        let publication = &participant.audio_output_stream.as_ref()?.0;
312        publication.set_enabled(!publication.is_enabled(), cx);
313        cx.notify();
314        Some(())
315    }
316}
317
318impl Render for LivekitWindow {
319    fn render(&mut self, _window: &mut Window, cx: &mut Context<Self>) -> impl IntoElement {
320        fn button() -> gpui::Div {
321            div()
322                .w(px(180.0))
323                .h(px(30.0))
324                .px_2()
325                .m_2()
326                .bg(rgb(0x8888ff))
327        }
328
329        div()
330            .bg(rgb(0xffffff))
331            .size_full()
332            .flex()
333            .flex_col()
334            .child(
335                div().bg(rgb(0xffd4a8)).flex().flex_row().children([
336                    button()
337                        .id("toggle-mute")
338                        .child(if let Some(track) = &self.microphone_track {
339                            if track.is_muted() { "Unmute" } else { "Mute" }
340                        } else {
341                            "Publish mic"
342                        })
343                        .on_click(cx.listener(|this, _, window, cx| this.toggle_mute(window, cx))),
344                    button()
345                        .id("toggle-screen-share")
346                        .child(if self.screen_share_track.is_none() {
347                            "Share screen"
348                        } else {
349                            "Unshare screen"
350                        })
351                        .on_click(
352                            cx.listener(|this, _, window, cx| this.toggle_screen_share(window, cx)),
353                        ),
354                ]),
355            )
356            .child(
357                div()
358                    .id("remote-participants")
359                    .overflow_y_scroll()
360                    .flex()
361                    .flex_col()
362                    .flex_grow()
363                    .children(self.remote_participants.iter().map(|(identity, state)| {
364                        div()
365                            .h(px(1080.0))
366                            .flex()
367                            .flex_col()
368                            .m_2()
369                            .px_2()
370                            .bg(rgb(0x8888ff))
371                            .child(SharedString::from(if state.speaking {
372                                format!("{} (speaking)", &identity.0)
373                            } else if state.muted {
374                                format!("{} (muted)", &identity.0)
375                            } else {
376                                identity.0.clone()
377                            }))
378                            .when_some(state.audio_output_stream.as_ref(), |el, state| {
379                                el.child(
380                                    button()
381                                        .id(SharedString::from(identity.0.clone()))
382                                        .child(if state.0.is_enabled() {
383                                            "Deafen"
384                                        } else {
385                                            "Undeafen"
386                                        })
387                                        .on_click(cx.listener({
388                                            let identity = identity.clone();
389                                            move |this, _, _, cx| {
390                                                this.toggle_remote_audio_for_participant(
391                                                    &identity, cx,
392                                                );
393                                            }
394                                        })),
395                                )
396                            })
397                            .children(state.screen_share_output_view.as_ref().map(|e| e.1.clone()))
398                    })),
399            )
400    }
401}