mod.rs

  1pub mod participant;
  2pub mod room;
  3
  4use crate::call_settings::CallSettings;
  5use anyhow::{anyhow, Result};
  6use audio::Audio;
  7use client::{proto, ChannelId, Client, TypedEnvelope, User, UserStore, ZED_ALWAYS_ACTIVE};
  8use collections::HashSet;
  9use futures::{channel::oneshot, future::Shared, Future, FutureExt};
 10use gpui::{
 11    App, AppContext as _, AsyncApp, Context, Entity, EventEmitter, Global, Subscription, Task,
 12    WeakEntity,
 13};
 14use postage::watch;
 15use project::Project;
 16use room::Event;
 17use settings::Settings;
 18use std::sync::Arc;
 19
 20pub use livekit_client::{
 21    track::RemoteVideoTrack, RemoteVideoTrackView, RemoteVideoTrackViewEvent,
 22};
 23pub use participant::ParticipantLocation;
 24pub use room::Room;
 25
/// Newtype wrapper that lets the [`ActiveCall`] entity be stored as an app global.
///
/// Installed by `init` and read back by `ActiveCall::global` / `ActiveCall::try_global`.
struct GlobalActiveCall(Entity<ActiveCall>);

// Marker impl: required so the wrapper can be passed to `set_global` / `global`.
impl Global for GlobalActiveCall {}
 29
 30pub fn init(client: Arc<Client>, user_store: Entity<UserStore>, cx: &mut App) {
 31    livekit_client::init(
 32        cx.background_executor().dispatcher.clone(),
 33        cx.http_client(),
 34    );
 35    CallSettings::register(cx);
 36
 37    let active_call = cx.new(|cx| ActiveCall::new(client, user_store, cx));
 38    cx.set_global(GlobalActiveCall(active_call));
 39}
 40
 41pub struct OneAtATime {
 42    cancel: Option<oneshot::Sender<()>>,
 43}
 44
 45impl OneAtATime {
 46    /// spawn a task in the given context.
 47    /// if another task is spawned before that resolves, or if the OneAtATime itself is dropped, the first task will be cancelled and return Ok(None)
 48    /// otherwise you'll see the result of the task.
 49    fn spawn<F, Fut, R>(&mut self, cx: &mut App, f: F) -> Task<Result<Option<R>>>
 50    where
 51        F: 'static + FnOnce(AsyncApp) -> Fut,
 52        Fut: Future<Output = Result<R>>,
 53        R: 'static,
 54    {
 55        let (tx, rx) = oneshot::channel();
 56        self.cancel.replace(tx);
 57        cx.spawn(async move |cx| {
 58            futures::select_biased! {
 59                _ = rx.fuse() => Ok(None),
 60                result = f(cx.clone()).fuse() => result.map(Some),
 61            }
 62        })
 63    }
 64
 65    fn running(&self) -> bool {
 66        self.cancel
 67            .as_ref()
 68            .is_some_and(|cancel| !cancel.is_canceled())
 69    }
 70}
 71
/// A call offered to the local user, as built by `ActiveCall::handle_incoming_call`
/// from a `proto::IncomingCall` message.
#[derive(Clone)]
pub struct IncomingCall {
    /// Id of the room the call belongs to; used to join or decline it.
    pub room_id: u64,
    /// User resolved from the message's `calling_user_id`.
    pub calling_user: Arc<User>,
    /// Users resolved from the message's `participant_user_ids`.
    pub participants: Vec<Arc<User>>,
    /// The message's `initial_project` field, passed through unchanged.
    pub initial_project: Option<proto::ParticipantProject>,
}
 79
/// Singleton global maintaining the user's participation in a room across workspaces.
pub struct ActiveCall {
    /// The room currently joined, together with the observe/subscribe handles
    /// that `set_room` creates for it.
    room: Option<(Entity<Room>, Vec<Subscription>)>,
    /// Shared handle to an in-flight `Room::create` started by `invite`;
    /// reset to `None` once that task finishes.
    pending_room_creation: Option<Shared<Task<Result<Entity<Room>, Arc<anyhow::Error>>>>>,
    /// Project last passed to `set_location`, held weakly.
    location: Option<WeakEntity<Project>>,
    /// Cancels a previous join when `accept_incoming` / `join_channel` starts a new one.
    _join_debouncer: OneAtATime,
    /// Ids of users whose invite is still in flight.
    pending_invites: HashSet<u64>,
    /// Watch channel (sender, receiver) holding the current incoming call, if any.
    incoming_call: (
        watch::Sender<Option<IncomingCall>>,
        watch::Receiver<Option<IncomingCall>>,
    ),
    client: Arc<Client>,
    user_store: Entity<UserStore>,
    /// Handles returned when the RPC handlers were registered in `new`.
    // NOTE(review): presumably dropping these unregisters the handlers — confirm in `client`.
    _subscriptions: Vec<client::Subscription>,
}

