call.rs

  1pub mod call_settings;
  2pub mod participant;
  3pub mod room;
  4
  5use anyhow::{anyhow, Result};
  6use audio::Audio;
  7use call_settings::CallSettings;
  8use client::{proto, ChannelId, Client, TypedEnvelope, User, UserStore, ZED_ALWAYS_ACTIVE};
  9use collections::HashSet;
 10use futures::{channel::oneshot, future::Shared, Future, FutureExt};
 11use gpui::{
 12    AppContext, AsyncAppContext, Context, EventEmitter, Global, Model, ModelContext, Subscription,
 13    Task, WeakModel,
 14};
 15use postage::watch;
 16use project::Project;
 17use room::Event;
 18use settings::Settings;
 19use std::sync::Arc;
 20
 21#[cfg(not(target_os = "windows"))]
 22pub use live_kit_client::play_remote_video_track;
 23pub use live_kit_client::{
 24    track::RemoteVideoTrack, RemoteVideoTrackView, RemoteVideoTrackViewEvent,
 25};
 26pub use participant::ParticipantLocation;
 27pub use room::Room;
 28
 29struct GlobalActiveCall(Model<ActiveCall>);
 30
 31impl Global for GlobalActiveCall {}
 32
 33pub fn init(client: Arc<Client>, user_store: Model<UserStore>, cx: &mut AppContext) {
 34    live_kit_client::init(
 35        cx.background_executor().dispatcher.clone(),
 36        cx.http_client(),
 37    );
 38    CallSettings::register(cx);
 39
 40    let active_call = cx.new_model(|cx| ActiveCall::new(client, user_store, cx));
 41    cx.set_global(GlobalActiveCall(active_call));
 42}
 43
 44pub struct OneAtATime {
 45    cancel: Option<oneshot::Sender<()>>,
 46}
 47
 48impl OneAtATime {
 49    /// spawn a task in the given context.
 50    /// if another task is spawned before that resolves, or if the OneAtATime itself is dropped, the first task will be cancelled and return Ok(None)
 51    /// otherwise you'll see the result of the task.
 52    fn spawn<F, Fut, R>(&mut self, cx: &mut AppContext, f: F) -> Task<Result<Option<R>>>
 53    where
 54        F: 'static + FnOnce(AsyncAppContext) -> Fut,
 55        Fut: Future<Output = Result<R>>,
 56        R: 'static,
 57    {
 58        let (tx, rx) = oneshot::channel();
 59        self.cancel.replace(tx);
 60        cx.spawn(|cx| async move {
 61            futures::select_biased! {
 62                _ = rx.fuse() => Ok(None),
 63                result = f(cx).fuse() => result.map(Some),
 64            }
 65        })
 66    }
 67
 68    fn running(&self) -> bool {
 69        self.cancel
 70            .as_ref()
 71            .is_some_and(|cancel| !cancel.is_canceled())
 72    }
 73}
 74
 75#[derive(Clone)]
 76pub struct IncomingCall {
 77    pub room_id: u64,
 78    pub calling_user: Arc<User>,
 79    pub participants: Vec<Arc<User>>,
 80    pub initial_project: Option<proto::ParticipantProject>,
 81}
 82
 83/// Singleton global maintaining the user's participation in a room across workspaces.
 84pub struct ActiveCall {
 85    room: Option<(Model<Room>, Vec<Subscription>)>,
 86    pending_room_creation: Option<Shared<Task<Result<Model<Room>, Arc<anyhow::Error>>>>>,
 87    location: Option<WeakModel<Project>>,
 88    _join_debouncer: OneAtATime,
 89    pending_invites: HashSet<u64>,
 90    incoming_call: (
 91        watch::Sender<Option<IncomingCall>>,
 92        watch::Receiver<Option<IncomingCall>>,
 93    ),
 94    client: Arc<Client>,
 95    user_store: Model<UserStore>,
 96    _subscriptions: Vec<client::Subscription>,
 97}
 98
 99impl EventEmitter<Event> for ActiveCall {}
