call2.rs

  1pub mod call_settings;
  2pub mod participant;
  3pub mod room;
  4mod shared_screen;
  5
  6use anyhow::{anyhow, bail, Result};
  7use async_trait::async_trait;
  8use audio::Audio;
  9use call_settings::CallSettings;
 10use client::{
 11    proto::{self, PeerId},
 12    Client, TelemetrySettings, TypedEnvelope, User, UserStore, ZED_ALWAYS_ACTIVE,
 13};
 14use collections::HashSet;
 15use futures::{channel::oneshot, future::Shared, Future, FutureExt};
 16use gpui::{
 17    AppContext, AsyncAppContext, AsyncWindowContext, Context, EventEmitter, Model, ModelContext,
 18    Subscription, Task, View, ViewContext, VisualContext, WeakModel, WeakView,
 19};
 20pub use participant::ParticipantLocation;
 21use participant::RemoteParticipant;
 22use postage::watch;
 23use project::Project;
 24use room::Event;
 25pub use room::Room;
 26use settings::Settings;
 27use shared_screen::SharedScreen;
 28use std::sync::Arc;
 29use util::ResultExt;
 30use workspace::{item::ItemHandle, CallHandler, Pane, Workspace};
 31
 32pub fn init(client: Arc<Client>, user_store: Model<UserStore>, cx: &mut AppContext) {
 33    CallSettings::register(cx);
 34
 35    let active_call = cx.build_model(|cx| ActiveCall::new(client, user_store, cx));
 36    cx.set_global(active_call);
 37}
 38
 39pub struct OneAtATime {
 40    cancel: Option<oneshot::Sender<()>>,
 41}
 42
 43impl OneAtATime {
 44    /// spawn a task in the given context.
 45    /// if another task is spawned before that resolves, or if the OneAtATime itself is dropped, the first task will be cancelled and return Ok(None)
 46    /// otherwise you'll see the result of the task.
 47    fn spawn<F, Fut, R>(&mut self, cx: &mut AppContext, f: F) -> Task<Result<Option<R>>>
 48    where
 49        F: 'static + FnOnce(AsyncAppContext) -> Fut,
 50        Fut: Future<Output = Result<R>>,
 51        R: 'static,
 52    {
 53        let (tx, rx) = oneshot::channel();
 54        self.cancel.replace(tx);
 55        cx.spawn(|cx| async move {
 56            futures::select_biased! {
 57                _ = rx.fuse() => Ok(None),
 58                result = f(cx).fuse() => result.map(Some),
 59            }
 60        })
 61    }
 62
 63    fn running(&self) -> bool {
 64        self.cancel
 65            .as_ref()
 66            .is_some_and(|cancel| !cancel.is_canceled())
 67    }
 68}
 69
 70#[derive(Clone)]
 71pub struct IncomingCall {
 72    pub room_id: u64,
 73    pub calling_user: Arc<User>,
 74    pub participants: Vec<Arc<User>>,
 75    pub initial_project: Option<proto::ParticipantProject>,
 76}
 77
 78/// Singleton global maintaining the user's participation in a room across workspaces.
 79pub struct ActiveCall {
 80    room: Option<(Model<Room>, Vec<Subscription>)>,
 81    pending_room_creation: Option<Shared<Task<Result<Model<Room>, Arc<anyhow::Error>>>>>,
 82    location: Option<WeakModel<Project>>,
 83    _join_debouncer: OneAtATime,
 84    pending_invites: HashSet<u64>,
 85    incoming_call: (
 86        watch::Sender<Option<IncomingCall>>,
 87        watch::Receiver<Option<IncomingCall>>,
 88    ),
 89    client: Arc<Client>,
 90    user_store: Model<UserStore>,
 91    _subscriptions: Vec<client::Subscription>,
 92}
 93
 94impl EventEmitter<Event> for ActiveCall {}
 95
 96impl ActiveCall {
 97    fn new(client: Arc<Client>, user_store: Model<UserStore>, cx: &mut ModelContext<Self>) -> Self {
 98        Self {
 99            room: None,
100            pending_room_creation: None,
101            location: None,
102            pending_invites: Default::default(),
103            incoming_call: watch::channel(),
104            _join_debouncer: OneAtATime { cancel: None },
105            _subscriptions: vec![
106                client.add_request_handler(cx.weak_model(), Self::handle_incoming_call),
107                client.add_message_handler(cx.weak_model(), Self::handle_call_canceled),
108            ],
109            client,
110            user_store,
111        }
112    }
113
114    pub fn channel_id(&self, cx: &AppContext) -> Option<u64> {
115        self.room()?.read(cx).channel_id()
116    }
117
118    async fn handle_incoming_call(
119        this: Model<Self>,
120        envelope: TypedEnvelope<proto::IncomingCall>,
121        _: Arc<Client>,
122        mut cx: AsyncAppContext,
123    ) -> Result<proto::Ack> {
124        let user_store = this.update(&mut cx, |this, _| this.user_store.clone())?;
125        let call = IncomingCall {
126            room_id: envelope.payload.room_id,
127            participants: user_store
128                .update(&mut cx, |user_store, cx| {
129                    user_store.get_users(envelope.payload.participant_user_ids, cx)
130                })?
131                .await?,
132            calling_user: user_store
133                .update(&mut cx, |user_store, cx| {
134                    user_store.get_user(envelope.payload.calling_user_id, cx)
135                })?
136                .await?,
137            initial_project: envelope.payload.initial_project,
138        };
139        this.update(&mut cx, |this, _| {
140            *this.incoming_call.0.borrow_mut() = Some(call);
141        })?;
142
143        Ok(proto::Ack {})
144    }
145
146    async fn handle_call_canceled(
147        this: Model<Self>,
148        envelope: TypedEnvelope<proto::CallCanceled>,
149        _: Arc<Client>,
150        mut cx: AsyncAppContext,
151    ) -> Result<()> {
152        this.update(&mut cx, |this, _| {
153            let mut incoming_call = this.incoming_call.0.borrow_mut();
154            if incoming_call
155                .as_ref()
156                .map_or(false, |call| call.room_id == envelope.payload.room_id)
157            {
158                incoming_call.take();
159            }
160        })?;
161        Ok(())
162    }
163
164    pub fn global(cx: &AppContext) -> Model<Self> {
165        cx.global::<Model<Self>>().clone()
166    }
167
168    pub fn invite(
169        &mut self,
170        called_user_id: u64,
171        initial_project: Option<Model<Project>>,
172        cx: &mut ModelContext<Self>,
173    ) -> Task<Result<()>> {
174        if !self.pending_invites.insert(called_user_id) {
175            return Task::ready(Err(anyhow!("user was already invited")));
176        }
177        cx.notify();
178
179        if self._join_debouncer.running() {
180            return Task::ready(Ok(()));
181        }
182
183        let room = if let Some(room) = self.room().cloned() {
184            Some(Task::ready(Ok(room)).shared())
185        } else {
186            self.pending_room_creation.clone()
187        };
188
189        let invite = if let Some(room) = room {
190            cx.spawn(move |_, mut cx| async move {
191                let room = room.await.map_err(|err| anyhow!("{:?}", err))?;
192
193                let initial_project_id = if let Some(initial_project) = initial_project {
194                    Some(
195                        room.update(&mut cx, |room, cx| room.share_project(initial_project, cx))?
196                            .await?,
197                    )
198                } else {
199                    None
200                };
201
202                room.update(&mut cx, move |room, cx| {
203                    room.call(called_user_id, initial_project_id, cx)
204                })?
205                .await?;
206
207                anyhow::Ok(())
208            })
209        } else {
210            let client = self.client.clone();
211            let user_store = self.user_store.clone();
212            let room = cx
213                .spawn(move |this, mut cx| async move {
214                    let create_room = async {
215                        let room = cx
216                            .update(|cx| {
217                                Room::create(
218                                    called_user_id,
219                                    initial_project,
220                                    client,
221                                    user_store,
222                                    cx,
223                                )
224                            })?
225                            .await?;
226
227                        this.update(&mut cx, |this, cx| this.set_room(Some(room.clone()), cx))?
228                            .await?;
229
230                        anyhow::Ok(room)
231                    };
232
233                    let room = create_room.await;
234                    this.update(&mut cx, |this, _| this.pending_room_creation = None)?;
235                    room.map_err(Arc::new)
236                })
237                .shared();
238            self.pending_room_creation = Some(room.clone());
239            cx.background_executor().spawn(async move {
240                room.await.map_err(|err| anyhow!("{:?}", err))?;
241                anyhow::Ok(())
242            })
243        };
244
245        cx.spawn(move |this, mut cx| async move {
246            let result = invite.await;
247            if result.is_ok() {
248                this.update(&mut cx, |this, cx| this.report_call_event("invite", cx))?;
249            } else {
250                // TODO: Resport collaboration error
251            }
252
253            this.update(&mut cx, |this, cx| {
254                this.pending_invites.remove(&called_user_id);
255                cx.notify();
256            })?;
257            result
258        })
259    }
260
261    pub fn cancel_invite(
262        &mut self,
263        called_user_id: u64,
264        cx: &mut ModelContext<Self>,
265    ) -> Task<Result<()>> {
266        let room_id = if let Some(room) = self.room() {
267            room.read(cx).id()
268        } else {
269            return Task::ready(Err(anyhow!("no active call")));
270        };
271
272        let client = self.client.clone();
273        cx.background_executor().spawn(async move {
274            client
275                .request(proto::CancelCall {
276                    room_id,
277                    called_user_id,
278                })
279                .await?;
280            anyhow::Ok(())
281        })
282    }
283
284    pub fn incoming(&self) -> watch::Receiver<Option<IncomingCall>> {
285        self.incoming_call.1.clone()
286    }
287
288    pub fn accept_incoming(&mut self, cx: &mut ModelContext<Self>) -> Task<Result<()>> {
289        if self.room.is_some() {
290            return Task::ready(Err(anyhow!("cannot join while on another call")));
291        }
292
293        let call = if let Some(call) = self.incoming_call.1.borrow().clone() {
294            call
295        } else {
296            return Task::ready(Err(anyhow!("no incoming call")));
297        };
298
299        if self.pending_room_creation.is_some() {
300            return Task::ready(Ok(()));
301        }
302
303        let room_id = call.room_id.clone();
304        let client = self.client.clone();
305        let user_store = self.user_store.clone();
306        let join = self
307            ._join_debouncer
308            .spawn(cx, move |cx| Room::join(room_id, client, user_store, cx));
309
310        cx.spawn(|this, mut cx| async move {
311            let room = join.await?;
312            this.update(&mut cx, |this, cx| this.set_room(room.clone(), cx))?
313                .await?;
314            this.update(&mut cx, |this, cx| {
315                this.report_call_event("accept incoming", cx)
316            })?;
317            Ok(())
318        })
319    }
320
321    pub fn decline_incoming(&mut self, cx: &mut ModelContext<Self>) -> Result<()> {
322        let call = self
323            .incoming_call
324            .0
325            .borrow_mut()
326            .take()
327            .ok_or_else(|| anyhow!("no incoming call"))?;
328        report_call_event_for_room("decline incoming", call.room_id, None, &self.client, cx);
329        self.client.send(proto::DeclineCall {
330            room_id: call.room_id,
331        })?;
332        Ok(())
333    }
334
335    pub fn join_channel(
336        &mut self,
337        channel_id: u64,
338        cx: &mut ModelContext<Self>,
339    ) -> Task<Result<Option<Model<Room>>>> {
340        if let Some(room) = self.room().cloned() {
341            if room.read(cx).channel_id() == Some(channel_id) {
342                return Task::ready(Ok(Some(room)));
343            } else {
344                room.update(cx, |room, cx| room.clear_state(cx));
345            }
346        }
347
348        if self.pending_room_creation.is_some() {
349            return Task::ready(Ok(None));
350        }
351
352        let client = self.client.clone();
353        let user_store = self.user_store.clone();
354        let join = self._join_debouncer.spawn(cx, move |cx| async move {
355            Room::join_channel(channel_id, client, user_store, cx).await
356        });
357
358        cx.spawn(|this, mut cx| async move {
359            let room = join.await?;
360            this.update(&mut cx, |this, cx| this.set_room(room.clone(), cx))?
361                .await?;
362            this.update(&mut cx, |this, cx| {
363                this.report_call_event("join channel", cx)
364            })?;
365            Ok(room)
366        })
367    }
368
369    pub fn hang_up(&mut self, cx: &mut ModelContext<Self>) -> Task<Result<()>> {
370        cx.notify();
371        self.report_call_event("hang up", cx);
372
373        Audio::end_call(cx);
374        if let Some((room, _)) = self.room.take() {
375            room.update(cx, |room, cx| room.leave(cx))
376        } else {
377            Task::ready(Ok(()))
378        }
379    }
380
381    pub fn share_project(
382        &mut self,
383        project: Model<Project>,
384        cx: &mut ModelContext<Self>,
385    ) -> Task<Result<u64>> {
386        if let Some((room, _)) = self.room.as_ref() {
387            self.report_call_event("share project", cx);
388            room.update(cx, |room, cx| room.share_project(project, cx))
389        } else {
390            Task::ready(Err(anyhow!("no active call")))
391        }
392    }
393
394    pub fn unshare_project(
395        &mut self,
396        project: Model<Project>,
397        cx: &mut ModelContext<Self>,
398    ) -> Result<()> {
399        if let Some((room, _)) = self.room.as_ref() {
400            self.report_call_event("unshare project", cx);
401            room.update(cx, |room, cx| room.unshare_project(project, cx))
402        } else {
403            Err(anyhow!("no active call"))
404        }
405    }
406
407    pub fn location(&self) -> Option<&WeakModel<Project>> {
408        self.location.as_ref()
409    }
410
411    pub fn set_location(
412        &mut self,
413        project: Option<&Model<Project>>,
414        cx: &mut ModelContext<Self>,
415    ) -> Task<Result<()>> {
416        if project.is_some() || !*ZED_ALWAYS_ACTIVE {
417            self.location = project.map(|project| project.downgrade());
418            if let Some((room, _)) = self.room.as_ref() {
419                return room.update(cx, |room, cx| room.set_location(project, cx));
420            }
421        }
422        Task::ready(Ok(()))
423    }
424
425    fn set_room(
426        &mut self,
427        room: Option<Model<Room>>,
428        cx: &mut ModelContext<Self>,
429    ) -> Task<Result<()>> {
430        if room.as_ref() != self.room.as_ref().map(|room| &room.0) {
431            cx.notify();
432            if let Some(room) = room {
433                if room.read(cx).status().is_offline() {
434                    self.room = None;
435                    Task::ready(Ok(()))
436                } else {
437                    let subscriptions = vec![
438                        cx.observe(&room, |this, room, cx| {
439                            if room.read(cx).status().is_offline() {
440                                this.set_room(None, cx).detach_and_log_err(cx);
441                            }
442
443                            cx.notify();
444                        }),
445                        cx.subscribe(&room, |_, _, event, cx| cx.emit(event.clone())),
446                    ];
447                    self.room = Some((room.clone(), subscriptions));
448                    let location = self
449                        .location
450                        .as_ref()
451                        .and_then(|location| location.upgrade());
452                    room.update(cx, |room, cx| room.set_location(location.as_ref(), cx))
453                }
454            } else {
455                self.room = None;
456                Task::ready(Ok(()))
457            }
458        } else {
459            Task::ready(Ok(()))
460        }
461    }
462
463    pub fn room(&self) -> Option<&Model<Room>> {
464        self.room.as_ref().map(|(room, _)| room)
465    }
466
467    pub fn client(&self) -> Arc<Client> {
468        self.client.clone()
469    }
470
471    pub fn pending_invites(&self) -> &HashSet<u64> {
472        &self.pending_invites
473    }
474
475    pub fn report_call_event(&self, operation: &'static str, cx: &mut AppContext) {
476        if let Some(room) = self.room() {
477            let room = room.read(cx);
478            report_call_event_for_room(operation, room.id(), room.channel_id(), &self.client, cx);
479        }
480    }
481}
482
483pub fn report_call_event_for_room(
484    operation: &'static str,
485    room_id: u64,
486    channel_id: Option<u64>,
487    client: &Arc<Client>,
488    cx: &mut AppContext,
489) {
490    let telemetry = client.telemetry();
491    let telemetry_settings = *TelemetrySettings::get_global(cx);
492
493    telemetry.report_call_event(telemetry_settings, operation, Some(room_id), channel_id)
494}
495
496pub fn report_call_event_for_channel(
497    operation: &'static str,
498    channel_id: u64,
499    client: &Arc<Client>,
500    cx: &AppContext,
501) {
502    let room = ActiveCall::global(cx).read(cx).room();
503
504    let telemetry = client.telemetry();
505
506    let telemetry_settings = *TelemetrySettings::get_global(cx);
507
508    telemetry.report_call_event(
509        telemetry_settings,
510        operation,
511        room.map(|r| r.read(cx).id()),
512        Some(channel_id),
513    )
514}
515
516pub struct Call {
517    active_call: Option<(Model<ActiveCall>, Vec<Subscription>)>,
518    parent_workspace: WeakView<Workspace>,
519}
520
521impl Call {
522    pub fn new(
523        parent_workspace: WeakView<Workspace>,
524        cx: &mut ViewContext<'_, Workspace>,
525    ) -> Box<dyn CallHandler> {
526        let mut active_call = None;
527        if cx.has_global::<Model<ActiveCall>>() {
528            let call = cx.global::<Model<ActiveCall>>().clone();
529            let subscriptions = vec![cx.subscribe(&call, Self::on_active_call_event)];
530            active_call = Some((call, subscriptions));
531        }
532        Box::new(Self {
533            active_call,
534            parent_workspace,
535        })
536    }
537    fn on_active_call_event(
538        workspace: &mut Workspace,
539        _: Model<ActiveCall>,
540        event: &room::Event,
541        cx: &mut ViewContext<Workspace>,
542    ) {
543        match event {
544            room::Event::ParticipantLocationChanged { participant_id }
545            | room::Event::RemoteVideoTracksChanged { participant_id } => {
546                workspace.leader_updated(*participant_id, cx);
547            }
548            _ => {}
549        }
550    }
551}
552
553#[async_trait(?Send)]
554impl CallHandler for Call {
555    fn peer_state(
556        &mut self,
557        leader_id: PeerId,
558        cx: &mut ViewContext<Workspace>,
559    ) -> Option<(bool, bool)> {
560        let (call, _) = self.active_call.as_ref()?;
561        let room = call.read(cx).room()?.read(cx);
562        let participant = room.remote_participant_for_peer_id(leader_id)?;
563
564        let leader_in_this_app;
565        let leader_in_this_project;
566        match participant.location {
567            ParticipantLocation::SharedProject { project_id } => {
568                leader_in_this_app = true;
569                leader_in_this_project = Some(project_id)
570                    == self
571                        .parent_workspace
572                        .update(cx, |this, cx| this.project().read(cx).remote_id())
573                        .log_err()
574                        .flatten();
575            }
576            ParticipantLocation::UnsharedProject => {
577                leader_in_this_app = true;
578                leader_in_this_project = false;
579            }
580            ParticipantLocation::External => {
581                leader_in_this_app = false;
582                leader_in_this_project = false;
583            }
584        };
585
586        Some((leader_in_this_project, leader_in_this_app))
587    }
588
589    fn shared_screen_for_peer(
590        &self,
591        peer_id: PeerId,
592        pane: &View<Pane>,
593        cx: &mut ViewContext<Workspace>,
594    ) -> Option<Box<dyn ItemHandle>> {
595        let (call, _) = self.active_call.as_ref()?;
596        dbg!("A");
597        let room = call.read(cx).room()?.read(cx);
598        dbg!("B");
599        let participant = room.remote_participant_for_peer_id(peer_id)?;
600        dbg!("C");
601        let track = participant.video_tracks.values().next()?.clone();
602        dbg!("D");
603        let user = participant.user.clone();
604        for item in pane.read(cx).items_of_type::<SharedScreen>() {
605            if item.read(cx).peer_id == peer_id {
606                return Some(Box::new(item));
607            }
608        }
609
610        Some(Box::new(cx.build_view(|cx| {
611            SharedScreen::new(&track, peer_id, user.clone(), cx)
612        })))
613    }
614    fn room_id(&self, cx: &AppContext) -> Option<u64> {
615        Some(self.active_call.as_ref()?.0.read(cx).room()?.read(cx).id())
616    }
617    fn hang_up(&self, mut cx: AsyncWindowContext) -> Result<Task<Result<()>>> {
618        let Some((call, _)) = self.active_call.as_ref() else {
619            bail!("Cannot exit a call; not in a call");
620        };
621
622        call.update(&mut cx, |this, cx| this.hang_up(cx))
623    }
624    fn active_project(&self, cx: &AppContext) -> Option<WeakModel<Project>> {
625        ActiveCall::global(cx).read(cx).location().cloned()
626    }
627    fn invite(
628        &mut self,
629        called_user_id: u64,
630        initial_project: Option<Model<Project>>,
631        cx: &mut AppContext,
632    ) -> Task<Result<()>> {
633        ActiveCall::global(cx).update(cx, |this, cx| {
634            this.invite(called_user_id, initial_project, cx)
635        })
636    }
637    fn remote_participants(&self, cx: &AppContext) -> Option<Vec<(Arc<User>, PeerId)>> {
638        self.active_call
639            .as_ref()
640            .map(|call| {
641                call.0.read(cx).room().map(|room| {
642                    room.read(cx)
643                        .remote_participants()
644                        .iter()
645                        .map(|participant| {
646                            (participant.1.user.clone(), participant.1.peer_id.clone())
647                        })
648                        .collect()
649                })
650            })
651            .flatten()
652    }
653    fn is_muted(&self, cx: &AppContext) -> Option<bool> {
654        self.active_call
655            .as_ref()
656            .map(|call| {
657                call.0
658                    .read(cx)
659                    .room()
660                    .map(|room| room.read(cx).is_muted(cx))
661            })
662            .flatten()
663    }
664    fn toggle_mute(&self, cx: &mut AppContext) {
665        self.active_call.as_ref().map(|call| {
666            call.0.update(cx, |this, cx| {
667                this.room().map(|room| {
668                    room.update(cx, |this, cx| {
669                        this.toggle_mute(cx);
670                    })
671                })
672            })
673        });
674    }
675    fn toggle_screen_share(&self, cx: &mut AppContext) {
676        self.active_call.as_ref().map(|call| {
677            call.0.update(cx, |this, cx| {
678                this.room().map(|room| {
679                    room.update(cx, |this, cx| {
680                        if this.is_screen_sharing() {
681                            dbg!("Unsharing");
682                            this.unshare_screen(cx);
683                        } else {
684                            dbg!("Sharing");
685                            let t = this.share_screen(cx);
686                            cx.spawn(move |_, cx| async move {
687                                t.await.log_err();
688                            })
689                            .detach();
690                        }
691                    })
692                })
693            })
694        });
695    }
696}
697
698#[cfg(test)]
699mod test {
700    use gpui::TestAppContext;
701
702    use crate::OneAtATime;
703
704    #[gpui::test]
705    async fn test_one_at_a_time(cx: &mut TestAppContext) {
706        let mut one_at_a_time = OneAtATime { cancel: None };
707
708        assert_eq!(
709            cx.update(|cx| one_at_a_time.spawn(cx, |_| async { Ok(1) }))
710                .await
711                .unwrap(),
712            Some(1)
713        );
714
715        let (a, b) = cx.update(|cx| {
716            (
717                one_at_a_time.spawn(cx, |_| async {
718                    assert!(false);
719                    Ok(2)
720                }),
721                one_at_a_time.spawn(cx, |_| async { Ok(3) }),
722            )
723        });
724
725        assert_eq!(a.await.unwrap(), None);
726        assert_eq!(b.await.unwrap(), Some(3));
727
728        let promise = cx.update(|cx| one_at_a_time.spawn(cx, |_| async { Ok(4) }));
729        drop(one_at_a_time);
730
731        assert_eq!(promise.await.unwrap(), None);
732    }
733}