mod.rs

  1pub mod participant;
  2pub mod room;
  3
  4use crate::call_settings::CallSettings;
  5use anyhow::{anyhow, Result};
  6use audio::Audio;
  7use client::{proto, ChannelId, Client, TypedEnvelope, User, UserStore, ZED_ALWAYS_ACTIVE};
  8use collections::HashSet;
  9use futures::{channel::oneshot, future::Shared, Future, FutureExt};
 10use gpui::{
 11    AppContext, AsyncAppContext, Context, EventEmitter, Global, Model, ModelContext, Subscription,
 12    Task, WeakModel,
 13};
 14use postage::watch;
 15use project::Project;
 16use room::Event;
 17use settings::Settings;
 18use std::sync::Arc;
 19
 20pub use livekit_client::{
 21    track::RemoteVideoTrack, RemoteVideoTrackView, RemoteVideoTrackViewEvent,
 22};
 23pub use participant::ParticipantLocation;
 24pub use room::Room;
 25
 26struct GlobalActiveCall(Model<ActiveCall>);
 27
 28impl Global for GlobalActiveCall {}
 29
 30pub fn init(client: Arc<Client>, user_store: Model<UserStore>, cx: &mut AppContext) {
 31    livekit_client::init(
 32        cx.background_executor().dispatcher.clone(),
 33        cx.http_client(),
 34    );
 35    CallSettings::register(cx);
 36
 37    let active_call = cx.new_model(|cx| ActiveCall::new(client, user_store, cx));
 38    cx.set_global(GlobalActiveCall(active_call));
 39}
 40
 41pub struct OneAtATime {
 42    cancel: Option<oneshot::Sender<()>>,
 43}
 44
 45impl OneAtATime {
 46    /// spawn a task in the given context.
 47    /// if another task is spawned before that resolves, or if the OneAtATime itself is dropped, the first task will be cancelled and return Ok(None)
 48    /// otherwise you'll see the result of the task.
 49    fn spawn<F, Fut, R>(&mut self, cx: &mut AppContext, f: F) -> Task<Result<Option<R>>>
 50    where
 51        F: 'static + FnOnce(AsyncAppContext) -> Fut,
 52        Fut: Future<Output = Result<R>>,
 53        R: 'static,
 54    {
 55        let (tx, rx) = oneshot::channel();
 56        self.cancel.replace(tx);
 57        cx.spawn(|cx| async move {
 58            futures::select_biased! {
 59                _ = rx.fuse() => Ok(None),
 60                result = f(cx).fuse() => result.map(Some),
 61            }
 62        })
 63    }
 64
 65    fn running(&self) -> bool {
 66        self.cancel
 67            .as_ref()
 68            .is_some_and(|cancel| !cancel.is_canceled())
 69    }
 70}
 71
 72#[derive(Clone)]
 73pub struct IncomingCall {
 74    pub room_id: u64,
 75    pub calling_user: Arc<User>,
 76    pub participants: Vec<Arc<User>>,
 77    pub initial_project: Option<proto::ParticipantProject>,
 78}
 79
 80/// Singleton global maintaining the user's participation in a room across workspaces.
 81pub struct ActiveCall {
 82    room: Option<(Model<Room>, Vec<Subscription>)>,
 83    pending_room_creation: Option<Shared<Task<Result<Model<Room>, Arc<anyhow::Error>>>>>,
 84    location: Option<WeakModel<Project>>,
 85    _join_debouncer: OneAtATime,
 86    pending_invites: HashSet<u64>,
 87    incoming_call: (
 88        watch::Sender<Option<IncomingCall>>,
 89        watch::Receiver<Option<IncomingCall>>,
 90    ),
 91    client: Arc<Client>,
 92    user_store: Model<UserStore>,
 93    _subscriptions: Vec<client::Subscription>,
 94}
 95
 96impl EventEmitter<Event> for ActiveCall {}
 97
 98impl ActiveCall {
 99    fn new(client: Arc<Client>, user_store: Model<UserStore>, cx: &mut ModelContext<Self>) -> Self {
100        Self {
101            room: None,
102            pending_room_creation: None,
103            location: None,
104            pending_invites: Default::default(),
105            incoming_call: watch::channel(),
106            _join_debouncer: OneAtATime { cancel: None },
107            _subscriptions: vec![
108                client.add_request_handler(cx.weak_model(), Self::handle_incoming_call),
109                client.add_message_handler(cx.weak_model(), Self::handle_call_canceled),
110            ],
111            client,
112            user_store,
113        }
114    }
115
116    pub fn channel_id(&self, cx: &AppContext) -> Option<ChannelId> {
117        self.room()?.read(cx).channel_id()
118    }
119
120    async fn handle_incoming_call(
121        this: Model<Self>,
122        envelope: TypedEnvelope<proto::IncomingCall>,
123        mut cx: AsyncAppContext,
124    ) -> Result<proto::Ack> {
125        let user_store = this.update(&mut cx, |this, _| this.user_store.clone())?;
126        let call = IncomingCall {
127            room_id: envelope.payload.room_id,
128            participants: user_store
129                .update(&mut cx, |user_store, cx| {
130                    user_store.get_users(envelope.payload.participant_user_ids, cx)
131                })?
132                .await?,
133            calling_user: user_store
134                .update(&mut cx, |user_store, cx| {
135                    user_store.get_user(envelope.payload.calling_user_id, cx)
136                })?
137                .await?,
138            initial_project: envelope.payload.initial_project,
139        };
140        this.update(&mut cx, |this, _| {
141            *this.incoming_call.0.borrow_mut() = Some(call);
142        })?;
143
144        Ok(proto::Ack {})
145    }
146
147    async fn handle_call_canceled(
148        this: Model<Self>,
149        envelope: TypedEnvelope<proto::CallCanceled>,
150        mut cx: AsyncAppContext,
151    ) -> Result<()> {
152        this.update(&mut cx, |this, _| {
153            let mut incoming_call = this.incoming_call.0.borrow_mut();
154            if incoming_call
155                .as_ref()
156                .map_or(false, |call| call.room_id == envelope.payload.room_id)
157            {
158                incoming_call.take();
159            }
160        })?;
161        Ok(())
162    }
163
164    pub fn global(cx: &AppContext) -> Model<Self> {
165        cx.global::<GlobalActiveCall>().0.clone()
166    }
167
168    pub fn try_global(cx: &AppContext) -> Option<Model<Self>> {
169        cx.try_global::<GlobalActiveCall>()
170            .map(|call| call.0.clone())
171    }
172
173    pub fn invite(
174        &mut self,
175        called_user_id: u64,
176        initial_project: Option<Model<Project>>,
177        cx: &mut ModelContext<Self>,
178    ) -> Task<Result<()>> {
179        if !self.pending_invites.insert(called_user_id) {
180            return Task::ready(Err(anyhow!("user was already invited")));
181        }
182        cx.notify();
183
184        if self._join_debouncer.running() {
185            return Task::ready(Ok(()));
186        }
187
188        let room = if let Some(room) = self.room().cloned() {
189            Some(Task::ready(Ok(room)).shared())
190        } else {
191            self.pending_room_creation.clone()
192        };
193
194        let invite = if let Some(room) = room {
195            cx.spawn(move |_, mut cx| async move {
196                let room = room.await.map_err(|err| anyhow!("{:?}", err))?;
197
198                let initial_project_id = if let Some(initial_project) = initial_project {
199                    Some(
200                        room.update(&mut cx, |room, cx| room.share_project(initial_project, cx))?
201                            .await?,
202                    )
203                } else {
204                    None
205                };
206
207                room.update(&mut cx, move |room, cx| {
208                    room.call(called_user_id, initial_project_id, cx)
209                })?
210                .await?;
211
212                anyhow::Ok(())
213            })
214        } else {
215            let client = self.client.clone();
216            let user_store = self.user_store.clone();
217            let room = cx
218                .spawn(move |this, mut cx| async move {
219                    let create_room = async {
220                        let room = cx
221                            .update(|cx| {
222                                Room::create(
223                                    called_user_id,
224                                    initial_project,
225                                    client,
226                                    user_store,
227                                    cx,
228                                )
229                            })?
230                            .await?;
231
232                        this.update(&mut cx, |this, cx| this.set_room(Some(room.clone()), cx))?
233                            .await?;
234
235                        anyhow::Ok(room)
236                    };
237
238                    let room = create_room.await;
239                    this.update(&mut cx, |this, _| this.pending_room_creation = None)?;
240                    room.map_err(Arc::new)
241                })
242                .shared();
243            self.pending_room_creation = Some(room.clone());
244            cx.background_executor().spawn(async move {
245                room.await.map_err(|err| anyhow!("{:?}", err))?;
246                anyhow::Ok(())
247            })
248        };
249
250        cx.spawn(move |this, mut cx| async move {
251            let result = invite.await;
252            if result.is_ok() {
253                this.update(&mut cx, |this, cx| this.report_call_event("invite", cx))?;
254            } else {
255                //TODO: report collaboration error
256                log::error!("invite failed: {:?}", result);
257            }
258
259            this.update(&mut cx, |this, cx| {
260                this.pending_invites.remove(&called_user_id);
261                cx.notify();
262            })?;
263            result
264        })
265    }
266
267    pub fn cancel_invite(
268        &mut self,
269        called_user_id: u64,
270        cx: &mut ModelContext<Self>,
271    ) -> Task<Result<()>> {
272        let room_id = if let Some(room) = self.room() {
273            room.read(cx).id()
274        } else {
275            return Task::ready(Err(anyhow!("no active call")));
276        };
277
278        let client = self.client.clone();
279        cx.background_executor().spawn(async move {
280            client
281                .request(proto::CancelCall {
282                    room_id,
283                    called_user_id,
284                })
285                .await?;
286            anyhow::Ok(())
287        })
288    }
289
290    pub fn incoming(&self) -> watch::Receiver<Option<IncomingCall>> {
291        self.incoming_call.1.clone()
292    }
293
294    pub fn accept_incoming(&mut self, cx: &mut ModelContext<Self>) -> Task<Result<()>> {
295        if self.room.is_some() {
296            return Task::ready(Err(anyhow!("cannot join while on another call")));
297        }
298
299        let call = if let Some(call) = self.incoming_call.0.borrow_mut().take() {
300            call
301        } else {
302            return Task::ready(Err(anyhow!("no incoming call")));
303        };
304
305        if self.pending_room_creation.is_some() {
306            return Task::ready(Ok(()));
307        }
308
309        let room_id = call.room_id;
310        let client = self.client.clone();
311        let user_store = self.user_store.clone();
312        let join = self
313            ._join_debouncer
314            .spawn(cx, move |cx| Room::join(room_id, client, user_store, cx));
315
316        cx.spawn(|this, mut cx| async move {
317            let room = join.await?;
318            this.update(&mut cx, |this, cx| this.set_room(room.clone(), cx))?
319                .await?;
320            this.update(&mut cx, |this, cx| {
321                this.report_call_event("accept incoming", cx)
322            })?;
323            Ok(())
324        })
325    }
326
327    pub fn decline_incoming(&mut self, _: &mut ModelContext<Self>) -> Result<()> {
328        let call = self
329            .incoming_call
330            .0
331            .borrow_mut()
332            .take()
333            .ok_or_else(|| anyhow!("no incoming call"))?;
334        report_call_event_for_room("decline incoming", call.room_id, None, &self.client);
335        self.client.send(proto::DeclineCall {
336            room_id: call.room_id,
337        })?;
338        Ok(())
339    }
340
341    pub fn join_channel(
342        &mut self,
343        channel_id: ChannelId,
344        cx: &mut ModelContext<Self>,
345    ) -> Task<Result<Option<Model<Room>>>> {
346        if let Some(room) = self.room().cloned() {
347            if room.read(cx).channel_id() == Some(channel_id) {
348                return Task::ready(Ok(Some(room)));
349            } else {
350                room.update(cx, |room, cx| room.clear_state(cx));
351            }
352        }
353
354        if self.pending_room_creation.is_some() {
355            return Task::ready(Ok(None));
356        }
357
358        let client = self.client.clone();
359        let user_store = self.user_store.clone();
360        let join = self._join_debouncer.spawn(cx, move |cx| async move {
361            Room::join_channel(channel_id, client, user_store, cx).await
362        });
363
364        cx.spawn(|this, mut cx| async move {
365            let room = join.await?;
366            this.update(&mut cx, |this, cx| this.set_room(room.clone(), cx))?
367                .await?;
368            this.update(&mut cx, |this, cx| {
369                this.report_call_event("join channel", cx)
370            })?;
371            Ok(room)
372        })
373    }
374
375    pub fn hang_up(&mut self, cx: &mut ModelContext<Self>) -> Task<Result<()>> {
376        cx.notify();
377        self.report_call_event("hang up", cx);
378
379        Audio::end_call(cx);
380
381        let channel_id = self.channel_id(cx);
382        if let Some((room, _)) = self.room.take() {
383            cx.emit(Event::RoomLeft { channel_id });
384            room.update(cx, |room, cx| room.leave(cx))
385        } else {
386            Task::ready(Ok(()))
387        }
388    }
389
390    pub fn share_project(
391        &mut self,
392        project: Model<Project>,
393        cx: &mut ModelContext<Self>,
394    ) -> Task<Result<u64>> {
395        if let Some((room, _)) = self.room.as_ref() {
396            self.report_call_event("share project", cx);
397            room.update(cx, |room, cx| room.share_project(project, cx))
398        } else {
399            Task::ready(Err(anyhow!("no active call")))
400        }
401    }
402
403    pub fn unshare_project(
404        &mut self,
405        project: Model<Project>,
406        cx: &mut ModelContext<Self>,
407    ) -> Result<()> {
408        if let Some((room, _)) = self.room.as_ref() {
409            self.report_call_event("unshare project", cx);
410            room.update(cx, |room, cx| room.unshare_project(project, cx))
411        } else {
412            Err(anyhow!("no active call"))
413        }
414    }
415
416    pub fn location(&self) -> Option<&WeakModel<Project>> {
417        self.location.as_ref()
418    }
419
420    pub fn set_location(
421        &mut self,
422        project: Option<&Model<Project>>,
423        cx: &mut ModelContext<Self>,
424    ) -> Task<Result<()>> {
425        if project.is_some() || !*ZED_ALWAYS_ACTIVE {
426            self.location = project.map(|project| project.downgrade());
427            if let Some((room, _)) = self.room.as_ref() {
428                return room.update(cx, |room, cx| room.set_location(project, cx));
429            }
430        }
431        Task::ready(Ok(()))
432    }
433
434    fn set_room(
435        &mut self,
436        room: Option<Model<Room>>,
437        cx: &mut ModelContext<Self>,
438    ) -> Task<Result<()>> {
439        if room.as_ref() == self.room.as_ref().map(|room| &room.0) {
440            Task::ready(Ok(()))
441        } else {
442            cx.notify();
443            if let Some(room) = room {
444                if room.read(cx).status().is_offline() {
445                    self.room = None;
446                    Task::ready(Ok(()))
447                } else {
448                    let subscriptions = vec![
449                        cx.observe(&room, |this, room, cx| {
450                            if room.read(cx).status().is_offline() {
451                                this.set_room(None, cx).detach_and_log_err(cx);
452                            }
453
454                            cx.notify();
455                        }),
456                        cx.subscribe(&room, |_, _, event, cx| cx.emit(event.clone())),
457                    ];
458                    self.room = Some((room.clone(), subscriptions));
459                    let location = self
460                        .location
461                        .as_ref()
462                        .and_then(|location| location.upgrade());
463                    let channel_id = room.read(cx).channel_id();
464                    cx.emit(Event::RoomJoined { channel_id });
465                    room.update(cx, |room, cx| room.set_location(location.as_ref(), cx))
466                }
467            } else {
468                self.room = None;
469                Task::ready(Ok(()))
470            }
471        }
472    }
473
474    pub fn room(&self) -> Option<&Model<Room>> {
475        self.room.as_ref().map(|(room, _)| room)
476    }
477
478    pub fn client(&self) -> Arc<Client> {
479        self.client.clone()
480    }
481
482    pub fn pending_invites(&self) -> &HashSet<u64> {
483        &self.pending_invites
484    }
485
486    pub fn report_call_event(&self, operation: &'static str, cx: &mut AppContext) {
487        if let Some(room) = self.room() {
488            let room = room.read(cx);
489            report_call_event_for_room(operation, room.id(), room.channel_id(), &self.client);
490        }
491    }
492}
493
494pub fn report_call_event_for_room(
495    operation: &'static str,
496    room_id: u64,
497    channel_id: Option<ChannelId>,
498    client: &Arc<Client>,
499) {
500    let telemetry = client.telemetry();
501
502    telemetry.report_call_event(operation, Some(room_id), channel_id)
503}
504
505pub fn report_call_event_for_channel(
506    operation: &'static str,
507    channel_id: ChannelId,
508    client: &Arc<Client>,
509    cx: &AppContext,
510) {
511    let room = ActiveCall::global(cx).read(cx).room();
512
513    let telemetry = client.telemetry();
514
515    telemetry.report_call_event(operation, room.map(|r| r.read(cx).id()), Some(channel_id))
516}
517
#[cfg(test)]
mod test {
    use gpui::TestAppContext;

