call2.rs

  1pub mod call_settings;
  2pub mod participant;
  3pub mod room;
  4
  5use anyhow::{anyhow, bail, Result};
  6use async_trait::async_trait;
  7use audio::Audio;
  8use call_settings::CallSettings;
  9use client::{
 10    proto::{self, PeerId},
 11    Client, TelemetrySettings, TypedEnvelope, User, UserStore, ZED_ALWAYS_ACTIVE,
 12};
 13use collections::HashSet;
 14use futures::{channel::oneshot, future::Shared, Future, FutureExt};
 15use gpui::{
 16    AppContext, AsyncAppContext, AsyncWindowContext, Context, EventEmitter, Model, ModelContext,
 17    Subscription, Task, View, ViewContext, WeakModel, WeakView,
 18};
 19pub use participant::ParticipantLocation;
 20use participant::RemoteParticipant;
 21use postage::watch;
 22use project::Project;
 23use room::Event;
 24pub use room::Room;
 25use settings::Settings;
 26use std::sync::Arc;
 27use util::ResultExt;
 28use workspace::{item::ItemHandle, CallHandler, Pane, Workspace};
 29
 30pub fn init(client: Arc<Client>, user_store: Model<UserStore>, cx: &mut AppContext) {
 31    CallSettings::register(cx);
 32
 33    let active_call = cx.build_model(|cx| ActiveCall::new(client, user_store, cx));
 34    cx.set_global(active_call);
 35}
 36
/// Runs at most one spawned task at a time: spawning a new task cancels the previous one.
pub struct OneAtATime {
    /// Sender half of the cancellation channel belonging to the most recently spawned task.
    cancel: Option<oneshot::Sender<()>>,
}
 40
 41impl OneAtATime {
 42    /// spawn a task in the given context.
 43    /// if another task is spawned before that resolves, or if the OneAtATime itself is dropped, the first task will be cancelled and return Ok(None)
 44    /// otherwise you'll see the result of the task.
 45    fn spawn<F, Fut, R>(&mut self, cx: &mut AppContext, f: F) -> Task<Result<Option<R>>>
 46    where
 47        F: 'static + FnOnce(AsyncAppContext) -> Fut,
 48        Fut: Future<Output = Result<R>>,
 49        R: 'static,
 50    {
 51        let (tx, rx) = oneshot::channel();
 52        self.cancel.replace(tx);
 53        cx.spawn(|cx| async move {
 54            futures::select_biased! {
 55                _ = rx.fuse() => Ok(None),
 56                result = f(cx).fuse() => result.map(Some),
 57            }
 58        })
 59    }
 60
 61    fn running(&self) -> bool {
 62        self.cancel
 63            .as_ref()
 64            .is_some_and(|cancel| !cancel.is_canceled())
 65    }
 66}
 67
/// A call the current user has been invited to, as published on [`ActiveCall::incoming`].
#[derive(Clone)]
pub struct IncomingCall {
    /// Id of the room the call takes place in.
    pub room_id: u64,
    /// The user who placed the call.
    pub calling_user: Arc<User>,
    /// Users resolved from the participant ids sent with the call.
    pub participants: Vec<Arc<User>>,
    /// Project attached to the call by the caller, if any.
    pub initial_project: Option<proto::ParticipantProject>,
}
 75
/// Singleton global maintaining the user's participation in a room across workspaces.
pub struct ActiveCall {
    /// The room currently joined, with the subscriptions that observe it and forward its events.
    room: Option<(Model<Room>, Vec<Subscription>)>,
    /// Shared task for a room that `invite` is still creating; reset to `None` once it finishes.
    pending_room_creation: Option<Shared<Task<Result<Model<Room>, Arc<anyhow::Error>>>>>,
    /// Project most recently recorded by `set_location`, held weakly.
    location: Option<WeakModel<Project>>,
    /// Runs the join tasks started by `accept_incoming` and `join_channel`, one at a time.
    _join_debouncer: OneAtATime,
    /// Ids of users whose invitation is still in flight.
    pending_invites: HashSet<u64>,
    /// Watch channel (sender, receiver) carrying the latest incoming call, if any.
    incoming_call: (
        watch::Sender<Option<IncomingCall>>,
        watch::Receiver<Option<IncomingCall>>,
    ),
    client: Arc<Client>,
    user_store: Model<UserStore>,
    /// Registrations of the incoming-call and call-canceled handlers on `client`.
    _subscriptions: Vec<client::Subscription>,
}
 91
