mod.rs

  1pub mod participant;
  2pub mod room;
  3
  4use crate::call_settings::CallSettings;
  5use anyhow::{anyhow, Result};
  6use audio::Audio;
  7use client::{proto, ChannelId, Client, TypedEnvelope, User, UserStore, ZED_ALWAYS_ACTIVE};
  8use collections::HashSet;
  9use futures::{channel::oneshot, future::Shared, Future, FutureExt};
 10use gpui::{
 11    AppContext, AsyncAppContext, Context, EventEmitter, Global, Model, ModelContext, Subscription,
 12    Task, WeakModel,
 13};
 14use postage::watch;
 15use project::Project;
 16use room::Event;
 17use settings::Settings;
 18use std::sync::Arc;
 19
 20pub use livekit_client::{
 21    track::RemoteVideoTrack, RemoteVideoTrackView, RemoteVideoTrackViewEvent,
 22};
 23pub use participant::ParticipantLocation;
 24pub use room::Room;
 25
/// Newtype wrapper under which the application's [`ActiveCall`] model is stored as a
/// gpui global (installed by `init`, read back by `ActiveCall::global` / `try_global`).
 26struct GlobalActiveCall(Model<ActiveCall>);
 27
// Marker impl that allows `GlobalActiveCall` to be used with `set_global` / `global`.
 28impl Global for GlobalActiveCall {}
 29
 30pub fn init(client: Arc<Client>, user_store: Model<UserStore>, cx: &mut AppContext) {
 31    livekit_client::init(
 32        cx.background_executor().dispatcher.clone(),
 33        cx.http_client(),
 34    );
 35    CallSettings::register(cx);
 36
 37    let active_call = cx.new_model(|cx| ActiveCall::new(client, user_store, cx));
 38    cx.set_global(GlobalActiveCall(active_call));
 39}
 40
/// Helper that keeps at most one spawned task alive: spawning a new task through it
/// cancels the task it spawned previously.
 41pub struct OneAtATime {
    // Sender half of the cancellation channel of the most recently spawned task.
    // The task selects on the matching receiver, so replacing or dropping this sender
    // is what cancels it.
 42    cancel: Option<oneshot::Sender<()>>,
 43}
 44
 45impl OneAtATime {
 46    /// spawn a task in the given context.
 47    /// if another task is spawned before that resolves, or if the OneAtATime itself is dropped, the first task will be cancelled and return Ok(None)
 48    /// otherwise you'll see the result of the task.
 49    fn spawn<F, Fut, R>(&mut self, cx: &mut AppContext, f: F) -> Task<Result<Option<R>>>
 50    where
 51        F: 'static + FnOnce(AsyncAppContext) -> Fut,
 52        Fut: Future<Output = Result<R>>,
 53        R: 'static,
 54    {
 55        let (tx, rx) = oneshot::channel();
 56        self.cancel.replace(tx);
 57        cx.spawn(|cx| async move {
 58            futures::select_biased! {
 59                _ = rx.fuse() => Ok(None),
 60                result = f(cx).fuse() => result.map(Some),
 61            }
 62        })
 63    }
 64
 65    fn running(&self) -> bool {
 66        self.cancel
 67            .as_ref()
 68            .is_some_and(|cancel| !cancel.is_canceled())
 69    }
 70}
 71
/// A call that another user has placed to the current user and that has not been
/// accepted, declined, or cancelled yet. Built from a `proto::IncomingCall` message.
 72#[derive(Clone)]
 73pub struct IncomingCall {
    /// Id of the room the call invites the user into.
 74    pub room_id: u64,
    /// The user who placed the call (resolved from the message's `calling_user_id`).
 75    pub calling_user: Arc<User>,
    /// Users resolved from the message's `participant_user_ids`.
 76    pub participants: Vec<Arc<User>>,
    /// Project attached to the call by the caller, if any (copied from the message).
 77    pub initial_project: Option<proto::ParticipantProject>,
 78}
 79
 80/// Singleton global maintaining the user's participation in a room across workspaces.
