call.rs

  1pub mod call_settings;
  2pub mod participant;
  3pub mod room;
  4
  5use std::sync::Arc;
  6
  7use anyhow::{anyhow, Result};
  8use audio::Audio;
  9use call_settings::CallSettings;
 10use channel::ChannelId;
 11use client::{
 12    proto::{self, PeerId},
 13    ClickhouseEvent, Client, TelemetrySettings, TypedEnvelope, User, UserStore,
 14};
 15use collections::HashSet;
 16use futures::{future::Shared, FutureExt};
 17use postage::watch;
 18
 19use gpui::{
 20    AnyViewHandle, AnyWeakViewHandle, AppContext, AsyncAppContext, Entity, ModelContext,
 21    ModelHandle, Subscription, Task, ViewContext, WeakModelHandle,
 22};
 23use project::Project;
 24
 25pub use participant::ParticipantLocation;
 26pub use room::Room;
 27use util::ResultExt;
 28
 29pub fn init(client: Arc<Client>, user_store: ModelHandle<UserStore>, cx: &mut AppContext) {
 30    settings::register::<CallSettings>(cx);
 31
 32    let active_call = cx.add_model(|cx| ActiveCall::new(client, user_store, cx));
 33    cx.set_global(active_call);
 34}
 35
 36#[derive(Clone)]
 37pub struct IncomingCall {
 38    pub room_id: u64,
 39    pub calling_user: Arc<User>,
 40    pub participants: Vec<Arc<User>>,
 41    pub initial_project: Option<proto::ParticipantProject>,
 42}
 43
 44/// Singleton global maintaining the user's participation in a room across workspaces.
 45pub struct ActiveCall {
 46    room: Option<(ModelHandle<Room>, Vec<Subscription>)>,
 47    pending_room_creation: Option<Shared<Task<Result<ModelHandle<Room>, Arc<anyhow::Error>>>>>,
 48    location: Option<WeakModelHandle<Project>>,
 49    pending_invites: HashSet<u64>,
 50    incoming_call: (
 51        watch::Sender<Option<IncomingCall>>,
 52        watch::Receiver<Option<IncomingCall>>,
 53    ),
 54    client: Arc<Client>,
 55    user_store: ModelHandle<UserStore>,
 56    follow_handlers: Vec<FollowHandler>,
 57    followers: Vec<Follower>,
 58    _subscriptions: Vec<client::Subscription>,
 59}
 60
 61#[derive(PartialEq, Eq, PartialOrd, Ord)]
 62struct Follower {
 63    project_id: Option<u64>,
 64    peer_id: PeerId,
 65}
 66
 67struct FollowHandler {
 68    project_id: Option<u64>,
 69    root_view: AnyWeakViewHandle,
 70    get_views:
 71        Box<dyn Fn(&AnyViewHandle, Option<u64>, &mut AppContext) -> Option<proto::FollowResponse>>,
 72    update_view: Box<dyn Fn(&AnyViewHandle, PeerId, proto::UpdateFollowers, &mut AppContext)>,
 73}
 74
 75impl Entity for ActiveCall {
 76    type Event = room::Event;
 77}
 78
 79impl ActiveCall {
 80    fn new(
 81        client: Arc<Client>,
 82        user_store: ModelHandle<UserStore>,
 83        cx: &mut ModelContext<Self>,
 84    ) -> Self {
 85        Self {
 86            room: None,
 87            pending_room_creation: None,
 88            location: None,
 89            pending_invites: Default::default(),
 90            incoming_call: watch::channel(),
 91            follow_handlers: Default::default(),
 92            followers: Default::default(),
 93            _subscriptions: vec![
 94                client.add_request_handler(cx.handle(), Self::handle_incoming_call),
 95                client.add_message_handler(cx.handle(), Self::handle_call_canceled),
 96                client.add_request_handler(cx.handle(), Self::handle_follow),
 97                client.add_message_handler(cx.handle(), Self::handle_unfollow),
 98                client.add_message_handler(cx.handle(), Self::handle_update_followers),
 99            ],
100            client,
101            user_store,
102        }
103    }
104
105    pub fn channel_id(&self, cx: &AppContext) -> Option<ChannelId> {
106        self.room()?.read(cx).channel_id()
107    }
108
109    pub fn add_follow_handler<V: gpui::View, GetViews, UpdateView>(
110        &mut self,
111        root_view: gpui::ViewHandle<V>,
112        project_id: Option<u64>,
113        get_views: GetViews,
114        update_view: UpdateView,
115        _cx: &mut ModelContext<Self>,
116    ) where
117        GetViews: 'static
118            + Fn(&mut V, Option<u64>, &mut gpui::ViewContext<V>) -> Result<proto::FollowResponse>,
119        UpdateView:
120            'static + Fn(&mut V, PeerId, proto::UpdateFollowers, &mut ViewContext<V>) -> Result<()>,
121    {
122        self.follow_handlers
123            .retain(|h| h.root_view.id() != root_view.id());
124        if let Err(ix) = self
125            .follow_handlers
126            .binary_search_by_key(&(project_id, root_view.id()), |f| {
127                (f.project_id, f.root_view.id())
128            })
129        {
130            self.follow_handlers.insert(
131                ix,
132                FollowHandler {
133                    project_id,
134                    root_view: root_view.into_any().downgrade(),
135                    get_views: Box::new(move |view, project_id, cx| {
136                        let view = view.clone().downcast::<V>().unwrap();
137                        view.update(cx, |view, cx| get_views(view, project_id, cx).log_err())
138                            .flatten()
139                    }),
140                    update_view: Box::new(move |view, leader_id, message, cx| {
141                        let view = view.clone().downcast::<V>().unwrap();
142                        view.update(cx, |view, cx| {
143                            update_view(view, leader_id, message, cx).log_err()
144                        });
145                    }),
146                },
147            );
148        }
149    }
150
151    async fn handle_incoming_call(
152        this: ModelHandle<Self>,
153        envelope: TypedEnvelope<proto::IncomingCall>,
154        _: Arc<Client>,
155        mut cx: AsyncAppContext,
156    ) -> Result<proto::Ack> {
157        let user_store = this.read_with(&cx, |this, _| this.user_store.clone());
158        let call = IncomingCall {
159            room_id: envelope.payload.room_id,
160            participants: user_store
161                .update(&mut cx, |user_store, cx| {
162                    user_store.get_users(envelope.payload.participant_user_ids, cx)
163                })
164                .await?,
165            calling_user: user_store
166                .update(&mut cx, |user_store, cx| {
167                    user_store.get_user(envelope.payload.calling_user_id, cx)
168                })
169                .await?,
170            initial_project: envelope.payload.initial_project,
171        };
172        this.update(&mut cx, |this, _| {
173            *this.incoming_call.0.borrow_mut() = Some(call);
174        });
175
176        Ok(proto::Ack {})
177    }
178
179    async fn handle_call_canceled(
180        this: ModelHandle<Self>,
181        envelope: TypedEnvelope<proto::CallCanceled>,
182        _: Arc<Client>,
183        mut cx: AsyncAppContext,
184    ) -> Result<()> {
185        this.update(&mut cx, |this, _| {
186            let mut incoming_call = this.incoming_call.0.borrow_mut();
187            if incoming_call
188                .as_ref()
189                .map_or(false, |call| call.room_id == envelope.payload.room_id)
190            {
191                incoming_call.take();
192            }
193        });
194        Ok(())
195    }
196
197    async fn handle_follow(
198        this: ModelHandle<Self>,
199        envelope: TypedEnvelope<proto::Follow>,
200        _: Arc<Client>,
201        mut cx: AsyncAppContext,
202    ) -> Result<proto::FollowResponse> {
203        this.update(&mut cx, |this, cx| {
204            let follower = Follower {
205                project_id: envelope.payload.project_id,
206                peer_id: envelope.original_sender_id()?,
207            };
208            let active_project_id = this
209                .location
210                .as_ref()
211                .and_then(|project| project.upgrade(cx)?.read(cx).remote_id());
212
213            let mut response = proto::FollowResponse::default();
214            for handler in &this.follow_handlers {
215                if follower.project_id != handler.project_id && follower.project_id.is_some() {
216                    continue;
217                }
218
219                let Some(root_view) = handler.root_view.upgrade(cx) else {
220                    continue;
221                };
222
223                let Some(handler_response) =
224                    (handler.get_views)(&root_view, follower.project_id, cx)
225                else {
226                    continue;
227                };
228
229                if response.views.is_empty() {
230                    response.views = handler_response.views;
231                } else {
232                    response.views.extend_from_slice(&handler_response.views);
233                }
234
235                if let Some(active_view_id) = handler_response.active_view_id.clone() {
236                    if response.active_view_id.is_none() || handler.project_id == active_project_id
237                    {
238                        response.active_view_id = Some(active_view_id);
239                    }
240                }
241            }
242
243            if let Err(ix) = this.followers.binary_search(&follower) {
244                this.followers.insert(ix, follower);
245            }
246
247            Ok(response)
248        })
249    }
250
251    async fn handle_unfollow(
252        this: ModelHandle<Self>,
253        envelope: TypedEnvelope<proto::Unfollow>,
254        _: Arc<Client>,
255        mut cx: AsyncAppContext,
256    ) -> Result<()> {
257        this.update(&mut cx, |this, _| {
258            let follower = Follower {
259                project_id: envelope.payload.project_id,
260                peer_id: envelope.original_sender_id()?,
261            };
262            if let Err(ix) = this.followers.binary_search(&follower) {
263                this.followers.remove(ix);
264            }
265            Ok(())
266        })
267    }
268
269    async fn handle_update_followers(
270        this: ModelHandle<Self>,
271        envelope: TypedEnvelope<proto::UpdateFollowers>,
272        _: Arc<Client>,
273        mut cx: AsyncAppContext,
274    ) -> Result<()> {
275        let leader_id = envelope.original_sender_id()?;
276        let update = envelope.payload;
277        this.update(&mut cx, |this, cx| {
278            for handler in &this.follow_handlers {
279                if update.project_id != handler.project_id && update.project_id.is_some() {
280                    continue;
281                }
282                let Some(root_view) = handler.root_view.upgrade(cx) else {
283                    continue;
284                };
285                (handler.update_view)(&root_view, leader_id, update.clone(), cx);
286            }
287            Ok(())
288        })
289    }
290
291    pub fn update_followers(
292        &self,
293        project_id: Option<u64>,
294        update: proto::update_followers::Variant,
295        cx: &AppContext,
296    ) -> Option<()> {
297        let room_id = self.room()?.read(cx).id();
298        let follower_ids: Vec<_> = self
299            .followers
300            .iter()
301            .filter_map(|follower| {
302                (follower.project_id == project_id).then_some(follower.peer_id.into())
303            })
304            .collect();
305        if follower_ids.is_empty() {
306            return None;
307        }
308        self.client
309            .send(proto::UpdateFollowers {
310                room_id,
311                project_id,
312                follower_ids,
313                variant: Some(update),
314            })
315            .log_err()
316    }
317
318    pub fn global(cx: &AppContext) -> ModelHandle<Self> {
319        cx.global::<ModelHandle<Self>>().clone()
320    }
321
322    pub fn invite(
323        &mut self,
324        called_user_id: u64,
325        initial_project: Option<ModelHandle<Project>>,
326        cx: &mut ModelContext<Self>,
327    ) -> Task<Result<()>> {
328        if !self.pending_invites.insert(called_user_id) {
329            return Task::ready(Err(anyhow!("user was already invited")));
330        }
331        cx.notify();
332
333        let room = if let Some(room) = self.room().cloned() {
334            Some(Task::ready(Ok(room)).shared())
335        } else {
336            self.pending_room_creation.clone()
337        };
338
339        let invite = if let Some(room) = room {
340            cx.spawn_weak(|_, mut cx| async move {
341                let room = room.await.map_err(|err| anyhow!("{:?}", err))?;
342
343                let initial_project_id = if let Some(initial_project) = initial_project {
344                    Some(
345                        room.update(&mut cx, |room, cx| room.share_project(initial_project, cx))
346                            .await?,
347                    )
348                } else {
349                    None
350                };
351
352                room.update(&mut cx, |room, cx| {
353                    room.call(called_user_id, initial_project_id, cx)
354                })
355                .await?;
356
357                anyhow::Ok(())
358            })
359        } else {
360            let client = self.client.clone();
361            let user_store = self.user_store.clone();
362            let room = cx
363                .spawn(|this, mut cx| async move {
364                    let create_room = async {
365                        let room = cx
366                            .update(|cx| {
367                                Room::create(
368                                    called_user_id,
369                                    initial_project,
370                                    client,
371                                    user_store,
372                                    cx,
373                                )
374                            })
375                            .await?;
376
377                        this.update(&mut cx, |this, cx| this.set_room(Some(room.clone()), cx))
378                            .await?;
379
380                        anyhow::Ok(room)
381                    };
382
383                    let room = create_room.await;
384                    this.update(&mut cx, |this, _| this.pending_room_creation = None);
385                    room.map_err(Arc::new)
386                })
387                .shared();
388            self.pending_room_creation = Some(room.clone());
389            cx.foreground().spawn(async move {
390                room.await.map_err(|err| anyhow!("{:?}", err))?;
391                anyhow::Ok(())
392            })
393        };
394
395        cx.spawn(|this, mut cx| async move {
396            let result = invite.await;
397            this.update(&mut cx, |this, cx| {
398                this.pending_invites.remove(&called_user_id);
399                this.report_call_event("invite", cx);
400                cx.notify();
401            });
402            result
403        })
404    }
405
406    pub fn cancel_invite(
407        &mut self,
408        called_user_id: u64,
409        cx: &mut ModelContext<Self>,
410    ) -> Task<Result<()>> {
411        let room_id = if let Some(room) = self.room() {
412            room.read(cx).id()
413        } else {
414            return Task::ready(Err(anyhow!("no active call")));
415        };
416
417        let client = self.client.clone();
418        cx.foreground().spawn(async move {
419            client
420                .request(proto::CancelCall {
421                    room_id,
422                    called_user_id,
423                })
424                .await?;
425            anyhow::Ok(())
426        })
427    }
428
429    pub fn incoming(&self) -> watch::Receiver<Option<IncomingCall>> {
430        self.incoming_call.1.clone()
431    }
432
433    pub fn accept_incoming(&mut self, cx: &mut ModelContext<Self>) -> Task<Result<()>> {
434        if self.room.is_some() {
435            return Task::ready(Err(anyhow!("cannot join while on another call")));
436        }
437
438        let call = if let Some(call) = self.incoming_call.1.borrow().clone() {
439            call
440        } else {
441            return Task::ready(Err(anyhow!("no incoming call")));
442        };
443
444        let join = Room::join(&call, self.client.clone(), self.user_store.clone(), cx);
445
446        cx.spawn(|this, mut cx| async move {
447            let room = join.await?;
448            this.update(&mut cx, |this, cx| this.set_room(Some(room.clone()), cx))
449                .await?;
450            this.update(&mut cx, |this, cx| {
451                this.report_call_event("accept incoming", cx)
452            });
453            Ok(())
454        })
455    }
456
457    pub fn decline_incoming(&mut self, cx: &mut ModelContext<Self>) -> Result<()> {
458        let call = self
459            .incoming_call
460            .0
461            .borrow_mut()
462            .take()
463            .ok_or_else(|| anyhow!("no incoming call"))?;
464        Self::report_call_event_for_room(
465            "decline incoming",
466            Some(call.room_id),
467            None,
468            &self.client,
469            cx,
470        );
471        self.client.send(proto::DeclineCall {
472            room_id: call.room_id,
473        })?;
474        Ok(())
475    }
476
477    pub fn join_channel(
478        &mut self,
479        channel_id: u64,
480        cx: &mut ModelContext<Self>,
481    ) -> Task<Result<()>> {
482        if let Some(room) = self.room().cloned() {
483            if room.read(cx).channel_id() == Some(channel_id) {
484                return Task::ready(Ok(()));
485            } else {
486                room.update(cx, |room, cx| room.clear_state(cx));
487            }
488        }
489
490        let join = Room::join_channel(channel_id, self.client.clone(), self.user_store.clone(), cx);
491
492        cx.spawn(|this, mut cx| async move {
493            let room = join.await?;
494            this.update(&mut cx, |this, cx| this.set_room(Some(room.clone()), cx))
495                .await?;
496            this.update(&mut cx, |this, cx| {
497                this.report_call_event("join channel", cx)
498            });
499            Ok(())
500        })
501    }
502
503    pub fn hang_up(&mut self, cx: &mut ModelContext<Self>) -> Task<Result<()>> {
504        cx.notify();
505        self.report_call_event("hang up", cx);
506        Audio::end_call(cx);
507        if let Some((room, _)) = self.room.take() {
508            room.update(cx, |room, cx| room.leave(cx))
509        } else {
510            Task::ready(Ok(()))
511        }
512    }
513
514    pub fn share_project(
515        &mut self,
516        project: ModelHandle<Project>,
517        cx: &mut ModelContext<Self>,
518    ) -> Task<Result<u64>> {
519        if let Some((room, _)) = self.room.as_ref() {
520            self.report_call_event("share project", cx);
521            room.update(cx, |room, cx| room.share_project(project, cx))
522        } else {
523            Task::ready(Err(anyhow!("no active call")))
524        }
525    }
526
527    pub fn unshare_project(
528        &mut self,
529        project: ModelHandle<Project>,
530        cx: &mut ModelContext<Self>,
531    ) -> Result<()> {
532        if let Some((room, _)) = self.room.as_ref() {
533            self.report_call_event("unshare project", cx);
534            room.update(cx, |room, cx| room.unshare_project(project, cx))
535        } else {
536            Err(anyhow!("no active call"))
537        }
538    }
539
540    pub fn set_location(
541        &mut self,
542        project: Option<&ModelHandle<Project>>,
543        cx: &mut ModelContext<Self>,
544    ) -> Task<Result<()>> {
545        self.location = project.map(|project| project.downgrade());
546        if let Some((room, _)) = self.room.as_ref() {
547            room.update(cx, |room, cx| room.set_location(project, cx))
548        } else {
549            Task::ready(Ok(()))
550        }
551    }
552
553    fn set_room(
554        &mut self,
555        room: Option<ModelHandle<Room>>,
556        cx: &mut ModelContext<Self>,
557    ) -> Task<Result<()>> {
558        if room.as_ref() != self.room.as_ref().map(|room| &room.0) {
559            cx.notify();
560            if let Some(room) = room {
561                if room.read(cx).status().is_offline() {
562                    self.room = None;
563                    Task::ready(Ok(()))
564                } else {
565                    let subscriptions = vec![
566                        cx.observe(&room, |this, room, cx| {
567                            if room.read(cx).status().is_offline() {
568                                this.set_room(None, cx).detach_and_log_err(cx);
569                            }
570
571                            cx.notify();
572                        }),
573                        cx.subscribe(&room, |_, _, event, cx| cx.emit(event.clone())),
574                    ];
575                    self.room = Some((room.clone(), subscriptions));
576                    let location = self.location.and_then(|location| location.upgrade(cx));
577                    room.update(cx, |room, cx| room.set_location(location.as_ref(), cx))
578                }
579            } else {
580                self.room = None;
581                Task::ready(Ok(()))
582            }
583        } else {
584            Task::ready(Ok(()))
585        }
586    }
587
588    pub fn room(&self) -> Option<&ModelHandle<Room>> {
589        self.room.as_ref().map(|(room, _)| room)
590    }
591
592    pub fn client(&self) -> Arc<Client> {
593        self.client.clone()
594    }
595
596    pub fn pending_invites(&self) -> &HashSet<u64> {
597        &self.pending_invites
598    }
599
600    fn report_call_event(&self, operation: &'static str, cx: &AppContext) {
601        let (room_id, channel_id) = match self.room() {
602            Some(room) => {
603                let room = room.read(cx);
604                (Some(room.id()), room.channel_id())
605            }
606            None => (None, None),
607        };
608        Self::report_call_event_for_room(operation, room_id, channel_id, &self.client, cx)
609    }
610
611    pub fn report_call_event_for_room(
612        operation: &'static str,
613        room_id: Option<u64>,
614        channel_id: Option<u64>,
615        client: &Arc<Client>,
616        cx: &AppContext,
617    ) {
618        let telemetry = client.telemetry();
619        let telemetry_settings = *settings::get::<TelemetrySettings>(cx);
620        let event = ClickhouseEvent::Call {
621            operation,
622            room_id,
623            channel_id,
624        };
625        telemetry.report_clickhouse_event(event, telemetry_settings);
626    }
627}