// `ActiveCall` re-emits the events of the room it is subscribed to (see `set_room`).
impl EventEmitter<Event> for ActiveCall {}
 93
 94impl ActiveCall {
 95    fn new(client: Arc<Client>, user_store: Model<UserStore>, cx: &mut ModelContext<Self>) -> Self {
 96        Self {
 97            room: None,
 98            pending_room_creation: None,
 99            location: None,
100            pending_invites: Default::default(),
101            incoming_call: watch::channel(),
102            _join_debouncer: OneAtATime { cancel: None },
103            _subscriptions: vec![
104                client.add_request_handler(cx.weak_model(), Self::handle_incoming_call),
105                client.add_message_handler(cx.weak_model(), Self::handle_call_canceled),
106            ],
107            client,
108            user_store,
109        }
110    }
111
112    pub fn channel_id(&self, cx: &AppContext) -> Option<u64> {
113        self.room()?.read(cx).channel_id()
114    }
115
116    async fn handle_incoming_call(
117        this: Model<Self>,
118        envelope: TypedEnvelope<proto::IncomingCall>,
119        _: Arc<Client>,
120        mut cx: AsyncAppContext,
121    ) -> Result<proto::Ack> {
122        let user_store = this.update(&mut cx, |this, _| this.user_store.clone())?;
123        let call = IncomingCall {
124            room_id: envelope.payload.room_id,
125            participants: user_store
126                .update(&mut cx, |user_store, cx| {
127                    user_store.get_users(envelope.payload.participant_user_ids, cx)
128                })?
129                .await?,
130            calling_user: user_store
131                .update(&mut cx, |user_store, cx| {
132                    user_store.get_user(envelope.payload.calling_user_id, cx)
133                })?
134                .await?,
135            initial_project: envelope.payload.initial_project,
136        };
137        this.update(&mut cx, |this, _| {
138            *this.incoming_call.0.borrow_mut() = Some(call);
139        })?;
140
141        Ok(proto::Ack {})
142    }
143
144    async fn handle_call_canceled(
145        this: Model<Self>,
146        envelope: TypedEnvelope<proto::CallCanceled>,
147        _: Arc<Client>,
148        mut cx: AsyncAppContext,
149    ) -> Result<()> {
150        this.update(&mut cx, |this, _| {
151            let mut incoming_call = this.incoming_call.0.borrow_mut();
152            if incoming_call
153                .as_ref()
154                .map_or(false, |call| call.room_id == envelope.payload.room_id)
155            {
156                incoming_call.take();
157            }
158        })?;
159        Ok(())
160    }
161
162    pub fn global(cx: &AppContext) -> Model<Self> {
163        cx.global::<Model<Self>>().clone()
164    }
165
166    pub fn invite(
167        &mut self,
168        called_user_id: u64,
169        initial_project: Option<Model<Project>>,
170        cx: &mut ModelContext<Self>,
171    ) -> Task<Result<()>> {
172        if !self.pending_invites.insert(called_user_id) {
173            return Task::ready(Err(anyhow!("user was already invited")));
174        }
175        cx.notify();
176
177        if self._join_debouncer.running() {
178            return Task::ready(Ok(()));
179        }
180
181        let room = if let Some(room) = self.room().cloned() {
182            Some(Task::ready(Ok(room)).shared())
183        } else {
184            self.pending_room_creation.clone()
185        };
186
187        let invite = if let Some(room) = room {
188            cx.spawn(move |_, mut cx| async move {
189                let room = room.await.map_err(|err| anyhow!("{:?}", err))?;
190
191                let initial_project_id = if let Some(initial_project) = initial_project {
192                    Some(
193                        room.update(&mut cx, |room, cx| room.share_project(initial_project, cx))?
194                            .await?,
195                    )
196                } else {
197                    None
198                };
199
200                room.update(&mut cx, move |room, cx| {
201                    room.call(called_user_id, initial_project_id, cx)
202                })?
203                .await?;
204
205                anyhow::Ok(())
206            })
207        } else {
208            let client = self.client.clone();
209            let user_store = self.user_store.clone();
210            let room = cx
211                .spawn(move |this, mut cx| async move {
212                    let create_room = async {
213                        let room = cx
214                            .update(|cx| {
215                                Room::create(
216                                    called_user_id,
217                                    initial_project,
218                                    client,
219                                    user_store,
220                                    cx,
221                                )
222                            })?
223                            .await?;
224
225                        this.update(&mut cx, |this, cx| this.set_room(Some(room.clone()), cx))?
226                            .await?;
227
228                        anyhow::Ok(room)
229                    };
230
231                    let room = create_room.await;
232                    this.update(&mut cx, |this, _| this.pending_room_creation = None)?;
233                    room.map_err(Arc::new)
234                })
235                .shared();
236            self.pending_room_creation = Some(room.clone());
237            cx.background_executor().spawn(async move {
238                room.await.map_err(|err| anyhow!("{:?}", err))?;
239                anyhow::Ok(())
240            })
241        };
242
243        cx.spawn(move |this, mut cx| async move {
244            let result = invite.await;
245            if result.is_ok() {
246                this.update(&mut cx, |this, cx| this.report_call_event("invite", cx))?;
247            } else {
248                // TODO: Resport collaboration error
249            }
250
251            this.update(&mut cx, |this, cx| {
252                this.pending_invites.remove(&called_user_id);
253                cx.notify();
254            })?;
255            result
256        })
257    }
258
259    pub fn cancel_invite(
260        &mut self,
261        called_user_id: u64,
262        cx: &mut ModelContext<Self>,
263    ) -> Task<Result<()>> {
264        let room_id = if let Some(room) = self.room() {
265            room.read(cx).id()
266        } else {
267            return Task::ready(Err(anyhow!("no active call")));
268        };
269
270        let client = self.client.clone();
271        cx.background_executor().spawn(async move {
272            client
273                .request(proto::CancelCall {
274                    room_id,
275                    called_user_id,
276                })
277                .await?;
278            anyhow::Ok(())
279        })
280    }
281
282    pub fn incoming(&self) -> watch::Receiver<Option<IncomingCall>> {
283        self.incoming_call.1.clone()
284    }
285
286    pub fn accept_incoming(&mut self, cx: &mut ModelContext<Self>) -> Task<Result<()>> {
287        if self.room.is_some() {
288            return Task::ready(Err(anyhow!("cannot join while on another call")));
289        }
290
291        let call = if let Some(call) = self.incoming_call.1.borrow().clone() {
292            call
293        } else {
294            return Task::ready(Err(anyhow!("no incoming call")));
295        };
296
297        if self.pending_room_creation.is_some() {
298            return Task::ready(Ok(()));
299        }
300
301        let room_id = call.room_id.clone();
302        let client = self.client.clone();
303        let user_store = self.user_store.clone();
304        let join = self
305            ._join_debouncer
306            .spawn(cx, move |cx| Room::join(room_id, client, user_store, cx));
307
308        cx.spawn(|this, mut cx| async move {
309            let room = join.await?;
310            this.update(&mut cx, |this, cx| this.set_room(room.clone(), cx))?
311                .await?;
312            this.update(&mut cx, |this, cx| {
313                this.report_call_event("accept incoming", cx)
314            })?;
315            Ok(())
316        })
317    }
318
319    pub fn decline_incoming(&mut self, cx: &mut ModelContext<Self>) -> Result<()> {
320        let call = self
321            .incoming_call
322            .0
323            .borrow_mut()
324            .take()
325            .ok_or_else(|| anyhow!("no incoming call"))?;
326        report_call_event_for_room("decline incoming", call.room_id, None, &self.client, cx);
327        self.client.send(proto::DeclineCall {
328            room_id: call.room_id,
329        })?;
330        Ok(())
331    }
332
333    pub fn join_channel(
334        &mut self,
335        channel_id: u64,
336        cx: &mut ModelContext<Self>,
337    ) -> Task<Result<Option<Model<Room>>>> {
338        if let Some(room) = self.room().cloned() {
339            if room.read(cx).channel_id() == Some(channel_id) {
340                return Task::ready(Ok(Some(room)));
341            } else {
342                room.update(cx, |room, cx| room.clear_state(cx));
343            }
344        }
345
346        if self.pending_room_creation.is_some() {
347            return Task::ready(Ok(None));
348        }
349
350        let client = self.client.clone();
351        let user_store = self.user_store.clone();
352        let join = self._join_debouncer.spawn(cx, move |cx| async move {
353            Room::join_channel(channel_id, client, user_store, cx).await
354        });
355
356        cx.spawn(|this, mut cx| async move {
357            let room = join.await?;
358            this.update(&mut cx, |this, cx| this.set_room(room.clone(), cx))?
359                .await?;
360            this.update(&mut cx, |this, cx| {
361                this.report_call_event("join channel", cx)
362            })?;
363            Ok(room)
364        })
365    }
366
367    pub fn hang_up(&mut self, cx: &mut ModelContext<Self>) -> Task<Result<()>> {
368        cx.notify();
369        self.report_call_event("hang up", cx);
370
371        Audio::end_call(cx);
372        if let Some((room, _)) = self.room.take() {
373            room.update(cx, |room, cx| room.leave(cx))
374        } else {
375            Task::ready(Ok(()))
376        }
377    }
378
379    pub fn share_project(
380        &mut self,
381        project: Model<Project>,
382        cx: &mut ModelContext<Self>,
383    ) -> Task<Result<u64>> {
384        if let Some((room, _)) = self.room.as_ref() {
385            self.report_call_event("share project", cx);
386            room.update(cx, |room, cx| room.share_project(project, cx))
387        } else {
388            Task::ready(Err(anyhow!("no active call")))
389        }
390    }
391
392    pub fn unshare_project(
393        &mut self,
394        project: Model<Project>,
395        cx: &mut ModelContext<Self>,
396    ) -> Result<()> {
397        if let Some((room, _)) = self.room.as_ref() {
398            self.report_call_event("unshare project", cx);
399            room.update(cx, |room, cx| room.unshare_project(project, cx))
400        } else {
401            Err(anyhow!("no active call"))
402        }
403    }
404
    /// Returns the project last recorded by `set_location`, as a weak handle, if any.
    pub fn location(&self) -> Option<&WeakModel<Project>> {
        self.location.as_ref()
    }
408
409    pub fn set_location(
410        &mut self,
411        project: Option<&Model<Project>>,
412        cx: &mut ModelContext<Self>,
413    ) -> Task<Result<()>> {
414        if project.is_some() || !*ZED_ALWAYS_ACTIVE {
415            self.location = project.map(|project| project.downgrade());
416            if let Some((room, _)) = self.room.as_ref() {
417                return room.update(cx, |room, cx| room.set_location(project, cx));
418            }
419        }
420        Task::ready(Ok(()))
421    }
422
423    fn set_room(
424        &mut self,
425        room: Option<Model<Room>>,
426        cx: &mut ModelContext<Self>,
427    ) -> Task<Result<()>> {
428        if room.as_ref() != self.room.as_ref().map(|room| &room.0) {
429            cx.notify();
430            if let Some(room) = room {
431                if room.read(cx).status().is_offline() {
432                    self.room = None;
433                    Task::ready(Ok(()))
434                } else {
435                    let subscriptions = vec![
436                        cx.observe(&room, |this, room, cx| {
437                            if room.read(cx).status().is_offline() {
438                                this.set_room(None, cx).detach_and_log_err(cx);
439                            }
440
441                            cx.notify();
442                        }),
443                        cx.subscribe(&room, |_, _, event, cx| cx.emit(event.clone())),
444                    ];
445                    self.room = Some((room.clone(), subscriptions));
446                    let location = self
447                        .location
448                        .as_ref()
449                        .and_then(|location| location.upgrade());
450                    room.update(cx, |room, cx| room.set_location(location.as_ref(), cx))
451                }
452            } else {
453                self.room = None;
454                Task::ready(Ok(()))
455            }
456        } else {
457            Task::ready(Ok(()))
458        }
459    }
460
461    pub fn room(&self) -> Option<&Model<Room>> {
462        self.room.as_ref().map(|(room, _)| room)
463    }
464
465    pub fn client(&self) -> Arc<Client> {
466        self.client.clone()
467    }
468
    /// Returns the ids of the users whose invitation is still in flight.
    pub fn pending_invites(&self) -> &HashSet<u64> {
        &self.pending_invites
    }
472
473    pub fn report_call_event(&self, operation: &'static str, cx: &mut AppContext) {
474        if let Some(room) = self.room() {
475            let room = room.read(cx);
476            report_call_event_for_room(operation, room.id(), room.channel_id(), &self.client, cx);
477        }
478    }
479}
480
481pub fn report_call_event_for_room(
482    operation: &'static str,
483    room_id: u64,
484    channel_id: Option<u64>,
485    client: &Arc<Client>,
486    cx: &mut AppContext,
487) {
488    let telemetry = client.telemetry();
489    let telemetry_settings = *TelemetrySettings::get_global(cx);
490
491    telemetry.report_call_event(telemetry_settings, operation, Some(room_id), channel_id)
492}
493
494pub fn report_call_event_for_channel(
495    operation: &'static str,
496    channel_id: u64,
497    client: &Arc<Client>,
498    cx: &AppContext,
499) {
500    let room = ActiveCall::global(cx).read(cx).room();
501
502    let telemetry = client.telemetry();
503
504    let telemetry_settings = *TelemetrySettings::get_global(cx);
505
506    telemetry.report_call_event(
507        telemetry_settings,
508        operation,
509        room.map(|r| r.read(cx).id()),
510        Some(channel_id),
511    )
512}
513
/// Per-workspace `CallHandler` implementation backed by the global `ActiveCall`.
pub struct Call {
    /// The global active call and the workspace's subscription to its events; `None` when
    /// no `ActiveCall` global existed at the time this handler was created.
    active_call: Option<(Model<ActiveCall>, Vec<Subscription>)>,
    /// The workspace this handler belongs to, held weakly.
    parent_workspace: WeakView<Workspace>,
}
518
519impl Call {
520    pub fn new(
521        parent_workspace: WeakView<Workspace>,
522        cx: &mut ViewContext<'_, Workspace>,
523    ) -> Box<dyn CallHandler> {
524        let mut active_call = None;
525        if cx.has_global::<Model<ActiveCall>>() {
526            let call = cx.global::<Model<ActiveCall>>().clone();
527            let subscriptions = vec![cx.subscribe(&call, Self::on_active_call_event)];
528            active_call = Some((call, subscriptions));
529        }
530        Box::new(Self {
531            active_call,
532            parent_workspace,
533        })
534    }
535    fn on_active_call_event(
536        workspace: &mut Workspace,
537        _: Model<ActiveCall>,
538        event: &room::Event,
539        cx: &mut ViewContext<Workspace>,
540    ) {
541        match event {
542            room::Event::ParticipantLocationChanged { participant_id }
543            | room::Event::RemoteVideoTracksChanged { participant_id } => {
544                workspace.leader_updated(*participant_id, cx);
545            }
546            _ => {}
547        }
548    }
549}
550
551#[async_trait(?Send)]
552impl CallHandler for Call {
553    fn peer_state(
554        &mut self,
555        leader_id: PeerId,
556        cx: &mut ViewContext<Workspace>,
557    ) -> Option<(bool, bool)> {
558        let (call, _) = self.active_call.as_ref()?;
559        let room = call.read(cx).room()?.read(cx);
560        let participant = room.remote_participant_for_peer_id(leader_id)?;
561
562        let leader_in_this_app;
563        let leader_in_this_project;
564        match participant.location {
565            ParticipantLocation::SharedProject { project_id } => {
566                leader_in_this_app = true;
567                leader_in_this_project = Some(project_id)
568                    == self
569                        .parent_workspace
570                        .update(cx, |this, cx| this.project().read(cx).remote_id())
571                        .log_err()
572                        .flatten();
573            }
574            ParticipantLocation::UnsharedProject => {
575                leader_in_this_app = true;
576                leader_in_this_project = false;
577            }
578            ParticipantLocation::External => {
579                leader_in_this_app = false;
580                leader_in_this_project = false;
581            }
582        };
583
584        Some((leader_in_this_project, leader_in_this_app))
585    }
586
    /// Looks up the first video track of the remote participant `peer_id`.
    ///
    /// Returns `None` when there is no active call, no room, no such remote participant,
    /// or the participant has no video tracks. In every other case this currently reaches
    /// `todo!()` and panics: building the shared-screen item (sketched in the commented-out
    /// code below) is not implemented yet.
    fn shared_screen_for_peer(
        &self,
        peer_id: PeerId,
        _pane: &View<Pane>,
        cx: &mut ViewContext<Workspace>,
    ) -> Option<Box<dyn ItemHandle>> {
        let (call, _) = self.active_call.as_ref()?;
        let room = call.read(cx).room()?.read(cx);
        let participant = room.remote_participant_for_peer_id(peer_id)?;
        // Unused for now; kept for the unfinished implementation below.
        let _track = participant.video_tracks.values().next()?.clone();
        let _user = participant.user.clone();
        todo!();
        // for item in pane.read(cx).items_of_type::<SharedScreen>() {
        //     if item.read(cx).peer_id == peer_id {
        //         return Box::new(Some(item));
        //     }
        // }

        // Some(Box::new(cx.build_view(|cx| {
        //     SharedScreen::new(&track, peer_id, user.clone(), cx)
        // })))
    }
609    fn room_id(&self, cx: &AppContext) -> Option<u64> {
610        Some(self.active_call.as_ref()?.0.read(cx).room()?.read(cx).id())
611    }
612    fn hang_up(&self, mut cx: AsyncWindowContext) -> Result<Task<Result<()>>> {
613        let Some((call, _)) = self.active_call.as_ref() else {
614            bail!("Cannot exit a call; not in a call");
615        };
616
617        call.update(&mut cx, |this, cx| this.hang_up(cx))
618    }
619    fn active_project(&self, cx: &AppContext) -> Option<WeakModel<Project>> {
620        ActiveCall::global(cx).read(cx).location().cloned()
621    }
622    fn invite(
623        &mut self,
624        called_user_id: u64,
625        initial_project: Option<Model<Project>>,
626        cx: &mut AppContext,
627    ) -> Task<Result<()>> {
628        ActiveCall::global(cx).update(cx, |this, cx| {
629            this.invite(called_user_id, initial_project, cx)
630        })
631    }
632    fn remote_participants(&self, cx: &AppContext) -> Option<Vec<Arc<User>>> {
633        self.active_call
634            .as_ref()
635            .map(|call| {
636                call.0.read(cx).room().map(|room| {
637                    room.read(cx)
638                        .remote_participants()
639                        .iter()
640                        .map(|participant| participant.1.user.clone())
641                        .collect()
642                })
643            })
644            .flatten()
645    }
646    fn is_muted(&self, cx: &AppContext) -> Option<bool> {
647        self.active_call
648            .as_ref()
649            .map(|call| {
650                call.0
651                    .read(cx)
652                    .room()
653                    .map(|room| room.read(cx).is_muted(cx))
654            })
655            .flatten()
656    }
657    fn toggle_mute(&self, cx: &mut AppContext) {
658        self.active_call.as_ref().map(|call| {
659            call.0.update(cx, |this, cx| {
660                this.room().map(|room| {
661                    room.update(cx, |this, cx| {
662                        this.toggle_mute(cx);
663                    })
664                })
665            })
666        });
667    }
668}
669
#[cfg(test)]
mod test {
    use gpui::TestAppContext;