///
/// Tracks the room the user is currently in, outgoing invites that are still in flight,
/// and the most recent incoming call.
 81pub struct ActiveCall {
    // The room the user is currently in, together with the subscriptions `set_room`
    // creates to observe it and forward its events.
 82    room: Option<(Model<Room>, Vec<Subscription>)>,
    // Shared task for a room that `invite` is in the middle of creating; reset to
    // `None` once creation finishes (successfully or not).
 83    pending_room_creation: Option<Shared<Task<Result<Model<Room>, Arc<anyhow::Error>>>>>,
    // Project most recently passed to `set_location`; forwarded to the room when one is set.
 84    location: Option<WeakModel<Project>>,
    // Runs the join tasks of `accept_incoming` / `join_channel` so that a newer join
    // cancels an older one.
 85    _join_debouncer: OneAtATime,
    // Ids of users for whom an `invite` is currently in flight.
 86    pending_invites: HashSet<u64>,
    // Watch channel (sender, receiver) holding the current incoming call, if any.
 87    incoming_call: (
 88        watch::Sender<Option<IncomingCall>>,
 89        watch::Receiver<Option<IncomingCall>>,
 90    ),
 91    client: Arc<Client>,
 92    user_store: Model<UserStore>,
    // Handler registrations for `IncomingCall` / `CallCanceled` messages made in `new`.
    // NOTE(review): presumably held so the handlers stay registered for the lifetime
    // of this model — confirm against `client::Subscription`.
 93    _subscriptions: Vec<client::Subscription>,
 94}
 95
