call2.rs

  1pub mod call_settings;
  2pub mod participant;
  3pub mod room;
  4
  5use anyhow::{anyhow, bail, Result};
  6use async_trait::async_trait;
  7use audio::Audio;
  8use call_settings::CallSettings;
  9use client::{
 10    proto::{self, PeerId},
 11    Client, TelemetrySettings, TypedEnvelope, User, UserStore, ZED_ALWAYS_ACTIVE,
 12};
 13use collections::HashSet;
 14use futures::{channel::oneshot, future::Shared, Future, FutureExt};
 15use gpui::{
 16    AppContext, AsyncAppContext, AsyncWindowContext, Context, EventEmitter, Model, ModelContext,
 17    Subscription, Task, View, ViewContext, WeakModel, WeakView,
 18};
 19pub use participant::ParticipantLocation;
 20use postage::watch;
 21use project::Project;
 22use room::Event;
 23pub use room::Room;
 24use settings::Settings;
 25use std::sync::Arc;
 26use util::ResultExt;
 27use workspace::{item::ItemHandle, CallHandler, Pane, Workspace};
 28
 29pub fn init(client: Arc<Client>, user_store: Model<UserStore>, cx: &mut AppContext) {
 30    CallSettings::register(cx);
 31
 32    let active_call = cx.build_model(|cx| ActiveCall::new(client, user_store, cx));
 33    cx.set_global(active_call);
 34}
 35
/// Runs at most one spawned task at a time: spawning a new task cancels the previous one.
pub struct OneAtATime {
    // Sender half of the cancellation channel belonging to the most recently spawned task.
    // Dropping it (by replacing it in `spawn`, or by dropping this struct) resolves the
    // receiver that task is selecting on, so the task finishes with `Ok(None)`.
    cancel: Option<oneshot::Sender<()>>,
}
 39
 40impl OneAtATime {
 41    /// spawn a task in the given context.
 42    /// if another task is spawned before that resolves, or if the OneAtATime itself is dropped, the first task will be cancelled and return Ok(None)
 43    /// otherwise you'll see the result of the task.
 44    fn spawn<F, Fut, R>(&mut self, cx: &mut AppContext, f: F) -> Task<Result<Option<R>>>
 45    where
 46        F: 'static + FnOnce(AsyncAppContext) -> Fut,
 47        Fut: Future<Output = Result<R>>,
 48        R: 'static,
 49    {
 50        let (tx, rx) = oneshot::channel();
 51        self.cancel.replace(tx);
 52        cx.spawn(|cx| async move {
 53            futures::select_biased! {
 54                _ = rx.fuse() => Ok(None),
 55                result = f(cx).fuse() => result.map(Some),
 56            }
 57        })
 58    }
 59
 60    fn running(&self) -> bool {
 61        self.cancel
 62            .as_ref()
 63            .is_some_and(|cancel| !cancel.is_canceled())
 64    }
 65}
 66
/// A call this client has been invited to, as built by `ActiveCall::handle_incoming_call`
/// from a `proto::IncomingCall` message.
#[derive(Clone)]
pub struct IncomingCall {
    /// Id of the room the call belongs to.
    pub room_id: u64,
    /// User resolved from the message's `calling_user_id`.
    pub calling_user: Arc<User>,
    /// Users resolved from the message's `participant_user_ids`.
    pub participants: Vec<Arc<User>>,
    /// The message's `initial_project` field, passed through unchanged.
    pub initial_project: Option<proto::ParticipantProject>,
}
 74