// `ActiveCall` emits room `Event`s (see `set_room` and `hang_up`).
impl EventEmitter<Event> for ActiveCall {}
 97
 98impl ActiveCall {
 99    fn new(client: Arc<Client>, user_store: Entity<UserStore>, cx: &mut Context<Self>) -> Self {
100        Self {
101            room: None,
102            pending_room_creation: None,
103            location: None,
104            pending_invites: Default::default(),
105            incoming_call: watch::channel(),
106            _join_debouncer: OneAtATime { cancel: None },
107            _subscriptions: vec![
108                client.add_request_handler(cx.weak_entity(), Self::handle_incoming_call),
109                client.add_message_handler(cx.weak_entity(), Self::handle_call_canceled),
110            ],
111            client,
112            user_store,
113        }
114    }
115
116    pub fn channel_id(&self, cx: &App) -> Option<ChannelId> {
117        self.room()?.read(cx).channel_id()
118    }
119
120    async fn handle_incoming_call(
121        this: Entity<Self>,
122        envelope: TypedEnvelope<proto::IncomingCall>,
123        mut cx: AsyncApp,
124    ) -> Result<proto::Ack> {
125        let user_store = this.update(&mut cx, |this, _| this.user_store.clone())?;
126        let call = IncomingCall {
127            room_id: envelope.payload.room_id,
128            participants: user_store
129                .update(&mut cx, |user_store, cx| {
130                    user_store.get_users(envelope.payload.participant_user_ids, cx)
131                })?
132                .await?,
133            calling_user: user_store
134                .update(&mut cx, |user_store, cx| {
135                    user_store.get_user(envelope.payload.calling_user_id, cx)
136                })?
137                .await?,
138            initial_project: envelope.payload.initial_project,
139        };
140        this.update(&mut cx, |this, _| {
141            *this.incoming_call.0.borrow_mut() = Some(call);
142        })?;
143
144        Ok(proto::Ack {})
145    }
146
147    async fn handle_call_canceled(
148        this: Entity<Self>,
149        envelope: TypedEnvelope<proto::CallCanceled>,
150        mut cx: AsyncApp,
151    ) -> Result<()> {
152        this.update(&mut cx, |this, _| {
153            let mut incoming_call = this.incoming_call.0.borrow_mut();
154            if incoming_call
155                .as_ref()
156                .map_or(false, |call| call.room_id == envelope.payload.room_id)
157            {
158                incoming_call.take();
159            }
160        })?;
161        Ok(())
162    }
163
164    pub fn global(cx: &App) -> Entity<Self> {
165        cx.global::<GlobalActiveCall>().0.clone()
166    }
167
168    pub fn try_global(cx: &App) -> Option<Entity<Self>> {
169        cx.try_global::<GlobalActiveCall>()
170            .map(|call| call.0.clone())
171    }
172
173    pub fn invite(
174        &mut self,
175        called_user_id: u64,
176        initial_project: Option<Entity<Project>>,
177        cx: &mut Context<Self>,
178    ) -> Task<Result<()>> {
179        if !self.pending_invites.insert(called_user_id) {
180            return Task::ready(Err(anyhow!("user was already invited")));
181        }
182        cx.notify();
183
184        if self._join_debouncer.running() {
185            return Task::ready(Ok(()));
186        }
187
188        let room = if let Some(room) = self.room().cloned() {
189            Some(Task::ready(Ok(room)).shared())
190        } else {
191            self.pending_room_creation.clone()
192        };
193
194        let invite = if let Some(room) = room {
195            cx.spawn(async move |_, cx| {
196                let room = room.await.map_err(|err| anyhow!("{:?}", err))?;
197
198                let initial_project_id = if let Some(initial_project) = initial_project {
199                    Some(
200                        room.update(cx, |room, cx| room.share_project(initial_project, cx))?
201                            .await?,
202                    )
203                } else {
204                    None
205                };
206
207                room.update(cx, move |room, cx| {
208                    room.call(called_user_id, initial_project_id, cx)
209                })?
210                .await?;
211
212                anyhow::Ok(())
213            })
214        } else {
215            let client = self.client.clone();
216            let user_store = self.user_store.clone();
217            let room = cx
218                .spawn(async move |this, cx| {
219                    let create_room = async {
220                        let room = cx
221                            .update(|cx| {
222                                Room::create(
223                                    called_user_id,
224                                    initial_project,
225                                    client,
226                                    user_store,
227                                    cx,
228                                )
229                            })?
230                            .await?;
231
232                        this.update(cx, |this, cx| this.set_room(Some(room.clone()), cx))?
233                            .await?;
234
235                        anyhow::Ok(room)
236                    };
237
238                    let room = create_room.await;
239                    this.update(cx, |this, _| this.pending_room_creation = None)?;
240                    room.map_err(Arc::new)
241                })
242                .shared();
243            self.pending_room_creation = Some(room.clone());
244            cx.background_spawn(async move {
245                room.await.map_err(|err| anyhow!("{:?}", err))?;
246                anyhow::Ok(())
247            })
248        };
249
250        cx.spawn(async move |this, cx| {
251            let result = invite.await;
252            if result.is_ok() {
253                this.update(cx, |this, cx| {
254                    this.report_call_event("Participant Invited", cx)
255                })?;
256            } else {
257                //TODO: report collaboration error
258                log::error!("invite failed: {:?}", result);
259            }
260
261            this.update(cx, |this, cx| {
262                this.pending_invites.remove(&called_user_id);
263                cx.notify();
264            })?;
265            result
266        })
267    }
268
269    pub fn cancel_invite(
270        &mut self,
271        called_user_id: u64,
272        cx: &mut Context<Self>,
273    ) -> Task<Result<()>> {
274        let room_id = if let Some(room) = self.room() {
275            room.read(cx).id()
276        } else {
277            return Task::ready(Err(anyhow!("no active call")));
278        };
279
280        let client = self.client.clone();
281        cx.background_spawn(async move {
282            client
283                .request(proto::CancelCall {
284                    room_id,
285                    called_user_id,
286                })
287                .await?;
288            anyhow::Ok(())
289        })
290    }
291
292    pub fn incoming(&self) -> watch::Receiver<Option<IncomingCall>> {
293        self.incoming_call.1.clone()
294    }
295
296    pub fn accept_incoming(&mut self, cx: &mut Context<Self>) -> Task<Result<()>> {
297        if self.room.is_some() {
298            return Task::ready(Err(anyhow!("cannot join while on another call")));
299        }
300
301        let call = if let Some(call) = self.incoming_call.0.borrow_mut().take() {
302            call
303        } else {
304            return Task::ready(Err(anyhow!("no incoming call")));
305        };
306
307        if self.pending_room_creation.is_some() {
308            return Task::ready(Ok(()));
309        }
310
311        let room_id = call.room_id;
312        let client = self.client.clone();
313        let user_store = self.user_store.clone();
314        let join = self
315            ._join_debouncer
316            .spawn(cx, move |cx| Room::join(room_id, client, user_store, cx));
317
318        cx.spawn(async move |this, cx| {
319            let room = join.await?;
320            this.update(cx, |this, cx| this.set_room(room.clone(), cx))?
321                .await?;
322            this.update(cx, |this, cx| {
323                this.report_call_event("Incoming Call Accepted", cx)
324            })?;
325            Ok(())
326        })
327    }
328
329    pub fn decline_incoming(&mut self, _: &mut Context<Self>) -> Result<()> {
330        let call = self
331            .incoming_call
332            .0
333            .borrow_mut()
334            .take()
335            .ok_or_else(|| anyhow!("no incoming call"))?;
336        telemetry::event!("Incoming Call Declined", room_id = call.room_id);
337        self.client.send(proto::DeclineCall {
338            room_id: call.room_id,
339        })?;
340        Ok(())
341    }
342
343    pub fn join_channel(
344        &mut self,
345        channel_id: ChannelId,
346        cx: &mut Context<Self>,
347    ) -> Task<Result<Option<Entity<Room>>>> {
348        if let Some(room) = self.room().cloned() {
349            if room.read(cx).channel_id() == Some(channel_id) {
350                return Task::ready(Ok(Some(room)));
351            } else {
352                room.update(cx, |room, cx| room.clear_state(cx));
353            }
354        }
355
356        if self.pending_room_creation.is_some() {
357            return Task::ready(Ok(None));
358        }
359
360        let client = self.client.clone();
361        let user_store = self.user_store.clone();
362        let join = self._join_debouncer.spawn(cx, move |cx| async move {
363            Room::join_channel(channel_id, client, user_store, cx).await
364        });
365
366        cx.spawn(async move |this, cx| {
367            let room = join.await?;
368            this.update(cx, |this, cx| this.set_room(room.clone(), cx))?
369                .await?;
370            this.update(cx, |this, cx| this.report_call_event("Channel Joined", cx))?;
371            Ok(room)
372        })
373    }
374
375    pub fn hang_up(&mut self, cx: &mut Context<Self>) -> Task<Result<()>> {
376        cx.notify();
377        self.report_call_event("Call Ended", cx);
378
379        Audio::end_call(cx);
380
381        let channel_id = self.channel_id(cx);
382        if let Some((room, _)) = self.room.take() {
383            cx.emit(Event::RoomLeft { channel_id });
384            room.update(cx, |room, cx| room.leave(cx))
385        } else {
386            Task::ready(Ok(()))
387        }
388    }
389
390    pub fn share_project(
391        &mut self,
392        project: Entity<Project>,
393        cx: &mut Context<Self>,
394    ) -> Task<Result<u64>> {
395        if let Some((room, _)) = self.room.as_ref() {
396            self.report_call_event("Project Shared", cx);
397            room.update(cx, |room, cx| room.share_project(project, cx))
398        } else {
399            Task::ready(Err(anyhow!("no active call")))
400        }
401    }
402
403    pub fn unshare_project(
404        &mut self,
405        project: Entity<Project>,
406        cx: &mut Context<Self>,
407    ) -> Result<()> {
408        if let Some((room, _)) = self.room.as_ref() {
409            self.report_call_event("Project Unshared", cx);
410            room.update(cx, |room, cx| room.unshare_project(project, cx))
411        } else {
412            Err(anyhow!("no active call"))
413        }
414    }
415
416    pub fn location(&self) -> Option<&WeakEntity<Project>> {
417        self.location.as_ref()
418    }
419
420    pub fn set_location(
421        &mut self,
422        project: Option<&Entity<Project>>,
423        cx: &mut Context<Self>,
424    ) -> Task<Result<()>> {
425        if project.is_some() || !*ZED_ALWAYS_ACTIVE {
426            self.location = project.map(|project| project.downgrade());
427            if let Some((room, _)) = self.room.as_ref() {
428                return room.update(cx, |room, cx| room.set_location(project, cx));
429            }
430        }
431        Task::ready(Ok(()))
432    }
433
434    fn set_room(&mut self, room: Option<Entity<Room>>, cx: &mut Context<Self>) -> Task<Result<()>> {
435        if room.as_ref() == self.room.as_ref().map(|room| &room.0) {
436            Task::ready(Ok(()))
437        } else {
438            cx.notify();
439            if let Some(room) = room {
440                if room.read(cx).status().is_offline() {
441                    self.room = None;
442                    Task::ready(Ok(()))
443                } else {
444                    let subscriptions = vec![
445                        cx.observe(&room, |this, room, cx| {
446                            if room.read(cx).status().is_offline() {
447                                this.set_room(None, cx).detach_and_log_err(cx);
448                            }
449
450                            cx.notify();
451                        }),
452                        cx.subscribe(&room, |_, _, event, cx| cx.emit(event.clone())),
453                    ];
454                    self.room = Some((room.clone(), subscriptions));
455                    let location = self
456                        .location
457                        .as_ref()
458                        .and_then(|location| location.upgrade());
459                    let channel_id = room.read(cx).channel_id();
460                    cx.emit(Event::RoomJoined { channel_id });
461                    room.update(cx, |room, cx| room.set_location(location.as_ref(), cx))
462                }
463            } else {
464                self.room = None;
465                Task::ready(Ok(()))
466            }
467        }
468    }
469
470    pub fn room(&self) -> Option<&Entity<Room>> {
471        self.room.as_ref().map(|(room, _)| room)
472    }
473
474    pub fn client(&self) -> Arc<Client> {
475        self.client.clone()
476    }
477
478    pub fn pending_invites(&self) -> &HashSet<u64> {
479        &self.pending_invites
480    }
481
482    pub fn report_call_event(&self, operation: &'static str, cx: &mut App) {
483        if let Some(room) = self.room() {
484            let room = room.read(cx);
485            telemetry::event!(
486                operation,
487                room_id = room.id(),
488                channel_id = room.channel_id()
489            )
490        }
491    }
492}
493
#[cfg(test)]
mod test {
    use gpui::TestAppContext;