    use crate::OneAtATime;

    /// Checks the three outcomes of `OneAtATime::spawn`: a lone task completes, a task
    /// superseded by a newer one is cancelled, and dropping the `OneAtATime` cancels the
    /// task that is still pending.
    #[gpui::test]
    async fn test_one_at_a_time(cx: &mut TestAppContext) {
        let mut one_at_a_time = OneAtATime { cancel: None };

        // A lone task runs to completion and yields its value.
        assert_eq!(
            cx.update(|cx| one_at_a_time.spawn(cx, |_| async { Ok(1) }))
                .await
                .unwrap(),
            Some(1)
        );

        // Spawning `b` right after `a` cancels `a`, so the body of `a` must never run.
        let (a, b) = cx.update(|cx| {
            (
                one_at_a_time.spawn(cx, |_| async {
                    assert!(false);
                    Ok(2)
                }),
                one_at_a_time.spawn(cx, |_| async { Ok(3) }),
            )
        });

        assert_eq!(a.await.unwrap(), None);
        assert_eq!(b.await.unwrap(), Some(3));

        // Dropping the `OneAtATime` before the task is awaited cancels it as well.
        let promise = cx.update(|cx| one_at_a_time.spawn(cx, |_| async { Ok(4) }));
        drop(one_at_a_time);

        assert_eq!(promise.await.unwrap(), None);
    }
}
705}