    use crate::OneAtATime;

    /// Exercises the three outcomes of `OneAtATime::spawn`: running to
    /// completion, being superseded by a newer task, and being cancelled by
    /// dropping the `OneAtATime`.
    #[gpui::test]
    async fn test_one_at_a_time(cx: &mut TestAppContext) {
        let mut one_at_a_time = OneAtATime { cancel: None };

        // A lone task runs to completion and yields its value.
        let lone = cx.update(|cx| one_at_a_time.spawn(cx, |_| async { Ok(1) }));
        assert_eq!(lone.await.unwrap(), Some(1));

        // Spawning a second task cancels the first: the first resolves to
        // `None`, so its panicking body is not expected to run.
        let (superseded, latest) = cx.update(|cx| {
            (
                one_at_a_time.spawn(cx, |_| async {
                    panic!("");
                }),
                one_at_a_time.spawn(cx, |_| async { Ok(3) }),
            )
        });

        assert_eq!(superseded.await.unwrap(), None::<u32>);
        assert_eq!(latest.await.unwrap(), Some(3));

        // Dropping the `OneAtATime` cancels the task that is still pending.
        let orphaned = cx.update(|cx| one_at_a_time.spawn(cx, |_| async { Ok(4) }));
        drop(one_at_a_time);

        assert_eq!(orphaned.await.unwrap(), None);
    }
}