mod.rs

  1pub mod participant;
  2pub mod room;
  3
  4use crate::call_settings::CallSettings;
  5use anyhow::{anyhow, Result};
  6use audio::Audio;
  7use client::{proto, ChannelId, Client, TypedEnvelope, User, UserStore, ZED_ALWAYS_ACTIVE};
  8use collections::HashSet;
  9use futures::{channel::oneshot, future::Shared, Future, FutureExt};
 10use gpui::{
 11    App, AppContext as _, AsyncApp, Context, Entity, EventEmitter, Global, Subscription, Task,
 12    WeakEntity,
 13};
 14use postage::watch;
 15use project::Project;
 16use room::Event;
 17use settings::Settings;
 18use std::sync::Arc;
 19
 20pub use livekit_client::{
 21    track::RemoteVideoTrack, RemoteVideoTrackView, RemoteVideoTrackViewEvent,
 22};
 23pub use participant::ParticipantLocation;
 24pub use room::Room;
 25
/// Newtype wrapper that lets the single [`ActiveCall`] entity be stored as a gpui global.
///
/// It is installed by `init` and read back through `ActiveCall::global` /
/// `ActiveCall::try_global`.
struct GlobalActiveCall(Entity<ActiveCall>);

// Marker impl: registers the wrapper as a type usable with `cx.set_global` / `cx.global`.
impl Global for GlobalActiveCall {}
 29
 30pub fn init(client: Arc<Client>, user_store: Entity<UserStore>, cx: &mut App) {
 31    livekit_client::init(
 32        cx.background_executor().dispatcher.clone(),
 33        cx.http_client(),
 34    );
 35    CallSettings::register(cx);
 36
 37    let active_call = cx.new(|cx| ActiveCall::new(client, user_store, cx));
 38    cx.set_global(GlobalActiveCall(active_call));
 39}
 40
/// Helper that keeps at most one spawned task "live" at a time.
///
/// Each call to `spawn` replaces `cancel`, which drops the previous sender and
/// thereby signals the previously spawned task to stop.
pub struct OneAtATime {
    // Sender half of the cancellation channel for the most recently spawned task;
    // `None` until the first task is spawned.
    cancel: Option<oneshot::Sender<()>>,
}
 44
 45impl OneAtATime {
 46    /// spawn a task in the given context.
 47    /// if another task is spawned before that resolves, or if the OneAtATime itself is dropped, the first task will be cancelled and return Ok(None)
 48    /// otherwise you'll see the result of the task.
 49    fn spawn<F, Fut, R>(&mut self, cx: &mut App, f: F) -> Task<Result<Option<R>>>
 50    where
 51        F: 'static + FnOnce(AsyncApp) -> Fut,
 52        Fut: Future<Output = Result<R>>,
 53        R: 'static,
 54    {
 55        let (tx, rx) = oneshot::channel();
 56        self.cancel.replace(tx);
 57        cx.spawn(|cx| async move {
 58            futures::select_biased! {
 59                _ = rx.fuse() => Ok(None),
 60                result = f(cx).fuse() => result.map(Some),
 61            }
 62        })
 63    }
 64
 65    fn running(&self) -> bool {
 66        self.cancel
 67            .as_ref()
 68            .is_some_and(|cancel| !cancel.is_canceled())
 69    }
 70}
 71
/// A call that is currently ringing for the local user.
///
/// Built by `ActiveCall::handle_incoming_call` from a `proto::IncomingCall` message.
#[derive(Clone)]
pub struct IncomingCall {
    /// Id of the room the user is being invited to (taken from the message payload).
    pub room_id: u64,
    /// The user who placed the call, resolved from the payload's `calling_user_id`.
    pub calling_user: Arc<User>,
    /// Users resolved from the payload's `participant_user_ids`.
    pub participants: Vec<Arc<User>>,
    /// Project carried with the call, if the payload included one.
    pub initial_project: Option<proto::ParticipantProject>,
}
 79
