mod.rs

  1pub mod participant;
  2pub mod room;
  3
  4use crate::call_settings::CallSettings;
  5use anyhow::{Context as _, Result, anyhow};
  6use audio::Audio;
  7use client::{ChannelId, Client, TypedEnvelope, User, UserStore, ZED_ALWAYS_ACTIVE, proto};
  8use collections::HashSet;
  9use futures::{Future, FutureExt, channel::oneshot, future::Shared};
 10use gpui::{
 11    App, AppContext as _, AsyncApp, Context, Entity, EventEmitter, Global, Subscription, Task,
 12    WeakEntity,
 13};
 14use postage::watch;
 15use project::Project;
 16use room::Event;
 17use settings::Settings;
 18use std::sync::Arc;
 19
 20pub use livekit_client::{RemoteVideoTrack, RemoteVideoTrackView, RemoteVideoTrackViewEvent};
 21pub use participant::ParticipantLocation;
 22pub use room::Room;
 23
 24struct GlobalActiveCall(Entity<ActiveCall>);
 25
 26impl Global for GlobalActiveCall {}
 27
 28pub fn init(client: Arc<Client>, user_store: Entity<UserStore>, cx: &mut App) {
 29    CallSettings::register(cx);
 30
 31    let active_call = cx.new(|cx| ActiveCall::new(client, user_store, cx));
 32    cx.set_global(GlobalActiveCall(active_call));
 33}
 34
 35pub struct OneAtATime {
 36    cancel: Option<oneshot::Sender<()>>,
 37}
 38
 39impl OneAtATime {
 40    /// spawn a task in the given context.
 41    /// if another task is spawned before that resolves, or if the OneAtATime itself is dropped, the first task will be cancelled and return Ok(None)
 42    /// otherwise you'll see the result of the task.
 43    fn spawn<F, Fut, R>(&mut self, cx: &mut App, f: F) -> Task<Result<Option<R>>>
 44    where
 45        F: 'static + FnOnce(AsyncApp) -> Fut,
 46        Fut: Future<Output = Result<R>>,
 47        R: 'static,
 48    {
 49        let (tx, rx) = oneshot::channel();
 50        self.cancel.replace(tx);
 51        cx.spawn(async move |cx| {
 52            futures::select_biased! {
 53                _ = rx.fuse() => Ok(None),
 54                result = f(cx.clone()).fuse() => result.map(Some),
 55            }
 56        })
 57    }
 58
 59    fn running(&self) -> bool {
 60        self.cancel
 61            .as_ref()
 62            .is_some_and(|cancel| !cancel.is_canceled())
 63    }
 64}
 65
 66#[derive(Clone)]
 67pub struct IncomingCall {
 68    pub room_id: u64,
 69    pub calling_user: Arc<User>,
 70    pub participants: Vec<Arc<User>>,
 71    pub initial_project: Option<proto::ParticipantProject>,
 72}
 73
 74/// Singleton global maintaining the user's participation in a room across workspaces.
 75pub struct ActiveCall {
 76    room: Option<(Entity<Room>, Vec<Subscription>)>,
 77    pending_room_creation: Option<Shared<Task<Result<Entity<Room>, Arc<anyhow::Error>>>>>,
 78    location: Option<WeakEntity<Project>>,
 79    _join_debouncer: OneAtATime,
 80    pending_invites: HashSet<u64>,
 81    incoming_call: (
 82        watch::Sender<Option<IncomingCall>>,
 83        watch::Receiver<Option<IncomingCall>>,
 84    ),
 85    client: Arc<Client>,
 86    user_store: Entity<UserStore>,
 87    _subscriptions: Vec<client::Subscription>,
 88}
 89
 90impl EventEmitter<Event> for ActiveCall {}
 91
 92impl ActiveCall {
 93    fn new(client: Arc<Client>, user_store: Entity<UserStore>, cx: &mut Context<Self>) -> Self {
 94        Self {
 95            room: None,
 96            pending_room_creation: None,
 97            location: None,
 98            pending_invites: Default::default(),
 99            incoming_call: watch::channel(),
100            _join_debouncer: OneAtATime { cancel: None },
101            _subscriptions: vec![
102                client.add_request_handler(cx.weak_entity(), Self::handle_incoming_call),
103                client.add_message_handler(cx.weak_entity(), Self::handle_call_canceled),
104            ],
105            client,
106            user_store,
107        }
108    }
109
110    pub fn channel_id(&self, cx: &App) -> Option<ChannelId> {
111        self.room()?.read(cx).channel_id()
112    }
113
114    async fn handle_incoming_call(
115        this: Entity<Self>,
116        envelope: TypedEnvelope<proto::IncomingCall>,
117        mut cx: AsyncApp,
118    ) -> Result<proto::Ack> {
119        let user_store = this.read_with(&cx, |this, _| this.user_store.clone())?;
120        let call = IncomingCall {
121            room_id: envelope.payload.room_id,
122            participants: user_store
123                .update(&mut cx, |user_store, cx| {
124                    user_store.get_users(envelope.payload.participant_user_ids, cx)
125                })?
126                .await?,
127            calling_user: user_store
128                .update(&mut cx, |user_store, cx| {
129                    user_store.get_user(envelope.payload.calling_user_id, cx)
130                })?
131                .await?,
132            initial_project: envelope.payload.initial_project,
133        };
134        this.update(&mut cx, |this, _| {
135            *this.incoming_call.0.borrow_mut() = Some(call);
136        })?;
137
138        Ok(proto::Ack {})
139    }
140
141    async fn handle_call_canceled(
142        this: Entity<Self>,
143        envelope: TypedEnvelope<proto::CallCanceled>,
144        mut cx: AsyncApp,
145    ) -> Result<()> {
146        this.update(&mut cx, |this, _| {
147            let mut incoming_call = this.incoming_call.0.borrow_mut();
148            if incoming_call
149                .as_ref()
150                .is_some_and(|call| call.room_id == envelope.payload.room_id)
151            {
152                incoming_call.take();
153            }
154        })?;
155        Ok(())
156    }
157
158    pub fn global(cx: &App) -> Entity<Self> {
159        cx.global::<GlobalActiveCall>().0.clone()
160    }
161
162    pub fn try_global(cx: &App) -> Option<Entity<Self>> {
163        cx.try_global::<GlobalActiveCall>()
164            .map(|call| call.0.clone())
165    }
166
167    pub fn invite(
168        &mut self,
169        called_user_id: u64,
170        initial_project: Option<Entity<Project>>,
171        cx: &mut Context<Self>,
172    ) -> Task<Result<()>> {
173        if !self.pending_invites.insert(called_user_id) {
174            return Task::ready(Err(anyhow!("user was already invited")));
175        }
176        cx.notify();
177
178        if self._join_debouncer.running() {
179            return Task::ready(Ok(()));
180        }
181
182        let room = if let Some(room) = self.room().cloned() {
183            Some(Task::ready(Ok(room)).shared())
184        } else {
185            self.pending_room_creation.clone()
186        };
187
188        let invite = if let Some(room) = room {
189            cx.spawn(async move |_, cx| {
190                let room = room.await.map_err(|err| anyhow!("{err:?}"))?;
191
192                let initial_project_id = if let Some(initial_project) = initial_project {
193                    Some(
194                        room.update(cx, |room, cx| room.share_project(initial_project, cx))?
195                            .await?,
196                    )
197                } else {
198                    None
199                };
200
201                room.update(cx, move |room, cx| {
202                    room.call(called_user_id, initial_project_id, cx)
203                })?
204                .await?;
205
206                anyhow::Ok(())
207            })
208        } else {
209            let client = self.client.clone();
210            let user_store = self.user_store.clone();
211            let room = cx
212                .spawn(async move |this, cx| {
213                    let create_room = async {
214                        let room = cx
215                            .update(|cx| {
216                                Room::create(
217                                    called_user_id,
218                                    initial_project,
219                                    client,
220                                    user_store,
221                                    cx,
222                                )
223                            })?
224                            .await?;
225
226                        this.update(cx, |this, cx| this.set_room(Some(room.clone()), cx))?
227                            .await?;
228
229                        anyhow::Ok(room)
230                    };
231
232                    let room = create_room.await;
233                    this.update(cx, |this, _| this.pending_room_creation = None)?;
234                    room.map_err(Arc::new)
235                })
236                .shared();
237            self.pending_room_creation = Some(room.clone());
238            cx.background_spawn(async move {
239                room.await.map_err(|err| anyhow!("{err:?}"))?;
240                anyhow::Ok(())
241            })
242        };
243
244        cx.spawn(async move |this, cx| {
245            let result = invite.await;
246            if result.is_ok() {
247                this.update(cx, |this, cx| {
248                    this.report_call_event("Participant Invited", cx)
249                })?;
250            } else {
251                //TODO: report collaboration error
252                log::error!("invite failed: {:?}", result);
253            }
254
255            this.update(cx, |this, cx| {
256                this.pending_invites.remove(&called_user_id);
257                cx.notify();
258            })?;
259            result
260        })
261    }
262
263    pub fn cancel_invite(
264        &mut self,
265        called_user_id: u64,
266        cx: &mut Context<Self>,
267    ) -> Task<Result<()>> {
268        let room_id = if let Some(room) = self.room() {
269            room.read(cx).id()
270        } else {
271            return Task::ready(Err(anyhow!("no active call")));
272        };
273
274        let client = self.client.clone();
275        cx.background_spawn(async move {
276            client
277                .request(proto::CancelCall {
278                    room_id,
279                    called_user_id,
280                })
281                .await?;
282            anyhow::Ok(())
283        })
284    }
285
286    pub fn incoming(&self) -> watch::Receiver<Option<IncomingCall>> {
287        self.incoming_call.1.clone()
288    }
289
290    pub fn accept_incoming(&mut self, cx: &mut Context<Self>) -> Task<Result<()>> {
291        if self.room.is_some() {
292            return Task::ready(Err(anyhow!("cannot join while on another call")));
293        }
294
295        let call = if let Some(call) = self.incoming_call.0.borrow_mut().take() {
296            call
297        } else {
298            return Task::ready(Err(anyhow!("no incoming call")));
299        };
300
301        if self.pending_room_creation.is_some() {
302            return Task::ready(Ok(()));
303        }
304
305        let room_id = call.room_id;
306        let client = self.client.clone();
307        let user_store = self.user_store.clone();
308        let join = self
309            ._join_debouncer
310            .spawn(cx, move |cx| Room::join(room_id, client, user_store, cx));
311
312        cx.spawn(async move |this, cx| {
313            let room = join.await?;
314            this.update(cx, |this, cx| this.set_room(room.clone(), cx))?
315                .await?;
316            this.update(cx, |this, cx| {
317                this.report_call_event("Incoming Call Accepted", cx)
318            })?;
319            Ok(())
320        })
321    }
322
323    pub fn decline_incoming(&mut self, _: &mut Context<Self>) -> Result<()> {
324        let call = self
325            .incoming_call
326            .0
327            .borrow_mut()
328            .take()
329            .context("no incoming call")?;
330        telemetry::event!("Incoming Call Declined", room_id = call.room_id);
331        self.client.send(proto::DeclineCall {
332            room_id: call.room_id,
333        })?;
334        Ok(())
335    }
336
337    pub fn join_channel(
338        &mut self,
339        channel_id: ChannelId,
340        cx: &mut Context<Self>,
341    ) -> Task<Result<Option<Entity<Room>>>> {
342        if let Some(room) = self.room().cloned() {
343            if room.read(cx).channel_id() == Some(channel_id) {
344                return Task::ready(Ok(Some(room)));
345            } else {
346                room.update(cx, |room, cx| room.clear_state(cx));
347            }
348        }
349
350        if self.pending_room_creation.is_some() {
351            return Task::ready(Ok(None));
352        }
353
354        let client = self.client.clone();
355        let user_store = self.user_store.clone();
356        let join = self._join_debouncer.spawn(cx, move |cx| async move {
357            Room::join_channel(channel_id, client, user_store, cx).await
358        });
359
360        cx.spawn(async move |this, cx| {
361            let room = join.await?;
362            this.update(cx, |this, cx| this.set_room(room.clone(), cx))?
363                .await?;
364            this.update(cx, |this, cx| this.report_call_event("Channel Joined", cx))?;
365            Ok(room)
366        })
367    }
368
369    pub fn hang_up(&mut self, cx: &mut Context<Self>) -> Task<Result<()>> {
370        cx.notify();
371        self.report_call_event("Call Ended", cx);
372
373        Audio::end_call(cx);
374
375        let channel_id = self.channel_id(cx);
376        if let Some((room, _)) = self.room.take() {
377            cx.emit(Event::RoomLeft { channel_id });
378            room.update(cx, |room, cx| room.leave(cx))
379        } else {
380            Task::ready(Ok(()))
381        }
382    }
383
384    pub fn share_project(
385        &mut self,
386        project: Entity<Project>,
387        cx: &mut Context<Self>,
388    ) -> Task<Result<u64>> {
389        if let Some((room, _)) = self.room.as_ref() {
390            self.report_call_event("Project Shared", cx);
391            room.update(cx, |room, cx| room.share_project(project, cx))
392        } else {
393            Task::ready(Err(anyhow!("no active call")))
394        }
395    }
396
397    pub fn unshare_project(
398        &mut self,
399        project: Entity<Project>,
400        cx: &mut Context<Self>,
401    ) -> Result<()> {
402        let (room, _) = self.room.as_ref().context("no active call")?;
403        self.report_call_event("Project Unshared", cx);
404        room.update(cx, |room, cx| room.unshare_project(project, cx))
405    }
406
407    pub const fn location(&self) -> Option<&WeakEntity<Project>> {
408        self.location.as_ref()
409    }
410
411    pub fn set_location(
412        &mut self,
413        project: Option<&Entity<Project>>,
414        cx: &mut Context<Self>,
415    ) -> Task<Result<()>> {
416        if project.is_some() || !*ZED_ALWAYS_ACTIVE {
417            self.location = project.map(|project| project.downgrade());
418            if let Some((room, _)) = self.room.as_ref() {
419                return room.update(cx, |room, cx| room.set_location(project, cx));
420            }
421        }
422        Task::ready(Ok(()))
423    }
424
425    fn set_room(&mut self, room: Option<Entity<Room>>, cx: &mut Context<Self>) -> Task<Result<()>> {
426        if room.as_ref() == self.room.as_ref().map(|room| &room.0) {
427            Task::ready(Ok(()))
428        } else {
429            cx.notify();
430            if let Some(room) = room {
431                if room.read(cx).status().is_offline() {
432                    self.room = None;
433                    Task::ready(Ok(()))
434                } else {
435                    let subscriptions = vec![
436                        cx.observe(&room, |this, room, cx| {
437                            if room.read(cx).status().is_offline() {
438                                this.set_room(None, cx).detach_and_log_err(cx);
439                            }
440
441                            cx.notify();
442                        }),
443                        cx.subscribe(&room, |_, _, event, cx| cx.emit(event.clone())),
444                    ];
445                    self.room = Some((room.clone(), subscriptions));
446                    let location = self
447                        .location
448                        .as_ref()
449                        .and_then(|location| location.upgrade());
450                    let channel_id = room.read(cx).channel_id();
451                    cx.emit(Event::RoomJoined { channel_id });
452                    room.update(cx, |room, cx| room.set_location(location.as_ref(), cx))
453                }
454            } else {
455                self.room = None;
456                Task::ready(Ok(()))
457            }
458        }
459    }
460
461    pub fn room(&self) -> Option<&Entity<Room>> {
462        self.room.as_ref().map(|(room, _)| room)
463    }
464
465    pub fn client(&self) -> Arc<Client> {
466        self.client.clone()
467    }
468
469    pub const fn pending_invites(&self) -> &HashSet<u64> {
470        &self.pending_invites
471    }
472
473    pub fn report_call_event(&self, operation: &'static str, cx: &mut App) {
474        if let Some(room) = self.room() {
475            let room = room.read(cx);
476            telemetry::event!(
477                operation,
478                room_id = room.id(),
479                channel_id = room.channel_id()
480            )
481        }
482    }
483}
484
#[cfg(test)]
mod test {
    use gpui::TestAppContext;

    use crate::OneAtATime;

    /// Exercises the three outcomes of `OneAtATime::spawn`: running to completion, being
    /// superseded by a newer task, and being cancelled by dropping the `OneAtATime`.
    #[gpui::test]
    async fn test_one_at_a_time(cx: &mut TestAppContext) {
        let mut debouncer = OneAtATime { cancel: None };

        // A lone task runs to completion and yields its value.
        let lone = cx.update(|cx| debouncer.spawn(cx, |_| async { Ok(1) }));
        assert_eq!(lone.await.unwrap(), Some(1));

        // Spawning a second task before the first is polled cancels the first.
        let (superseded, latest) = cx.update(|cx| {
            let first = debouncer.spawn(cx, |_| async {
                panic!("");
            });
            let second = debouncer.spawn(cx, |_| async { Ok(3) });
            (first, second)
        });

        assert_eq!(superseded.await.unwrap(), None::<u32>);
        assert_eq!(latest.await.unwrap(), Some(3));

        // Dropping the `OneAtATime` cancels whatever task is outstanding.
        let orphaned = cx.update(|cx| debouncer.spawn(cx, |_| async { Ok(4) }));
        drop(debouncer);

        assert_eq!(orphaned.await.unwrap(), None);
    }
}
519}