/// Singleton global maintaining the user's participation in a room across workspaces.
pub struct ActiveCall {
    // The current room, together with the subscriptions `set_room` creates for it
    // (an observer and an event forwarder); `None` when not in a room.
    room: Option<(Model<Room>, Vec<Subscription>)>,
    // Shared task for a room that `invite` is currently creating; reset to `None` by that
    // task once creation finishes, whether it succeeded or failed.
    pending_room_creation: Option<Shared<Task<Result<Model<Room>, Arc<anyhow::Error>>>>>,
    // Project most recently stored by `set_location`, held weakly.
    location: Option<WeakModel<Project>>,
    // Used by `accept_incoming` and `join_channel` so that a newer join cancels an older one.
    _join_debouncer: OneAtATime,
    // Ids of users for whom an `invite` is currently in flight.
    pending_invites: HashSet<u64>,
    // Watch channel holding the current incoming call, if any: (sender, receiver).
    incoming_call: (
        watch::Sender<Option<IncomingCall>>,
        watch::Receiver<Option<IncomingCall>>,
    ),
    client: Arc<Client>,
    user_store: Model<UserStore>,
    // Handler registrations made on `client` in `new`; stored here rather than discarded.
    _subscriptions: Vec<client::Subscription>,
}
 90
// `ActiveCall` re-emits the events of its current room (see the `cx.subscribe` in `set_room`).
impl EventEmitter<Event> for ActiveCall {}
 92
 93impl ActiveCall {
 94    fn new(client: Arc<Client>, user_store: Model<UserStore>, cx: &mut ModelContext<Self>) -> Self {
 95        Self {
 96            room: None,
 97            pending_room_creation: None,
 98            location: None,
 99            pending_invites: Default::default(),
100            incoming_call: watch::channel(),
101            _join_debouncer: OneAtATime { cancel: None },
102            _subscriptions: vec![
103                client.add_request_handler(cx.weak_model(), Self::handle_incoming_call),
104                client.add_message_handler(cx.weak_model(), Self::handle_call_canceled),
105            ],
106            client,
107            user_store,
108        }
109    }
110
111    pub fn channel_id(&self, cx: &AppContext) -> Option<u64> {
112        self.room()?.read(cx).channel_id()
113    }
114
115    async fn handle_incoming_call(
116        this: Model<Self>,
117        envelope: TypedEnvelope<proto::IncomingCall>,
118        _: Arc<Client>,
119        mut cx: AsyncAppContext,
120    ) -> Result<proto::Ack> {
121        let user_store = this.update(&mut cx, |this, _| this.user_store.clone())?;
122        let call = IncomingCall {
123            room_id: envelope.payload.room_id,
124            participants: user_store
125                .update(&mut cx, |user_store, cx| {
126                    user_store.get_users(envelope.payload.participant_user_ids, cx)
127                })?
128                .await?,
129            calling_user: user_store
130                .update(&mut cx, |user_store, cx| {
131                    user_store.get_user(envelope.payload.calling_user_id, cx)
132                })?
133                .await?,
134            initial_project: envelope.payload.initial_project,
135        };
136        this.update(&mut cx, |this, _| {
137            *this.incoming_call.0.borrow_mut() = Some(call);
138        })?;
139
140        Ok(proto::Ack {})
141    }
142
143    async fn handle_call_canceled(
144        this: Model<Self>,
145        envelope: TypedEnvelope<proto::CallCanceled>,
146        _: Arc<Client>,
147        mut cx: AsyncAppContext,
148    ) -> Result<()> {
149        this.update(&mut cx, |this, _| {
150            let mut incoming_call = this.incoming_call.0.borrow_mut();
151            if incoming_call
152                .as_ref()
153                .map_or(false, |call| call.room_id == envelope.payload.room_id)
154            {
155                incoming_call.take();
156            }
157        })?;
158        Ok(())
159    }
160
    /// Returns a clone of the `Model<ActiveCall>` handle stored as an app global.
    pub fn global(cx: &AppContext) -> Model<Self> {
        cx.global::<Model<Self>>().clone()
    }
164
165    pub fn invite(
166        &mut self,
167        called_user_id: u64,
168        initial_project: Option<Model<Project>>,
169        cx: &mut ModelContext<Self>,
170    ) -> Task<Result<()>> {
171        if !self.pending_invites.insert(called_user_id) {
172            return Task::ready(Err(anyhow!("user was already invited")));
173        }
174        cx.notify();
175
176        if self._join_debouncer.running() {
177            return Task::ready(Ok(()));
178        }
179
180        let room = if let Some(room) = self.room().cloned() {
181            Some(Task::ready(Ok(room)).shared())
182        } else {
183            self.pending_room_creation.clone()
184        };
185
186        let invite = if let Some(room) = room {
187            cx.spawn(move |_, mut cx| async move {
188                let room = room.await.map_err(|err| anyhow!("{:?}", err))?;
189
190                let initial_project_id = if let Some(initial_project) = initial_project {
191                    Some(
192                        room.update(&mut cx, |room, cx| room.share_project(initial_project, cx))?
193                            .await?,
194                    )
195                } else {
196                    None
197                };
198
199                room.update(&mut cx, move |room, cx| {
200                    room.call(called_user_id, initial_project_id, cx)
201                })?
202                .await?;
203
204                anyhow::Ok(())
205            })
206        } else {
207            let client = self.client.clone();
208            let user_store = self.user_store.clone();
209            let room = cx
210                .spawn(move |this, mut cx| async move {
211                    let create_room = async {
212                        let room = cx
213                            .update(|cx| {
214                                Room::create(
215                                    called_user_id,
216                                    initial_project,
217                                    client,
218                                    user_store,
219                                    cx,
220                                )
221                            })?
222                            .await?;
223
224                        this.update(&mut cx, |this, cx| this.set_room(Some(room.clone()), cx))?
225                            .await?;
226
227                        anyhow::Ok(room)
228                    };
229
230                    let room = create_room.await;
231                    this.update(&mut cx, |this, _| this.pending_room_creation = None)?;
232                    room.map_err(Arc::new)
233                })
234                .shared();
235            self.pending_room_creation = Some(room.clone());
236            cx.background_executor().spawn(async move {
237                room.await.map_err(|err| anyhow!("{:?}", err))?;
238                anyhow::Ok(())
239            })
240        };
241
242        cx.spawn(move |this, mut cx| async move {
243            let result = invite.await;
244            if result.is_ok() {
245                this.update(&mut cx, |this, cx| this.report_call_event("invite", cx))?;
246            } else {
247                // TODO: Resport collaboration error
248            }
249
250            this.update(&mut cx, |this, cx| {
251                this.pending_invites.remove(&called_user_id);
252                cx.notify();
253            })?;
254            result
255        })
256    }
257
258    pub fn cancel_invite(
259        &mut self,
260        called_user_id: u64,
261        cx: &mut ModelContext<Self>,
262    ) -> Task<Result<()>> {
263        let room_id = if let Some(room) = self.room() {
264            room.read(cx).id()
265        } else {
266            return Task::ready(Err(anyhow!("no active call")));
267        };
268
269        let client = self.client.clone();
270        cx.background_executor().spawn(async move {
271            client
272                .request(proto::CancelCall {
273                    room_id,
274                    called_user_id,
275                })
276                .await?;
277            anyhow::Ok(())
278        })
279    }
280
281    pub fn incoming(&self) -> watch::Receiver<Option<IncomingCall>> {
282        self.incoming_call.1.clone()
283    }
284
285    pub fn accept_incoming(&mut self, cx: &mut ModelContext<Self>) -> Task<Result<()>> {
286        if self.room.is_some() {
287            return Task::ready(Err(anyhow!("cannot join while on another call")));
288        }
289
290        let call = if let Some(call) = self.incoming_call.1.borrow().clone() {
291            call
292        } else {
293            return Task::ready(Err(anyhow!("no incoming call")));
294        };
295
296        if self.pending_room_creation.is_some() {
297            return Task::ready(Ok(()));
298        }
299
300        let room_id = call.room_id.clone();
301        let client = self.client.clone();
302        let user_store = self.user_store.clone();
303        let join = self
304            ._join_debouncer
305            .spawn(cx, move |cx| Room::join(room_id, client, user_store, cx));
306
307        cx.spawn(|this, mut cx| async move {
308            let room = join.await?;
309            this.update(&mut cx, |this, cx| this.set_room(room.clone(), cx))?
310                .await?;
311            this.update(&mut cx, |this, cx| {
312                this.report_call_event("accept incoming", cx)
313            })?;
314            Ok(())
315        })
316    }
317
318    pub fn decline_incoming(&mut self, cx: &mut ModelContext<Self>) -> Result<()> {
319        let call = self
320            .incoming_call
321            .0
322            .borrow_mut()
323            .take()
324            .ok_or_else(|| anyhow!("no incoming call"))?;
325        report_call_event_for_room("decline incoming", call.room_id, None, &self.client, cx);
326        self.client.send(proto::DeclineCall {
327            room_id: call.room_id,
328        })?;
329        Ok(())
330    }
331
332    pub fn join_channel(
333        &mut self,
334        channel_id: u64,
335        cx: &mut ModelContext<Self>,
336    ) -> Task<Result<Option<Model<Room>>>> {
337        if let Some(room) = self.room().cloned() {
338            if room.read(cx).channel_id() == Some(channel_id) {
339                return Task::ready(Ok(Some(room)));
340            } else {
341                room.update(cx, |room, cx| room.clear_state(cx));
342            }
343        }
344
345        if self.pending_room_creation.is_some() {
346            return Task::ready(Ok(None));
347        }
348
349        let client = self.client.clone();
350        let user_store = self.user_store.clone();
351        let join = self._join_debouncer.spawn(cx, move |cx| async move {
352            Room::join_channel(channel_id, client, user_store, cx).await
353        });
354
355        cx.spawn(|this, mut cx| async move {
356            let room = join.await?;
357            this.update(&mut cx, |this, cx| this.set_room(room.clone(), cx))?
358                .await?;
359            this.update(&mut cx, |this, cx| {
360                this.report_call_event("join channel", cx)
361            })?;
362            Ok(room)
363        })
364    }
365
366    pub fn hang_up(&mut self, cx: &mut ModelContext<Self>) -> Task<Result<()>> {
367        cx.notify();
368        self.report_call_event("hang up", cx);
369
370        Audio::end_call(cx);
371        if let Some((room, _)) = self.room.take() {
372            room.update(cx, |room, cx| room.leave(cx))
373        } else {
374            Task::ready(Ok(()))
375        }
376    }
377
378    pub fn share_project(
379        &mut self,
380        project: Model<Project>,
381        cx: &mut ModelContext<Self>,
382    ) -> Task<Result<u64>> {
383        if let Some((room, _)) = self.room.as_ref() {
384            self.report_call_event("share project", cx);
385            room.update(cx, |room, cx| room.share_project(project, cx))
386        } else {
387            Task::ready(Err(anyhow!("no active call")))
388        }
389    }
390
391    pub fn unshare_project(
392        &mut self,
393        project: Model<Project>,
394        cx: &mut ModelContext<Self>,
395    ) -> Result<()> {
396        if let Some((room, _)) = self.room.as_ref() {
397            self.report_call_event("unshare project", cx);
398            room.update(cx, |room, cx| room.unshare_project(project, cx))
399        } else {
400            Err(anyhow!("no active call"))
401        }
402    }
403
    /// Returns the weak handle to the project most recently stored by `set_location`, if any.
    pub fn location(&self) -> Option<&WeakModel<Project>> {
        self.location.as_ref()
    }
407
408    pub fn set_location(
409        &mut self,
410        project: Option<&Model<Project>>,
411        cx: &mut ModelContext<Self>,
412    ) -> Task<Result<()>> {
413        if project.is_some() || !*ZED_ALWAYS_ACTIVE {
414            self.location = project.map(|project| project.downgrade());
415            if let Some((room, _)) = self.room.as_ref() {
416                return room.update(cx, |room, cx| room.set_location(project, cx));
417            }
418        }
419        Task::ready(Ok(()))
420    }
421
422    fn set_room(
423        &mut self,
424        room: Option<Model<Room>>,
425        cx: &mut ModelContext<Self>,
426    ) -> Task<Result<()>> {
427        if room.as_ref() != self.room.as_ref().map(|room| &room.0) {
428            cx.notify();
429            if let Some(room) = room {
430                if room.read(cx).status().is_offline() {
431                    self.room = None;
432                    Task::ready(Ok(()))
433                } else {
434                    let subscriptions = vec![
435                        cx.observe(&room, |this, room, cx| {
436                            if room.read(cx).status().is_offline() {
437                                this.set_room(None, cx).detach_and_log_err(cx);
438                            }
439
440                            cx.notify();
441                        }),
442                        cx.subscribe(&room, |_, _, event, cx| cx.emit(event.clone())),
443                    ];
444                    self.room = Some((room.clone(), subscriptions));
445                    let location = self
446                        .location
447                        .as_ref()
448                        .and_then(|location| location.upgrade());
449                    room.update(cx, |room, cx| room.set_location(location.as_ref(), cx))
450                }
451            } else {
452                self.room = None;
453                Task::ready(Ok(()))
454            }
455        } else {
456            Task::ready(Ok(()))
457        }
458    }
459
460    pub fn room(&self) -> Option<&Model<Room>> {
461        self.room.as_ref().map(|(room, _)| room)
462    }
463
464    pub fn client(&self) -> Arc<Client> {
465        self.client.clone()
466    }
467
    /// Returns the ids of the users for whom an `invite` is currently in flight.
    pub fn pending_invites(&self) -> &HashSet<u64> {
        &self.pending_invites
    }
471
472    pub fn report_call_event(&self, operation: &'static str, cx: &mut AppContext) {
473        if let Some(room) = self.room() {
474            let room = room.read(cx);
475            report_call_event_for_room(operation, room.id(), room.channel_id(), &self.client, cx);
476        }
477    }
478}
479
480pub fn report_call_event_for_room(
481    operation: &'static str,
482    room_id: u64,
483    channel_id: Option<u64>,
484    client: &Arc<Client>,
485    cx: &mut AppContext,
486) {
487    let telemetry = client.telemetry();
488    let telemetry_settings = *TelemetrySettings::get_global(cx);
489
490    telemetry.report_call_event(telemetry_settings, operation, Some(room_id), channel_id)
491}
492
493pub fn report_call_event_for_channel(
494    operation: &'static str,
495    channel_id: u64,
496    client: &Arc<Client>,
497    cx: &AppContext,
498) {
499    let room = ActiveCall::global(cx).read(cx).room();
500
501    let telemetry = client.telemetry();
502
503    let telemetry_settings = *TelemetrySettings::get_global(cx);
504
505    telemetry.report_call_event(
506        telemetry_settings,
507        operation,
508        room.map(|r| r.read(cx).id()),
509        Some(channel_id),
510    )
511}
512
/// The `CallHandler` implementation that `Call::new` hands to a workspace.
pub struct Call {
    // The global `ActiveCall` and the subscription to its events, or `None` when no
    // `ActiveCall` global existed at construction time.
    active_call: Option<(Model<ActiveCall>, Vec<Subscription>)>,
    // Weak handle to the workspace this handler was created for.
    parent_workspace: WeakView<Workspace>,
}
517
518impl Call {
519    pub fn new(
520        parent_workspace: WeakView<Workspace>,
521        cx: &mut ViewContext<'_, Workspace>,
522    ) -> Box<dyn CallHandler> {
523        let mut active_call = None;
524        if cx.has_global::<Model<ActiveCall>>() {
525            let call = cx.global::<Model<ActiveCall>>().clone();
526            let subscriptions = vec![cx.subscribe(&call, Self::on_active_call_event)];
527            active_call = Some((call, subscriptions));
528        }
529        Box::new(Self {
530            active_call,
531            parent_workspace,
532        })
533    }
534    fn on_active_call_event(
535        workspace: &mut Workspace,
536        _: Model<ActiveCall>,
537        event: &room::Event,
538        cx: &mut ViewContext<Workspace>,
539    ) {
540        match event {
541            room::Event::ParticipantLocationChanged { participant_id }
542            | room::Event::RemoteVideoTracksChanged { participant_id } => {
543                workspace.leader_updated(*participant_id, cx);
544            }
545            _ => {}
546        }
547    }
548}
549
550#[async_trait(?Send)]
551impl CallHandler for Call {
552    fn shared_screen_for_peer(
553        &self,
554        peer_id: PeerId,
555        _pane: &View<Pane>,
556        cx: &mut ViewContext<Workspace>,
557    ) -> Option<Box<dyn ItemHandle>> {
558        let (call, _) = self.active_call.as_ref()?;
559        let room = call.read(cx).room()?.read(cx);
560        let participant = room.remote_participant_for_peer_id(peer_id)?;
561        let _track = participant.video_tracks.values().next()?.clone();
562        let _user = participant.user.clone();
563        todo!();
564        // for item in pane.read(cx).items_of_type::<SharedScreen>() {
565        //     if item.read(cx).peer_id == peer_id {
566        //         return Box::new(Some(item));
567        //     }
568        // }
569
570        // Some(Box::new(cx.build_view(|cx| {
571        //     SharedScreen::new(&track, peer_id, user.clone(), cx)
572        // })))
573    }
574
575    fn room_id(&self, cx: &AppContext) -> Option<u64> {
576        Some(self.active_call.as_ref()?.0.read(cx).room()?.read(cx).id())
577    }
578    fn hang_up(&self, mut cx: AsyncWindowContext) -> Result<Task<Result<()>>> {
579        let Some((call, _)) = self.active_call.as_ref() else {
580            bail!("Cannot exit a call; not in a call");
581        };
582
583        call.update(&mut cx, |this, cx| this.hang_up(cx))
584    }
585    fn active_project(&self, cx: &AppContext) -> Option<WeakModel<Project>> {
586        ActiveCall::global(cx).read(cx).location().cloned()
587    }
588    fn peer_state(
589        &mut self,
590        leader_id: PeerId,
591        cx: &mut ViewContext<Workspace>,
592    ) -> Option<(bool, bool)> {
593        let (call, _) = self.active_call.as_ref()?;
594        let room = call.read(cx).room()?.read(cx);
595        let participant = room.remote_participant_for_peer_id(leader_id)?;
596
597        let leader_in_this_app;
598        let leader_in_this_project;
599        match participant.location {
600            ParticipantLocation::SharedProject { project_id } => {
601                leader_in_this_app = true;
602                leader_in_this_project = Some(project_id)
603                    == self
604                        .parent_workspace
605                        .update(cx, |this, cx| this.project().read(cx).remote_id())
606                        .log_err()
607                        .flatten();
608            }
609            ParticipantLocation::UnsharedProject => {
610                leader_in_this_app = true;
611                leader_in_this_project = false;
612            }
613            ParticipantLocation::External => {
614                leader_in_this_app = false;
615                leader_in_this_project = false;
616            }
617        };
618
619        Some((leader_in_this_project, leader_in_this_app))
620    }
621}
622
#[cfg(test)]
mod test {
    use gpui::TestAppContext;

    use crate::OneAtATime;

    #[gpui::test]
    async fn test_one_at_a_time(cx: &mut TestAppContext) {
        let mut debouncer = OneAtATime { cancel: None };

        // A lone task runs to completion and yields its value.
        let lone = cx.update(|cx| debouncer.spawn(cx, |_| async { Ok(1) }));
        assert_eq!(lone.await.unwrap(), Some(1));

        // Spawning a second task before the first resolves cancels the first.
        let (superseded, latest) = cx.update(|cx| {
            let superseded = debouncer.spawn(cx, |_| async {
                assert!(false);
                Ok(2)
            });
            let latest = debouncer.spawn(cx, |_| async { Ok(3) });
            (superseded, latest)
        });

        assert_eq!(superseded.await.unwrap(), None);
        assert_eq!(latest.await.unwrap(), Some(3));

        // Dropping the `OneAtATime` cancels the task that is still outstanding.
        let orphaned = cx.update(|cx| debouncer.spawn(cx, |_| async { Ok(4) }));
        drop(debouncer);

        assert_eq!(orphaned.await.unwrap(), None);
    }
}
658}