/// Singleton global maintaining the user's participation in a room across workspaces.
pub struct ActiveCall {
    // The room currently joined, together with the subscriptions created for it in
    // `set_room` (a status observer and an event forwarder).
    room: Option<(Entity<Room>, Vec<Subscription>)>,
    // Shared task for a room that `invite` is in the middle of creating; reset to
    // `None` by that task once creation has finished.
    pending_room_creation: Option<Shared<Task<Result<Entity<Room>, Arc<anyhow::Error>>>>>,
    // Project most recently recorded by `set_location`, held weakly.
    location: Option<WeakEntity<Project>>,
    // Runs the join started by `accept_incoming` / `join_channel`; starting a new
    // join cancels the previous one.
    _join_debouncer: OneAtATime,
    // Ids of users passed to `invite` whose invitation has not settled yet.
    pending_invites: HashSet<u64>,
    // Watch channel holding the currently ringing call, if any. The sender is
    // written by the message handlers; `incoming` hands out clones of the receiver.
    incoming_call: (
        watch::Sender<Option<IncomingCall>>,
        watch::Receiver<Option<IncomingCall>>,
    ),
    client: Arc<Client>,
    user_store: Entity<UserStore>,
    // Handler registrations made in `new`, kept for as long as this struct lives.
    _subscriptions: Vec<client::Subscription>,
}