100
101impl ActiveCall {
102    fn new(client: Arc<Client>, user_store: Model<UserStore>, cx: &mut ModelContext<Self>) -> Self {
103        Self {
104            room: None,
105            pending_room_creation: None,
106            location: None,
107            pending_invites: Default::default(),
108            incoming_call: watch::channel(),
109            _join_debouncer: OneAtATime { cancel: None },
110            _subscriptions: vec![
111                client.add_request_handler(cx.weak_model(), Self::handle_incoming_call),
112                client.add_message_handler(cx.weak_model(), Self::handle_call_canceled),
113            ],
114            client,
115            user_store,
116        }
117    }
118
119    pub fn channel_id(&self, cx: &AppContext) -> Option<ChannelId> {
120        self.room()?.read(cx).channel_id()
121    }
122
123    async fn handle_incoming_call(
124        this: Model<Self>,
125        envelope: TypedEnvelope<proto::IncomingCall>,
126        mut cx: AsyncAppContext,
127    ) -> Result<proto::Ack> {
128        let user_store = this.update(&mut cx, |this, _| this.user_store.clone())?;
129        let call = IncomingCall {
130            room_id: envelope.payload.room_id,
131            participants: user_store
132                .update(&mut cx, |user_store, cx| {
133                    user_store.get_users(envelope.payload.participant_user_ids, cx)
134                })?
135                .await?,
136            calling_user: user_store
137                .update(&mut cx, |user_store, cx| {
138                    user_store.get_user(envelope.payload.calling_user_id, cx)
139                })?
140                .await?,
141            initial_project: envelope.payload.initial_project,
142        };
143        this.update(&mut cx, |this, _| {
144            *this.incoming_call.0.borrow_mut() = Some(call);
145        })?;
146
147        Ok(proto::Ack {})
148    }
149
150    async fn handle_call_canceled(
151        this: Model<Self>,
152        envelope: TypedEnvelope<proto::CallCanceled>,
153        mut cx: AsyncAppContext,
154    ) -> Result<()> {
155        this.update(&mut cx, |this, _| {
156            let mut incoming_call = this.incoming_call.0.borrow_mut();
157            if incoming_call
158                .as_ref()
159                .map_or(false, |call| call.room_id == envelope.payload.room_id)
160            {
161                incoming_call.take();
162            }
163        })?;
164        Ok(())
165    }
166
167    pub fn global(cx: &AppContext) -> Model<Self> {
168        cx.global::<GlobalActiveCall>().0.clone()
169    }
170
171    pub fn try_global(cx: &AppContext) -> Option<Model<Self>> {
172        cx.try_global::<GlobalActiveCall>()
173            .map(|call| call.0.clone())
174    }
175
176    pub fn invite(
177        &mut self,
178        called_user_id: u64,
179        initial_project: Option<Model<Project>>,
180        cx: &mut ModelContext<Self>,
181    ) -> Task<Result<()>> {
182        if !self.pending_invites.insert(called_user_id) {
183            return Task::ready(Err(anyhow!("user was already invited")));
184        }
185        cx.notify();
186
187        if self._join_debouncer.running() {
188            return Task::ready(Ok(()));
189        }
190
191        let room = if let Some(room) = self.room().cloned() {
192            Some(Task::ready(Ok(room)).shared())
193        } else {
194            self.pending_room_creation.clone()
195        };
196
197        let invite = if let Some(room) = room {
198            cx.spawn(move |_, mut cx| async move {
199                let room = room.await.map_err(|err| anyhow!("{:?}", err))?;
200
201                let initial_project_id = if let Some(initial_project) = initial_project {
202                    Some(
203                        room.update(&mut cx, |room, cx| room.share_project(initial_project, cx))?
204                            .await?,
205                    )
206                } else {
207                    None
208                };
209
210                room.update(&mut cx, move |room, cx| {
211                    room.call(called_user_id, initial_project_id, cx)
212                })?
213                .await?;
214
215                anyhow::Ok(())
216            })
217        } else {
218            let client = self.client.clone();
219            let user_store = self.user_store.clone();
220            let room = cx
221                .spawn(move |this, mut cx| async move {
222                    let create_room = async {
223                        let room = cx
224                            .update(|cx| {
225                                Room::create(
226                                    called_user_id,
227                                    initial_project,
228                                    client,
229                                    user_store,
230                                    cx,
231                                )
232                            })?
233                            .await?;
234
235                        this.update(&mut cx, |this, cx| this.set_room(Some(room.clone()), cx))?
236                            .await?;
237
238                        anyhow::Ok(room)
239                    };
240
241                    let room = create_room.await;
242                    this.update(&mut cx, |this, _| this.pending_room_creation = None)?;
243                    room.map_err(Arc::new)
244                })
245                .shared();
246            self.pending_room_creation = Some(room.clone());
247            cx.background_executor().spawn(async move {
248                room.await.map_err(|err| anyhow!("{:?}", err))?;
249                anyhow::Ok(())
250            })
251        };
252
253        cx.spawn(move |this, mut cx| async move {
254            let result = invite.await;
255            if result.is_ok() {
256                this.update(&mut cx, |this, cx| this.report_call_event("invite", cx))?;
257            } else {
258                //TODO: report collaboration error
259                log::error!("invite failed: {:?}", result);
260            }
261
262            this.update(&mut cx, |this, cx| {
263                this.pending_invites.remove(&called_user_id);
264                cx.notify();
265            })?;
266            result
267        })
268    }
269
270    pub fn cancel_invite(
271        &mut self,
272        called_user_id: u64,
273        cx: &mut ModelContext<Self>,
274    ) -> Task<Result<()>> {
275        let room_id = if let Some(room) = self.room() {
276            room.read(cx).id()
277        } else {
278            return Task::ready(Err(anyhow!("no active call")));
279        };
280
281        let client = self.client.clone();
282        cx.background_executor().spawn(async move {
283            client
284                .request(proto::CancelCall {
285                    room_id,
286                    called_user_id,
287                })
288                .await?;
289            anyhow::Ok(())
290        })
291    }
292
293    pub fn incoming(&self) -> watch::Receiver<Option<IncomingCall>> {
294        self.incoming_call.1.clone()
295    }
296
297    pub fn accept_incoming(&mut self, cx: &mut ModelContext<Self>) -> Task<Result<()>> {
298        if self.room.is_some() {
299            return Task::ready(Err(anyhow!("cannot join while on another call")));
300        }
301
302        let call = if let Some(call) = self.incoming_call.0.borrow_mut().take() {
303            call
304        } else {
305            return Task::ready(Err(anyhow!("no incoming call")));
306        };
307
308        if self.pending_room_creation.is_some() {
309            return Task::ready(Ok(()));
310        }
311
312        let room_id = call.room_id;
313        let client = self.client.clone();
314        let user_store = self.user_store.clone();
315        let join = self
316            ._join_debouncer
317            .spawn(cx, move |cx| Room::join(room_id, client, user_store, cx));
318
319        cx.spawn(|this, mut cx| async move {
320            let room = join.await?;
321            this.update(&mut cx, |this, cx| this.set_room(room.clone(), cx))?
322                .await?;
323            this.update(&mut cx, |this, cx| {
324                this.report_call_event("accept incoming", cx)
325            })?;
326            Ok(())
327        })
328    }
329
330    pub fn decline_incoming(&mut self, _: &mut ModelContext<Self>) -> Result<()> {
331        let call = self
332            .incoming_call
333            .0
334            .borrow_mut()
335            .take()
336            .ok_or_else(|| anyhow!("no incoming call"))?;
337        report_call_event_for_room("decline incoming", call.room_id, None, &self.client);
338        self.client.send(proto::DeclineCall {
339            room_id: call.room_id,
340        })?;
341        Ok(())
342    }
343
344    pub fn join_channel(
345        &mut self,
346        channel_id: ChannelId,
347        cx: &mut ModelContext<Self>,
348    ) -> Task<Result<Option<Model<Room>>>> {
349        if let Some(room) = self.room().cloned() {
350            if room.read(cx).channel_id() == Some(channel_id) {
351                return Task::ready(Ok(Some(room)));
352            } else {
353                room.update(cx, |room, cx| room.clear_state(cx));
354            }
355        }
356
357        if self.pending_room_creation.is_some() {
358            return Task::ready(Ok(None));
359        }
360
361        let client = self.client.clone();
362        let user_store = self.user_store.clone();
363        let join = self._join_debouncer.spawn(cx, move |cx| async move {
364            Room::join_channel(channel_id, client, user_store, cx).await
365        });
366
367        cx.spawn(|this, mut cx| async move {
368            let room = join.await?;
369            this.update(&mut cx, |this, cx| this.set_room(room.clone(), cx))?
370                .await?;
371            this.update(&mut cx, |this, cx| {
372                this.report_call_event("join channel", cx)
373            })?;
374            Ok(room)
375        })
376    }
377
378    pub fn hang_up(&mut self, cx: &mut ModelContext<Self>) -> Task<Result<()>> {
379        cx.notify();
380        self.report_call_event("hang up", cx);
381
382        Audio::end_call(cx);
383
384        let channel_id = self.channel_id(cx);
385        if let Some((room, _)) = self.room.take() {
386            cx.emit(Event::RoomLeft { channel_id });
387            room.update(cx, |room, cx| room.leave(cx))
388        } else {
389            Task::ready(Ok(()))
390        }
391    }
392
393    pub fn share_project(
394        &mut self,
395        project: Model<Project>,
396        cx: &mut ModelContext<Self>,
397    ) -> Task<Result<u64>> {
398        if let Some((room, _)) = self.room.as_ref() {
399            self.report_call_event("share project", cx);
400            room.update(cx, |room, cx| room.share_project(project, cx))
401        } else {
402            Task::ready(Err(anyhow!("no active call")))
403        }
404    }
405
406    pub fn unshare_project(
407        &mut self,
408        project: Model<Project>,
409        cx: &mut ModelContext<Self>,
410    ) -> Result<()> {
411        if let Some((room, _)) = self.room.as_ref() {
412            self.report_call_event("unshare project", cx);
413            room.update(cx, |room, cx| room.unshare_project(project, cx))
414        } else {
415            Err(anyhow!("no active call"))
416        }
417    }
418
419    pub fn location(&self) -> Option<&WeakModel<Project>> {
420        self.location.as_ref()
421    }
422
423    pub fn set_location(
424        &mut self,
425        project: Option<&Model<Project>>,
426        cx: &mut ModelContext<Self>,
427    ) -> Task<Result<()>> {
428        if project.is_some() || !*ZED_ALWAYS_ACTIVE {
429            self.location = project.map(|project| project.downgrade());
430            if let Some((room, _)) = self.room.as_ref() {
431                return room.update(cx, |room, cx| room.set_location(project, cx));
432            }
433        }
434        Task::ready(Ok(()))
435    }
436
437    fn set_room(
438        &mut self,
439        room: Option<Model<Room>>,
440        cx: &mut ModelContext<Self>,
441    ) -> Task<Result<()>> {
442        if room.as_ref() == self.room.as_ref().map(|room| &room.0) {
443            Task::ready(Ok(()))
444        } else {
445            cx.notify();
446            if let Some(room) = room {
447                if room.read(cx).status().is_offline() {
448                    self.room = None;
449                    Task::ready(Ok(()))
450                } else {
451                    let subscriptions = vec![
452                        cx.observe(&room, |this, room, cx| {
453                            if room.read(cx).status().is_offline() {
454                                this.set_room(None, cx).detach_and_log_err(cx);
455                            }
456
457                            cx.notify();
458                        }),
459                        cx.subscribe(&room, |_, _, event, cx| cx.emit(event.clone())),
460                    ];
461                    self.room = Some((room.clone(), subscriptions));
462                    let location = self
463                        .location
464                        .as_ref()
465                        .and_then(|location| location.upgrade());
466                    let channel_id = room.read(cx).channel_id();
467                    cx.emit(Event::RoomJoined { channel_id });
468                    room.update(cx, |room, cx| room.set_location(location.as_ref(), cx))
469                }
470            } else {
471                self.room = None;
472                Task::ready(Ok(()))
473            }
474        }
475    }
476
477    pub fn room(&self) -> Option<&Model<Room>> {
478        self.room.as_ref().map(|(room, _)| room)
479    }
480
481    pub fn client(&self) -> Arc<Client> {
482        self.client.clone()
483    }
484
485    pub fn pending_invites(&self) -> &HashSet<u64> {
486        &self.pending_invites
487    }
488
489    pub fn report_call_event(&self, operation: &'static str, cx: &mut AppContext) {
490        if let Some(room) = self.room() {
491            let room = room.read(cx);
492            report_call_event_for_room(operation, room.id(), room.channel_id(), &self.client);
493        }
494    }
495}
496
497pub fn report_call_event_for_room(
498    operation: &'static str,
499    room_id: u64,
500    channel_id: Option<ChannelId>,
501    client: &Arc<Client>,
502) {
503    let telemetry = client.telemetry();
504
505    telemetry.report_call_event(operation, Some(room_id), channel_id)
506}
507
508pub fn report_call_event_for_channel(
509    operation: &'static str,
510    channel_id: ChannelId,
511    client: &Arc<Client>,
512    cx: &AppContext,
513) {
514    let room = ActiveCall::global(cx).read(cx).room();
515
516    let telemetry = client.telemetry();
517
518    telemetry.report_call_event(operation, room.map(|r| r.read(cx).id()), Some(channel_id))
519}
520
521#[cfg(test)]
522mod test {
523    use gpui::TestAppContext;
524
525    use crate::OneAtATime;
526
527    #[gpui::test]
528    async fn test_one_at_a_time(cx: &mut TestAppContext) {
529        let mut one_at_a_time = OneAtATime { cancel: None };
530
531        assert_eq!(
532            cx.update(|cx| one_at_a_time.spawn(cx, |_| async { Ok(1) }))
533                .await
534                .unwrap(),
535            Some(1)
536        );
537
538        let (a, b) = cx.update(|cx| {
539            (
540                one_at_a_time.spawn(cx, |_| async {
541                    panic!("");
542                }),
543                one_at_a_time.spawn(cx, |_| async { Ok(3) }),
544            )
545        });
546
547        assert_eq!(a.await.unwrap(), None::<u32>);
548        assert_eq!(b.await.unwrap(), Some(3));
549
550        let promise = cx.update(|cx| one_at_a_time.spawn(cx, |_| async { Ok(4) }));
551        drop(one_at_a_time);
552
553        assert_eq!(promise.await.unwrap(), None);
554    }
555}