// `ActiveCall` emits room `Event`s: it re-emits every event of the current room and
// emits `RoomJoined` / `RoomLeft` itself when the room is set or hung up.
 96impl EventEmitter<Event> for ActiveCall {}
 97
 98impl ActiveCall {
 99    fn new(client: Arc<Client>, user_store: Model<UserStore>, cx: &mut ModelContext<Self>) -> Self {
100        Self {
101            room: None,
102            pending_room_creation: None,
103            location: None,
104            pending_invites: Default::default(),
105            incoming_call: watch::channel(),
106            _join_debouncer: OneAtATime { cancel: None },
107            _subscriptions: vec![
108                client.add_request_handler(cx.weak_model(), Self::handle_incoming_call),
109                client.add_message_handler(cx.weak_model(), Self::handle_call_canceled),
110            ],
111            client,
112            user_store,
113        }
114    }
115
116    pub fn channel_id(&self, cx: &AppContext) -> Option<ChannelId> {
117        self.room()?.read(cx).channel_id()
118    }
119
120    async fn handle_incoming_call(
121        this: Model<Self>,
122        envelope: TypedEnvelope<proto::IncomingCall>,
123        mut cx: AsyncAppContext,
124    ) -> Result<proto::Ack> {
125        let user_store = this.update(&mut cx, |this, _| this.user_store.clone())?;
126        let call = IncomingCall {
127            room_id: envelope.payload.room_id,
128            participants: user_store
129                .update(&mut cx, |user_store, cx| {
130                    user_store.get_users(envelope.payload.participant_user_ids, cx)
131                })?
132                .await?,
133            calling_user: user_store
134                .update(&mut cx, |user_store, cx| {
135                    user_store.get_user(envelope.payload.calling_user_id, cx)
136                })?
137                .await?,
138            initial_project: envelope.payload.initial_project,
139        };
140        this.update(&mut cx, |this, _| {
141            *this.incoming_call.0.borrow_mut() = Some(call);
142        })?;
143
144        Ok(proto::Ack {})
145    }
146
147    async fn handle_call_canceled(
148        this: Model<Self>,
149        envelope: TypedEnvelope<proto::CallCanceled>,
150        mut cx: AsyncAppContext,
151    ) -> Result<()> {
152        this.update(&mut cx, |this, _| {
153            let mut incoming_call = this.incoming_call.0.borrow_mut();
154            if incoming_call
155                .as_ref()
156                .map_or(false, |call| call.room_id == envelope.payload.room_id)
157            {
158                incoming_call.take();
159            }
160        })?;
161        Ok(())
162    }
163
164    pub fn global(cx: &AppContext) -> Model<Self> {
165        cx.global::<GlobalActiveCall>().0.clone()
166    }
167
168    pub fn try_global(cx: &AppContext) -> Option<Model<Self>> {
169        cx.try_global::<GlobalActiveCall>()
170            .map(|call| call.0.clone())
171    }
172
173    pub fn invite(
174        &mut self,
175        called_user_id: u64,
176        initial_project: Option<Model<Project>>,
177        cx: &mut ModelContext<Self>,
178    ) -> Task<Result<()>> {
179        if !self.pending_invites.insert(called_user_id) {
180            return Task::ready(Err(anyhow!("user was already invited")));
181        }
182        cx.notify();
183
184        if self._join_debouncer.running() {
185            return Task::ready(Ok(()));
186        }
187
188        let room = if let Some(room) = self.room().cloned() {
189            Some(Task::ready(Ok(room)).shared())
190        } else {
191            self.pending_room_creation.clone()
192        };
193
194        let invite = if let Some(room) = room {
195            cx.spawn(move |_, mut cx| async move {
196                let room = room.await.map_err(|err| anyhow!("{:?}", err))?;
197
198                let initial_project_id = if let Some(initial_project) = initial_project {
199                    Some(
200                        room.update(&mut cx, |room, cx| room.share_project(initial_project, cx))?
201                            .await?,
202                    )
203                } else {
204                    None
205                };
206
207                room.update(&mut cx, move |room, cx| {
208                    room.call(called_user_id, initial_project_id, cx)
209                })?
210                .await?;
211
212                anyhow::Ok(())
213            })
214        } else {
215            let client = self.client.clone();
216            let user_store = self.user_store.clone();
217            let room = cx
218                .spawn(move |this, mut cx| async move {
219                    let create_room = async {
220                        let room = cx
221                            .update(|cx| {
222                                Room::create(
223                                    called_user_id,
224                                    initial_project,
225                                    client,
226                                    user_store,
227                                    cx,
228                                )
229                            })?
230                            .await?;
231
232                        this.update(&mut cx, |this, cx| this.set_room(Some(room.clone()), cx))?
233                            .await?;
234
235                        anyhow::Ok(room)
236                    };
237
238                    let room = create_room.await;
239                    this.update(&mut cx, |this, _| this.pending_room_creation = None)?;
240                    room.map_err(Arc::new)
241                })
242                .shared();
243            self.pending_room_creation = Some(room.clone());
244            cx.background_executor().spawn(async move {
245                room.await.map_err(|err| anyhow!("{:?}", err))?;
246                anyhow::Ok(())
247            })
248        };
249
250        cx.spawn(move |this, mut cx| async move {
251            let result = invite.await;
252            if result.is_ok() {
253                this.update(&mut cx, |this, cx| {
254                    this.report_call_event("Participant Invited", cx)
255                })?;
256            } else {
257                //TODO: report collaboration error
258                log::error!("invite failed: {:?}", result);
259            }
260
261            this.update(&mut cx, |this, cx| {
262                this.pending_invites.remove(&called_user_id);
263                cx.notify();
264            })?;
265            result
266        })
267    }
268
269    pub fn cancel_invite(
270        &mut self,
271        called_user_id: u64,
272        cx: &mut ModelContext<Self>,
273    ) -> Task<Result<()>> {
274        let room_id = if let Some(room) = self.room() {
275            room.read(cx).id()
276        } else {
277            return Task::ready(Err(anyhow!("no active call")));
278        };
279
280        let client = self.client.clone();
281        cx.background_executor().spawn(async move {
282            client
283                .request(proto::CancelCall {
284                    room_id,
285                    called_user_id,
286                })
287                .await?;
288            anyhow::Ok(())
289        })
290    }
291
292    pub fn incoming(&self) -> watch::Receiver<Option<IncomingCall>> {
293        self.incoming_call.1.clone()
294    }
295
296    pub fn accept_incoming(&mut self, cx: &mut ModelContext<Self>) -> Task<Result<()>> {
297        if self.room.is_some() {
298            return Task::ready(Err(anyhow!("cannot join while on another call")));
299        }
300
301        let call = if let Some(call) = self.incoming_call.0.borrow_mut().take() {
302            call
303        } else {
304            return Task::ready(Err(anyhow!("no incoming call")));
305        };
306
307        if self.pending_room_creation.is_some() {
308            return Task::ready(Ok(()));
309        }
310
311        let room_id = call.room_id;
312        let client = self.client.clone();
313        let user_store = self.user_store.clone();
314        let join = self
315            ._join_debouncer
316            .spawn(cx, move |cx| Room::join(room_id, client, user_store, cx));
317
318        cx.spawn(|this, mut cx| async move {
319            let room = join.await?;
320            this.update(&mut cx, |this, cx| this.set_room(room.clone(), cx))?
321                .await?;
322            this.update(&mut cx, |this, cx| {
323                this.report_call_event("Incoming Call Accepted", cx)
324            })?;
325            Ok(())
326        })
327    }
328
329    pub fn decline_incoming(&mut self, _: &mut ModelContext<Self>) -> Result<()> {
330        let call = self
331            .incoming_call
332            .0
333            .borrow_mut()
334            .take()
335            .ok_or_else(|| anyhow!("no incoming call"))?;
336        telemetry::event!("Incoming Call Declined", room_id = call.room_id);
337        self.client.send(proto::DeclineCall {
338            room_id: call.room_id,
339        })?;
340        Ok(())
341    }
342
343    pub fn join_channel(
344        &mut self,
345        channel_id: ChannelId,
346        cx: &mut ModelContext<Self>,
347    ) -> Task<Result<Option<Model<Room>>>> {
348        if let Some(room) = self.room().cloned() {
349            if room.read(cx).channel_id() == Some(channel_id) {
350                return Task::ready(Ok(Some(room)));
351            } else {
352                room.update(cx, |room, cx| room.clear_state(cx));
353            }
354        }
355
356        if self.pending_room_creation.is_some() {
357            return Task::ready(Ok(None));
358        }
359
360        let client = self.client.clone();
361        let user_store = self.user_store.clone();
362        let join = self._join_debouncer.spawn(cx, move |cx| async move {
363            Room::join_channel(channel_id, client, user_store, cx).await
364        });
365
366        cx.spawn(|this, mut cx| async move {
367            let room = join.await?;
368            this.update(&mut cx, |this, cx| this.set_room(room.clone(), cx))?
369                .await?;
370            this.update(&mut cx, |this, cx| {
371                this.report_call_event("Channel Joined", cx)
372            })?;
373            Ok(room)
374        })
375    }
376
377    pub fn hang_up(&mut self, cx: &mut ModelContext<Self>) -> Task<Result<()>> {
378        cx.notify();
379        self.report_call_event("Call Ended", cx);
380
381        Audio::end_call(cx);
382
383        let channel_id = self.channel_id(cx);
384        if let Some((room, _)) = self.room.take() {
385            cx.emit(Event::RoomLeft { channel_id });
386            room.update(cx, |room, cx| room.leave(cx))
387        } else {
388            Task::ready(Ok(()))
389        }
390    }
391
392    pub fn share_project(
393        &mut self,
394        project: Model<Project>,
395        cx: &mut ModelContext<Self>,
396    ) -> Task<Result<u64>> {
397        if let Some((room, _)) = self.room.as_ref() {
398            self.report_call_event("Project Shared", cx);
399            room.update(cx, |room, cx| room.share_project(project, cx))
400        } else {
401            Task::ready(Err(anyhow!("no active call")))
402        }
403    }
404
405    pub fn unshare_project(
406        &mut self,
407        project: Model<Project>,
408        cx: &mut ModelContext<Self>,
409    ) -> Result<()> {
410        if let Some((room, _)) = self.room.as_ref() {
411            self.report_call_event("Project Unshared", cx);
412            room.update(cx, |room, cx| room.unshare_project(project, cx))
413        } else {
414            Err(anyhow!("no active call"))
415        }
416    }
417
    /// Returns the project most recently stored by `set_location`, as a weak handle,
    /// or `None` if no location is stored.
418    pub fn location(&self) -> Option<&WeakModel<Project>> {
419        self.location.as_ref()
420    }
421
422    pub fn set_location(
423        &mut self,
424        project: Option<&Model<Project>>,
425        cx: &mut ModelContext<Self>,
426    ) -> Task<Result<()>> {
427        if project.is_some() || !*ZED_ALWAYS_ACTIVE {
428            self.location = project.map(|project| project.downgrade());
429            if let Some((room, _)) = self.room.as_ref() {
430                return room.update(cx, |room, cx| room.set_location(project, cx));
431            }
432        }
433        Task::ready(Ok(()))
434    }
435
436    fn set_room(
437        &mut self,
438        room: Option<Model<Room>>,
439        cx: &mut ModelContext<Self>,
440    ) -> Task<Result<()>> {
441        if room.as_ref() == self.room.as_ref().map(|room| &room.0) {
442            Task::ready(Ok(()))
443        } else {
444            cx.notify();
445            if let Some(room) = room {
446                if room.read(cx).status().is_offline() {
447                    self.room = None;
448                    Task::ready(Ok(()))
449                } else {
450                    let subscriptions = vec![
451                        cx.observe(&room, |this, room, cx| {
452                            if room.read(cx).status().is_offline() {
453                                this.set_room(None, cx).detach_and_log_err(cx);
454                            }
455
456                            cx.notify();
457                        }),
458                        cx.subscribe(&room, |_, _, event, cx| cx.emit(event.clone())),
459                    ];
460                    self.room = Some((room.clone(), subscriptions));
461                    let location = self
462                        .location
463                        .as_ref()
464                        .and_then(|location| location.upgrade());
465                    let channel_id = room.read(cx).channel_id();
466                    cx.emit(Event::RoomJoined { channel_id });
467                    room.update(cx, |room, cx| room.set_location(location.as_ref(), cx))
468                }
469            } else {
470                self.room = None;
471                Task::ready(Ok(()))
472            }
473        }
474    }
475
476    pub fn room(&self) -> Option<&Model<Room>> {
477        self.room.as_ref().map(|(room, _)| room)
478    }
479
480    pub fn client(&self) -> Arc<Client> {
481        self.client.clone()
482    }
483
    /// Returns the ids of the users for whom an `invite` is currently in flight.
484    pub fn pending_invites(&self) -> &HashSet<u64> {
485        &self.pending_invites
486    }
487
488    pub fn report_call_event(&self, operation: &'static str, cx: &mut AppContext) {
489        if let Some(room) = self.room() {
490            let room = room.read(cx);
491            telemetry::event!(
492                operation,
493                room_id = room.id(),
494                channel_id = room.channel_id()
495            )
496        }
497    }
498}
499
500#[cfg(test)]
501mod test {
502    use gpui::TestAppContext;
503
504    use crate::OneAtATime;
505
    // Exercises the three outcomes of `OneAtATime::spawn`: a task that completes,
    // a task superseded by a newer one, and a task cancelled by dropping the owner.
506    #[gpui::test]
507    async fn test_one_at_a_time(cx: &mut TestAppContext) {
508        let mut one_at_a_time = OneAtATime { cancel: None };
509
        // A single task that is left alone yields its result.
510        assert_eq!(
511            cx.update(|cx| one_at_a_time.spawn(cx, |_| async { Ok(1) }))
512                .await
513                .unwrap(),
514            Some(1)
515        );
516
        // Two tasks spawned back to back: the first one's body would panic if it ran;
        // the assertions below expect it to be cancelled (`None`) by the second spawn.
517        let (a, b) = cx.update(|cx| {
518            (
519                one_at_a_time.spawn(cx, |_| async {
520                    panic!("");
521                }),
522                one_at_a_time.spawn(cx, |_| async { Ok(3) }),
523            )
524        });
525
526        assert_eq!(a.await.unwrap(), None::<u32>);
527        assert_eq!(b.await.unwrap(), Some(3));
528
        // Dropping the `OneAtATime` before the task is awaited cancels the task as well.
529        let promise = cx.update(|cx| one_at_a_time.spawn(cx, |_| async { Ok(4) }));
530        drop(one_at_a_time);
531
532        assert_eq!(promise.await.unwrap(), None);
533    }
534}