mod.rs

  1pub mod participant;
  2pub mod room;
  3
  4use anyhow::{Context as _, Result, anyhow};
  5use audio::Audio;
  6use client::{ChannelId, Client, TypedEnvelope, User, UserStore, ZED_ALWAYS_ACTIVE, proto};
  7use collections::HashSet;
  8use futures::{Future, FutureExt, channel::oneshot, future::Shared};
  9use gpui::{
 10    App, AppContext as _, AsyncApp, Context, Entity, EventEmitter, Global, Subscription, Task,
 11    WeakEntity,
 12};
 13use postage::watch;
 14use project::Project;
 15use room::Event;
 16use std::sync::Arc;
 17
 18pub use livekit_client::{RemoteVideoTrack, RemoteVideoTrackView, RemoteVideoTrackViewEvent};
 19pub use participant::ParticipantLocation;
 20pub use room::Room;
 21
 22struct GlobalActiveCall(Entity<ActiveCall>);
 23
 24impl Global for GlobalActiveCall {}
 25
 26pub fn init(client: Arc<Client>, user_store: Entity<UserStore>, cx: &mut App) {
 27    let active_call = cx.new(|cx| ActiveCall::new(client, user_store, cx));
 28    cx.set_global(GlobalActiveCall(active_call));
 29}
 30
 31pub struct OneAtATime {
 32    cancel: Option<oneshot::Sender<()>>,
 33}
 34
 35impl OneAtATime {
 36    /// spawn a task in the given context.
 37    /// if another task is spawned before that resolves, or if the OneAtATime itself is dropped, the first task will be cancelled and return Ok(None)
 38    /// otherwise you'll see the result of the task.
 39    fn spawn<F, Fut, R>(&mut self, cx: &mut App, f: F) -> Task<Result<Option<R>>>
 40    where
 41        F: 'static + FnOnce(AsyncApp) -> Fut,
 42        Fut: Future<Output = Result<R>>,
 43        R: 'static,
 44    {
 45        let (tx, rx) = oneshot::channel();
 46        self.cancel.replace(tx);
 47        cx.spawn(async move |cx| {
 48            futures::select_biased! {
 49                _ = rx.fuse() => Ok(None),
 50                result = f(cx.clone()).fuse() => result.map(Some),
 51            }
 52        })
 53    }
 54
 55    fn running(&self) -> bool {
 56        self.cancel
 57            .as_ref()
 58            .is_some_and(|cancel| !cancel.is_canceled())
 59    }
 60}
 61
 62#[derive(Clone)]
 63pub struct IncomingCall {
 64    pub room_id: u64,
 65    pub calling_user: Arc<User>,
 66    pub participants: Vec<Arc<User>>,
 67    pub initial_project: Option<proto::ParticipantProject>,
 68}
 69
 70/// Singleton global maintaining the user's participation in a room across workspaces.
 71pub struct ActiveCall {
 72    room: Option<(Entity<Room>, Vec<Subscription>)>,
 73    pending_room_creation: Option<Shared<Task<Result<Entity<Room>, Arc<anyhow::Error>>>>>,
 74    location: Option<WeakEntity<Project>>,
 75    _join_debouncer: OneAtATime,
 76    pending_invites: HashSet<u64>,
 77    incoming_call: (
 78        watch::Sender<Option<IncomingCall>>,
 79        watch::Receiver<Option<IncomingCall>>,
 80    ),
 81    client: Arc<Client>,
 82    user_store: Entity<UserStore>,
 83    _subscriptions: Vec<client::Subscription>,
 84}
 85
 86impl EventEmitter<Event> for ActiveCall {}
 87
 88impl ActiveCall {
 89    fn new(client: Arc<Client>, user_store: Entity<UserStore>, cx: &mut Context<Self>) -> Self {
 90        Self {
 91            room: None,
 92            pending_room_creation: None,
 93            location: None,
 94            pending_invites: Default::default(),
 95            incoming_call: watch::channel(),
 96            _join_debouncer: OneAtATime { cancel: None },
 97            _subscriptions: vec![
 98                client.add_request_handler(cx.weak_entity(), Self::handle_incoming_call),
 99                client.add_message_handler(cx.weak_entity(), Self::handle_call_canceled),
100            ],
101            client,
102            user_store,
103        }
104    }
105
106    pub fn channel_id(&self, cx: &App) -> Option<ChannelId> {
107        self.room()?.read(cx).channel_id()
108    }
109
110    async fn handle_incoming_call(
111        this: Entity<Self>,
112        envelope: TypedEnvelope<proto::IncomingCall>,
113        mut cx: AsyncApp,
114    ) -> Result<proto::Ack> {
115        let user_store = this.read_with(&cx, |this, _| this.user_store.clone())?;
116        let call = IncomingCall {
117            room_id: envelope.payload.room_id,
118            participants: user_store
119                .update(&mut cx, |user_store, cx| {
120                    user_store.get_users(envelope.payload.participant_user_ids, cx)
121                })?
122                .await?,
123            calling_user: user_store
124                .update(&mut cx, |user_store, cx| {
125                    user_store.get_user(envelope.payload.calling_user_id, cx)
126                })?
127                .await?,
128            initial_project: envelope.payload.initial_project,
129        };
130        this.update(&mut cx, |this, _| {
131            *this.incoming_call.0.borrow_mut() = Some(call);
132        })?;
133
134        Ok(proto::Ack {})
135    }
136
137    async fn handle_call_canceled(
138        this: Entity<Self>,
139        envelope: TypedEnvelope<proto::CallCanceled>,
140        mut cx: AsyncApp,
141    ) -> Result<()> {
142        this.update(&mut cx, |this, _| {
143            let mut incoming_call = this.incoming_call.0.borrow_mut();
144            if incoming_call
145                .as_ref()
146                .is_some_and(|call| call.room_id == envelope.payload.room_id)
147            {
148                incoming_call.take();
149            }
150        })?;
151        Ok(())
152    }
153
154    pub fn global(cx: &App) -> Entity<Self> {
155        cx.global::<GlobalActiveCall>().0.clone()
156    }
157
158    pub fn try_global(cx: &App) -> Option<Entity<Self>> {
159        cx.try_global::<GlobalActiveCall>()
160            .map(|call| call.0.clone())
161    }
162
163    pub fn invite(
164        &mut self,
165        called_user_id: u64,
166        initial_project: Option<Entity<Project>>,
167        cx: &mut Context<Self>,
168    ) -> Task<Result<()>> {
169        if !self.pending_invites.insert(called_user_id) {
170            return Task::ready(Err(anyhow!("user was already invited")));
171        }
172        cx.notify();
173
174        if self._join_debouncer.running() {
175            return Task::ready(Ok(()));
176        }
177
178        let room = if let Some(room) = self.room().cloned() {
179            Some(Task::ready(Ok(room)).shared())
180        } else {
181            self.pending_room_creation.clone()
182        };
183
184        let invite = if let Some(room) = room {
185            cx.spawn(async move |_, cx| {
186                let room = room.await.map_err(|err| anyhow!("{err:?}"))?;
187
188                let initial_project_id = if let Some(initial_project) = initial_project {
189                    Some(
190                        room.update(cx, |room, cx| room.share_project(initial_project, cx))?
191                            .await?,
192                    )
193                } else {
194                    None
195                };
196
197                room.update(cx, move |room, cx| {
198                    room.call(called_user_id, initial_project_id, cx)
199                })?
200                .await?;
201
202                anyhow::Ok(())
203            })
204        } else {
205            let client = self.client.clone();
206            let user_store = self.user_store.clone();
207            let room = cx
208                .spawn(async move |this, cx| {
209                    let create_room = async {
210                        let room = cx
211                            .update(|cx| {
212                                Room::create(
213                                    called_user_id,
214                                    initial_project,
215                                    client,
216                                    user_store,
217                                    cx,
218                                )
219                            })?
220                            .await?;
221
222                        this.update(cx, |this, cx| this.set_room(Some(room.clone()), cx))?
223                            .await?;
224
225                        anyhow::Ok(room)
226                    };
227
228                    let room = create_room.await;
229                    this.update(cx, |this, _| this.pending_room_creation = None)?;
230                    room.map_err(Arc::new)
231                })
232                .shared();
233            self.pending_room_creation = Some(room.clone());
234            cx.background_spawn(async move {
235                room.await.map_err(|err| anyhow!("{err:?}"))?;
236                anyhow::Ok(())
237            })
238        };
239
240        cx.spawn(async move |this, cx| {
241            let result = invite.await;
242            if result.is_ok() {
243                this.update(cx, |this, cx| {
244                    this.report_call_event("Participant Invited", cx)
245                })?;
246            } else {
247                //TODO: report collaboration error
248                log::error!("invite failed: {:?}", result);
249            }
250
251            this.update(cx, |this, cx| {
252                this.pending_invites.remove(&called_user_id);
253                cx.notify();
254            })?;
255            result
256        })
257    }
258
259    pub fn cancel_invite(
260        &mut self,
261        called_user_id: u64,
262        cx: &mut Context<Self>,
263    ) -> Task<Result<()>> {
264        let room_id = if let Some(room) = self.room() {
265            room.read(cx).id()
266        } else {
267            return Task::ready(Err(anyhow!("no active call")));
268        };
269
270        let client = self.client.clone();
271        cx.background_spawn(async move {
272            client
273                .request(proto::CancelCall {
274                    room_id,
275                    called_user_id,
276                })
277                .await?;
278            anyhow::Ok(())
279        })
280    }
281
282    pub fn incoming(&self) -> watch::Receiver<Option<IncomingCall>> {
283        self.incoming_call.1.clone()
284    }
285
286    pub fn accept_incoming(&mut self, cx: &mut Context<Self>) -> Task<Result<()>> {
287        if self.room.is_some() {
288            return Task::ready(Err(anyhow!("cannot join while on another call")));
289        }
290
291        let call = if let Some(call) = self.incoming_call.0.borrow_mut().take() {
292            call
293        } else {
294            return Task::ready(Err(anyhow!("no incoming call")));
295        };
296
297        if self.pending_room_creation.is_some() {
298            return Task::ready(Ok(()));
299        }
300
301        let room_id = call.room_id;
302        let client = self.client.clone();
303        let user_store = self.user_store.clone();
304        let join = self
305            ._join_debouncer
306            .spawn(cx, move |cx| Room::join(room_id, client, user_store, cx));
307
308        cx.spawn(async move |this, cx| {
309            let room = join.await?;
310            this.update(cx, |this, cx| this.set_room(room.clone(), cx))?
311                .await?;
312            this.update(cx, |this, cx| {
313                this.report_call_event("Incoming Call Accepted", cx)
314            })?;
315            Ok(())
316        })
317    }
318
319    pub fn decline_incoming(&mut self, _: &mut Context<Self>) -> Result<()> {
320        let call = self
321            .incoming_call
322            .0
323            .borrow_mut()
324            .take()
325            .context("no incoming call")?;
326        telemetry::event!("Incoming Call Declined", room_id = call.room_id);
327        self.client.send(proto::DeclineCall {
328            room_id: call.room_id,
329        })?;
330        Ok(())
331    }
332
333    pub fn join_channel(
334        &mut self,
335        channel_id: ChannelId,
336        cx: &mut Context<Self>,
337    ) -> Task<Result<Option<Entity<Room>>>> {
338        if let Some(room) = self.room().cloned() {
339            if room.read(cx).channel_id() == Some(channel_id) {
340                return Task::ready(Ok(Some(room)));
341            } else {
342                room.update(cx, |room, cx| room.clear_state(cx));
343            }
344        }
345
346        if self.pending_room_creation.is_some() {
347            return Task::ready(Ok(None));
348        }
349
350        let client = self.client.clone();
351        let user_store = self.user_store.clone();
352        let join = self._join_debouncer.spawn(cx, move |cx| async move {
353            Room::join_channel(channel_id, client, user_store, cx).await
354        });
355
356        cx.spawn(async move |this, cx| {
357            let room = join.await?;
358            this.update(cx, |this, cx| this.set_room(room.clone(), cx))?
359                .await?;
360            this.update(cx, |this, cx| this.report_call_event("Channel Joined", cx))?;
361            Ok(room)
362        })
363    }
364
365    pub fn hang_up(&mut self, cx: &mut Context<Self>) -> Task<Result<()>> {
366        cx.notify();
367        self.report_call_event("Call Ended", cx);
368
369        Audio::end_call(cx);
370
371        let channel_id = self.channel_id(cx);
372        if let Some((room, _)) = self.room.take() {
373            cx.emit(Event::RoomLeft { channel_id });
374            room.update(cx, |room, cx| room.leave(cx))
375        } else {
376            Task::ready(Ok(()))
377        }
378    }
379
380    pub fn share_project(
381        &mut self,
382        project: Entity<Project>,
383        cx: &mut Context<Self>,
384    ) -> Task<Result<u64>> {
385        if let Some((room, _)) = self.room.as_ref() {
386            self.report_call_event("Project Shared", cx);
387            room.update(cx, |room, cx| room.share_project(project, cx))
388        } else {
389            Task::ready(Err(anyhow!("no active call")))
390        }
391    }
392
393    pub fn unshare_project(
394        &mut self,
395        project: Entity<Project>,
396        cx: &mut Context<Self>,
397    ) -> Result<()> {
398        let (room, _) = self.room.as_ref().context("no active call")?;
399        self.report_call_event("Project Unshared", cx);
400        room.update(cx, |room, cx| room.unshare_project(project, cx))
401    }
402
403    pub fn location(&self) -> Option<&WeakEntity<Project>> {
404        self.location.as_ref()
405    }
406
407    pub fn set_location(
408        &mut self,
409        project: Option<&Entity<Project>>,
410        cx: &mut Context<Self>,
411    ) -> Task<Result<()>> {
412        if project.is_some() || !*ZED_ALWAYS_ACTIVE {
413            self.location = project.map(|project| project.downgrade());
414            if let Some((room, _)) = self.room.as_ref() {
415                return room.update(cx, |room, cx| room.set_location(project, cx));
416            }
417        }
418        Task::ready(Ok(()))
419    }
420
421    fn set_room(&mut self, room: Option<Entity<Room>>, cx: &mut Context<Self>) -> Task<Result<()>> {
422        if room.as_ref() == self.room.as_ref().map(|room| &room.0) {
423            Task::ready(Ok(()))
424        } else {
425            cx.notify();
426            if let Some(room) = room {
427                if room.read(cx).status().is_offline() {
428                    self.room = None;
429                    Task::ready(Ok(()))
430                } else {
431                    let subscriptions = vec![
432                        cx.observe(&room, |this, room, cx| {
433                            if room.read(cx).status().is_offline() {
434                                this.set_room(None, cx).detach_and_log_err(cx);
435                            }
436
437                            cx.notify();
438                        }),
439                        cx.subscribe(&room, |_, _, event, cx| cx.emit(event.clone())),
440                    ];
441                    self.room = Some((room.clone(), subscriptions));
442                    let location = self
443                        .location
444                        .as_ref()
445                        .and_then(|location| location.upgrade());
446                    let channel_id = room.read(cx).channel_id();
447                    cx.emit(Event::RoomJoined { channel_id });
448                    room.update(cx, |room, cx| room.set_location(location.as_ref(), cx))
449                }
450            } else {
451                self.room = None;
452                Task::ready(Ok(()))
453            }
454        }
455    }
456
457    pub fn room(&self) -> Option<&Entity<Room>> {
458        self.room.as_ref().map(|(room, _)| room)
459    }
460
461    pub fn client(&self) -> Arc<Client> {
462        self.client.clone()
463    }
464
465    pub fn pending_invites(&self) -> &HashSet<u64> {
466        &self.pending_invites
467    }
468
469    pub fn report_call_event(&self, operation: &'static str, cx: &mut App) {
470        if let Some(room) = self.room() {
471            let room = room.read(cx);
472            telemetry::event!(
473                operation,
474                room_id = room.id(),
475                channel_id = room.channel_id()
476            )
477        }
478    }
479}
480
#[cfg(test)]
mod test {
    use gpui::TestAppContext;

    use crate::OneAtATime;

    /// Exercises the three outcomes of `OneAtATime::spawn`: completion,
    /// cancellation by a newer task, and cancellation by dropping the owner.
    #[gpui::test]
    async fn test_one_at_a_time(cx: &mut TestAppContext) {
        let mut debouncer = OneAtATime { cancel: None };

        // A lone task runs to completion.
        let lone = cx.update(|cx| debouncer.spawn(cx, |_| async { Ok(1) }));
        assert_eq!(lone.await.unwrap(), Some(1));

        // Spawning a second task cancels the first before its body ever runs,
        // so the panic inside it is never reached.
        let (superseded, latest) = cx.update(|cx| {
            let first = debouncer.spawn(cx, |_| async {
                panic!("");
            });
            let second = debouncer.spawn(cx, |_| async { Ok(3) });
            (first, second)
        });

        assert_eq!(superseded.await.unwrap(), None::<u32>);
        assert_eq!(latest.await.unwrap(), Some(3));

        // Dropping the owner cancels the task that is still outstanding.
        let orphaned = cx.update(|cx| debouncer.spawn(cx, |_| async { Ok(4) }));
        drop(debouncer);

        assert_eq!(orphaned.await.unwrap(), None);
    }
}
515}