// `ActiveCall` emits room events: `RoomJoined` / `RoomLeft` from `set_room` / `hang_up`,
// plus every event forwarded from the joined room via the subscription in `set_room`.
impl EventEmitter<Event> for ActiveCall {}
 97
 98impl ActiveCall {
 99    fn new(client: Arc<Client>, user_store: Entity<UserStore>, cx: &mut Context<Self>) -> Self {
100        Self {
101            room: None,
102            pending_room_creation: None,
103            location: None,
104            pending_invites: Default::default(),
105            incoming_call: watch::channel(),
106            _join_debouncer: OneAtATime { cancel: None },
107            _subscriptions: vec![
108                client.add_request_handler(cx.weak_entity(), Self::handle_incoming_call),
109                client.add_message_handler(cx.weak_entity(), Self::handle_call_canceled),
110            ],
111            client,
112            user_store,
113        }
114    }
115
116    pub fn channel_id(&self, cx: &App) -> Option<ChannelId> {
117        self.room()?.read(cx).channel_id()
118    }
119
120    async fn handle_incoming_call(
121        this: Entity<Self>,
122        envelope: TypedEnvelope<proto::IncomingCall>,
123        mut cx: AsyncApp,
124    ) -> Result<proto::Ack> {
125        let user_store = this.update(&mut cx, |this, _| this.user_store.clone())?;
126        let call = IncomingCall {
127            room_id: envelope.payload.room_id,
128            participants: user_store
129                .update(&mut cx, |user_store, cx| {
130                    user_store.get_users(envelope.payload.participant_user_ids, cx)
131                })?
132                .await?,
133            calling_user: user_store
134                .update(&mut cx, |user_store, cx| {
135                    user_store.get_user(envelope.payload.calling_user_id, cx)
136                })?
137                .await?,
138            initial_project: envelope.payload.initial_project,
139        };
140        this.update(&mut cx, |this, _| {
141            *this.incoming_call.0.borrow_mut() = Some(call);
142        })?;
143
144        Ok(proto::Ack {})
145    }
146
147    async fn handle_call_canceled(
148        this: Entity<Self>,
149        envelope: TypedEnvelope<proto::CallCanceled>,
150        mut cx: AsyncApp,
151    ) -> Result<()> {
152        this.update(&mut cx, |this, _| {
153            let mut incoming_call = this.incoming_call.0.borrow_mut();
154            if incoming_call
155                .as_ref()
156                .map_or(false, |call| call.room_id == envelope.payload.room_id)
157            {
158                incoming_call.take();
159            }
160        })?;
161        Ok(())
162    }
163
164    pub fn global(cx: &App) -> Entity<Self> {
165        cx.global::<GlobalActiveCall>().0.clone()
166    }
167
168    pub fn try_global(cx: &App) -> Option<Entity<Self>> {
169        cx.try_global::<GlobalActiveCall>()
170            .map(|call| call.0.clone())
171    }
172
173    pub fn invite(
174        &mut self,
175        called_user_id: u64,
176        initial_project: Option<Entity<Project>>,
177        cx: &mut Context<Self>,
178    ) -> Task<Result<()>> {
179        if !self.pending_invites.insert(called_user_id) {
180            return Task::ready(Err(anyhow!("user was already invited")));
181        }
182        cx.notify();
183
184        if self._join_debouncer.running() {
185            return Task::ready(Ok(()));
186        }
187
188        let room = if let Some(room) = self.room().cloned() {
189            Some(Task::ready(Ok(room)).shared())
190        } else {
191            self.pending_room_creation.clone()
192        };
193
194        let invite = if let Some(room) = room {
195            cx.spawn(move |_, mut cx| async move {
196                let room = room.await.map_err(|err| anyhow!("{:?}", err))?;
197
198                let initial_project_id = if let Some(initial_project) = initial_project {
199                    Some(
200                        room.update(&mut cx, |room, cx| room.share_project(initial_project, cx))?
201                            .await?,
202                    )
203                } else {
204                    None
205                };
206
207                room.update(&mut cx, move |room, cx| {
208                    room.call(called_user_id, initial_project_id, cx)
209                })?
210                .await?;
211
212                anyhow::Ok(())
213            })
214        } else {
215            let client = self.client.clone();
216            let user_store = self.user_store.clone();
217            let room = cx
218                .spawn(move |this, mut cx| async move {
219                    let create_room = async {
220                        let room = cx
221                            .update(|cx| {
222                                Room::create(
223                                    called_user_id,
224                                    initial_project,
225                                    client,
226                                    user_store,
227                                    cx,
228                                )
229                            })?
230                            .await?;
231
232                        this.update(&mut cx, |this, cx| this.set_room(Some(room.clone()), cx))?
233                            .await?;
234
235                        anyhow::Ok(room)
236                    };
237
238                    let room = create_room.await;
239                    this.update(&mut cx, |this, _| this.pending_room_creation = None)?;
240                    room.map_err(Arc::new)
241                })
242                .shared();
243            self.pending_room_creation = Some(room.clone());
244            cx.background_spawn(async move {
245                room.await.map_err(|err| anyhow!("{:?}", err))?;
246                anyhow::Ok(())
247            })
248        };
249
250        cx.spawn(move |this, mut cx| async move {
251            let result = invite.await;
252            if result.is_ok() {
253                this.update(&mut cx, |this, cx| {
254                    this.report_call_event("Participant Invited", cx)
255                })?;
256            } else {
257                //TODO: report collaboration error
258                log::error!("invite failed: {:?}", result);
259            }
260
261            this.update(&mut cx, |this, cx| {
262                this.pending_invites.remove(&called_user_id);
263                cx.notify();
264            })?;
265            result
266        })
267    }
268
269    pub fn cancel_invite(
270        &mut self,
271        called_user_id: u64,
272        cx: &mut Context<Self>,
273    ) -> Task<Result<()>> {
274        let room_id = if let Some(room) = self.room() {
275            room.read(cx).id()
276        } else {
277            return Task::ready(Err(anyhow!("no active call")));
278        };
279
280        let client = self.client.clone();
281        cx.background_spawn(async move {
282            client
283                .request(proto::CancelCall {
284                    room_id,
285                    called_user_id,
286                })
287                .await?;
288            anyhow::Ok(())
289        })
290    }
291
292    pub fn incoming(&self) -> watch::Receiver<Option<IncomingCall>> {
293        self.incoming_call.1.clone()
294    }
295
296    pub fn accept_incoming(&mut self, cx: &mut Context<Self>) -> Task<Result<()>> {
297        if self.room.is_some() {
298            return Task::ready(Err(anyhow!("cannot join while on another call")));
299        }
300
301        let call = if let Some(call) = self.incoming_call.0.borrow_mut().take() {
302            call
303        } else {
304            return Task::ready(Err(anyhow!("no incoming call")));
305        };
306
307        if self.pending_room_creation.is_some() {
308            return Task::ready(Ok(()));
309        }
310
311        let room_id = call.room_id;
312        let client = self.client.clone();
313        let user_store = self.user_store.clone();
314        let join = self
315            ._join_debouncer
316            .spawn(cx, move |cx| Room::join(room_id, client, user_store, cx));
317
318        cx.spawn(|this, mut cx| async move {
319            let room = join.await?;
320            this.update(&mut cx, |this, cx| this.set_room(room.clone(), cx))?
321                .await?;
322            this.update(&mut cx, |this, cx| {
323                this.report_call_event("Incoming Call Accepted", cx)
324            })?;
325            Ok(())
326        })
327    }
328
329    pub fn decline_incoming(&mut self, _: &mut Context<Self>) -> Result<()> {
330        let call = self
331            .incoming_call
332            .0
333            .borrow_mut()
334            .take()
335            .ok_or_else(|| anyhow!("no incoming call"))?;
336        telemetry::event!("Incoming Call Declined", room_id = call.room_id);
337        self.client.send(proto::DeclineCall {
338            room_id: call.room_id,
339        })?;
340        Ok(())
341    }
342
343    pub fn join_channel(
344        &mut self,
345        channel_id: ChannelId,
346        cx: &mut Context<Self>,
347    ) -> Task<Result<Option<Entity<Room>>>> {
348        if let Some(room) = self.room().cloned() {
349            if room.read(cx).channel_id() == Some(channel_id) {
350                return Task::ready(Ok(Some(room)));
351            } else {
352                room.update(cx, |room, cx| room.clear_state(cx));
353            }
354        }
355
356        if self.pending_room_creation.is_some() {
357            return Task::ready(Ok(None));
358        }
359
360        let client = self.client.clone();
361        let user_store = self.user_store.clone();
362        let join = self._join_debouncer.spawn(cx, move |cx| async move {
363            Room::join_channel(channel_id, client, user_store, cx).await
364        });
365
366        cx.spawn(|this, mut cx| async move {
367            let room = join.await?;
368            this.update(&mut cx, |this, cx| this.set_room(room.clone(), cx))?
369                .await?;
370            this.update(&mut cx, |this, cx| {
371                this.report_call_event("Channel Joined", cx)
372            })?;
373            Ok(room)
374        })
375    }
376
377    pub fn hang_up(&mut self, cx: &mut Context<Self>) -> Task<Result<()>> {
378        cx.notify();
379        self.report_call_event("Call Ended", cx);
380
381        Audio::end_call(cx);
382
383        let channel_id = self.channel_id(cx);
384        if let Some((room, _)) = self.room.take() {
385            cx.emit(Event::RoomLeft { channel_id });
386            room.update(cx, |room, cx| room.leave(cx))
387        } else {
388            Task::ready(Ok(()))
389        }
390    }
391
392    pub fn share_project(
393        &mut self,
394        project: Entity<Project>,
395        cx: &mut Context<Self>,
396    ) -> Task<Result<u64>> {
397        if let Some((room, _)) = self.room.as_ref() {
398            self.report_call_event("Project Shared", cx);
399            room.update(cx, |room, cx| room.share_project(project, cx))
400        } else {
401            Task::ready(Err(anyhow!("no active call")))
402        }
403    }
404
405    pub fn unshare_project(
406        &mut self,
407        project: Entity<Project>,
408        cx: &mut Context<Self>,
409    ) -> Result<()> {
410        if let Some((room, _)) = self.room.as_ref() {
411            self.report_call_event("Project Unshared", cx);
412            room.update(cx, |room, cx| room.unshare_project(project, cx))
413        } else {
414            Err(anyhow!("no active call"))
415        }
416    }
417
    /// Returns the project most recently recorded by `set_location`, as a weak
    /// handle, or `None` when no location is set.
    pub fn location(&self) -> Option<&WeakEntity<Project>> {
        self.location.as_ref()
    }
421
422    pub fn set_location(
423        &mut self,
424        project: Option<&Entity<Project>>,
425        cx: &mut Context<Self>,
426    ) -> Task<Result<()>> {
427        if project.is_some() || !*ZED_ALWAYS_ACTIVE {
428            self.location = project.map(|project| project.downgrade());
429            if let Some((room, _)) = self.room.as_ref() {
430                return room.update(cx, |room, cx| room.set_location(project, cx));
431            }
432        }
433        Task::ready(Ok(()))
434    }
435
436    fn set_room(&mut self, room: Option<Entity<Room>>, cx: &mut Context<Self>) -> Task<Result<()>> {
437        if room.as_ref() == self.room.as_ref().map(|room| &room.0) {
438            Task::ready(Ok(()))
439        } else {
440            cx.notify();
441            if let Some(room) = room {
442                if room.read(cx).status().is_offline() {
443                    self.room = None;
444                    Task::ready(Ok(()))
445                } else {
446                    let subscriptions = vec![
447                        cx.observe(&room, |this, room, cx| {
448                            if room.read(cx).status().is_offline() {
449                                this.set_room(None, cx).detach_and_log_err(cx);
450                            }
451
452                            cx.notify();
453                        }),
454                        cx.subscribe(&room, |_, _, event, cx| cx.emit(event.clone())),
455                    ];
456                    self.room = Some((room.clone(), subscriptions));
457                    let location = self
458                        .location
459                        .as_ref()
460                        .and_then(|location| location.upgrade());
461                    let channel_id = room.read(cx).channel_id();
462                    cx.emit(Event::RoomJoined { channel_id });
463                    room.update(cx, |room, cx| room.set_location(location.as_ref(), cx))
464                }
465            } else {
466                self.room = None;
467                Task::ready(Ok(()))
468            }
469        }
470    }
471
472    pub fn room(&self) -> Option<&Entity<Room>> {
473        self.room.as_ref().map(|(room, _)| room)
474    }
475
476    pub fn client(&self) -> Arc<Client> {
477        self.client.clone()
478    }
479
    /// Returns the ids of users recorded by `invite` whose invitation has not yet
    /// been removed from the pending set.
    pub fn pending_invites(&self) -> &HashSet<u64> {
        &self.pending_invites
    }
483
484    pub fn report_call_event(&self, operation: &'static str, cx: &mut App) {
485        if let Some(room) = self.room() {
486            let room = room.read(cx);
487            telemetry::event!(
488                operation,
489                room_id = room.id(),
490                channel_id = room.channel_id()
491            )
492        }
493    }
494}
495
#[cfg(test)]
mod test {
    use gpui::TestAppContext;