    use crate::OneAtATime;

    /// Covers the three `OneAtATime::spawn` outcomes: a lone task completes,
    /// a superseded task yields `None`, and a task whose owner is dropped
    /// yields `None`.
    #[gpui::test]
    async fn test_one_at_a_time(cx: &mut TestAppContext) {
        let mut debouncer = OneAtATime { cancel: None };

        // A lone task runs to completion.
        let lone = cx.update(|cx| debouncer.spawn(cx, |_| async { Ok(1) }));
        assert_eq!(lone.await.unwrap(), Some(1));

        // The second spawn cancels the first; the cancellation arm is checked
        // before the work, so the panicking body is never reached.
        let (superseded, latest) = cx.update(|cx| {
            let superseded = debouncer.spawn(cx, |_| async {
                panic!("");
            });
            let latest = debouncer.spawn(cx, |_| async { Ok(3) });
            (superseded, latest)
        });

        assert_eq!(superseded.await.unwrap(), None::<u32>);
        assert_eq!(latest.await.unwrap(), Some(3));

        // Dropping the owner cancels whatever is still outstanding.
        let orphaned = cx.update(|cx| debouncer.spawn(cx, |_| async { Ok(4) }));
        drop(debouncer);

        assert_eq!(orphaned.await.unwrap(), None);
    }
}