    use crate::OneAtATime;

    /// Exercises the three outcomes of `OneAtATime::spawn`: running to completion,
    /// being superseded by a newer task, and being cancelled by dropping the owner.
    #[gpui::test]
    async fn test_one_at_a_time(cx: &mut TestAppContext) {
        let mut debouncer = OneAtATime { cancel: None };

        // A lone task runs to completion and yields its value.
        let lone = cx.update(|cx| debouncer.spawn(cx, |_| async { Ok(1) }));
        assert_eq!(lone.await.unwrap(), Some(1));

        // Spawning a second task before the first resolves cancels the first one.
        // The first body panics, and the test expects it to resolve to `None`.
        let (superseded, latest) = cx.update(|cx| {
            let first = debouncer.spawn(cx, |_| async {
                panic!("");
            });
            let second = debouncer.spawn(cx, |_| async { Ok(3) });
            (first, second)
        });

        assert_eq!(superseded.await.unwrap(), None::<u32>);
        assert_eq!(latest.await.unwrap(), Some(3));

        // Dropping the `OneAtATime` itself cancels the outstanding task as well.
        let orphaned = cx.update(|cx| debouncer.spawn(cx, |_| async { Ok(4) }));
        drop(debouncer);

        assert_eq!(orphaned.await.unwrap(), None);
    }
}