call.rs

  1pub mod call_settings;
  2pub mod participant;
  3pub mod room;
  4
  5use std::sync::Arc;
  6
  7use anyhow::{anyhow, Result};
  8use audio::Audio;
  9use call_settings::CallSettings;
 10use channel::ChannelId;
 11use client::{
 12    proto::{self, PeerId},
 13    ClickhouseEvent, Client, TelemetrySettings, TypedEnvelope, User, UserStore,
 14};
 15use collections::HashSet;
 16use futures::{future::Shared, FutureExt};
 17use postage::watch;
 18
 19use gpui::{
 20    AnyViewHandle, AnyWeakViewHandle, AppContext, AsyncAppContext, Entity, ModelContext,
 21    ModelHandle, Subscription, Task, ViewContext, WeakModelHandle,
 22};
 23use project::Project;
 24
 25pub use participant::ParticipantLocation;
 26pub use room::Room;
 27use util::ResultExt;
 28
 29pub fn init(client: Arc<Client>, user_store: ModelHandle<UserStore>, cx: &mut AppContext) {
 30    settings::register::<CallSettings>(cx);
 31
 32    let active_call = cx.add_model(|cx| ActiveCall::new(client, user_store, cx));
 33    cx.set_global(active_call);
 34}
 35
 36#[derive(Clone)]
 37pub struct IncomingCall {
 38    pub room_id: u64,
 39    pub calling_user: Arc<User>,
 40    pub participants: Vec<Arc<User>>,
 41    pub initial_project: Option<proto::ParticipantProject>,
 42}
 43
 44/// Singleton global maintaining the user's participation in a room across workspaces.
 45pub struct ActiveCall {
 46    room: Option<(ModelHandle<Room>, Vec<Subscription>)>,
 47    pending_room_creation: Option<Shared<Task<Result<ModelHandle<Room>, Arc<anyhow::Error>>>>>,
 48    location: Option<WeakModelHandle<Project>>,
 49    pending_invites: HashSet<u64>,
 50    incoming_call: (
 51        watch::Sender<Option<IncomingCall>>,
 52        watch::Receiver<Option<IncomingCall>>,
 53    ),
 54    client: Arc<Client>,
 55    user_store: ModelHandle<UserStore>,
 56    follow_handlers: Vec<FollowHandler>,
 57    followers: Vec<Follower>,
 58    _subscriptions: Vec<client::Subscription>,
 59}
 60
 61#[derive(PartialEq, Eq, PartialOrd, Ord, Debug)]
 62struct Follower {
 63    project_id: Option<u64>,
 64    peer_id: PeerId,
 65}
 66
 67struct FollowHandler {
 68    project_id: Option<u64>,
 69    root_view: AnyWeakViewHandle,
 70    get_views:
 71        Box<dyn Fn(&AnyViewHandle, Option<u64>, &mut AppContext) -> Option<proto::FollowResponse>>,
 72    update_view: Box<dyn Fn(&AnyViewHandle, PeerId, proto::UpdateFollowers, &mut AppContext)>,
 73}
 74
 75impl Entity for ActiveCall {
 76    type Event = room::Event;
 77}
 78
 79impl ActiveCall {
 80    fn new(
 81        client: Arc<Client>,
 82        user_store: ModelHandle<UserStore>,
 83        cx: &mut ModelContext<Self>,
 84    ) -> Self {
 85        Self {
 86            room: None,
 87            pending_room_creation: None,
 88            location: None,
 89            pending_invites: Default::default(),
 90            incoming_call: watch::channel(),
 91            follow_handlers: Default::default(),
 92            followers: Default::default(),
 93            _subscriptions: vec![
 94                client.add_request_handler(cx.handle(), Self::handle_incoming_call),
 95                client.add_message_handler(cx.handle(), Self::handle_call_canceled),
 96                client.add_request_handler(cx.handle(), Self::handle_follow),
 97                client.add_message_handler(cx.handle(), Self::handle_unfollow),
 98                client.add_message_handler(cx.handle(), Self::handle_update_from_leader),
 99            ],
100            client,
101            user_store,
102        }
103    }
104
105    pub fn channel_id(&self, cx: &AppContext) -> Option<ChannelId> {
106        self.room()?.read(cx).channel_id()
107    }
108
109    pub fn add_follow_handler<V: gpui::View, GetViews, UpdateView>(
110        &mut self,
111        root_view: gpui::ViewHandle<V>,
112        project_id: Option<u64>,
113        get_views: GetViews,
114        update_view: UpdateView,
115        _cx: &mut ModelContext<Self>,
116    ) where
117        GetViews: 'static
118            + Fn(&mut V, Option<u64>, &mut gpui::ViewContext<V>) -> Result<proto::FollowResponse>,
119        UpdateView:
120            'static + Fn(&mut V, PeerId, proto::UpdateFollowers, &mut ViewContext<V>) -> Result<()>,
121    {
122        self.follow_handlers
123            .retain(|h| h.root_view.id() != root_view.id());
124        if let Err(ix) = self
125            .follow_handlers
126            .binary_search_by_key(&(project_id, root_view.id()), |f| {
127                (f.project_id, f.root_view.id())
128            })
129        {
130            self.follow_handlers.insert(
131                ix,
132                FollowHandler {
133                    project_id,
134                    root_view: root_view.into_any().downgrade(),
135                    get_views: Box::new(move |view, project_id, cx| {
136                        let view = view.clone().downcast::<V>().unwrap();
137                        view.update(cx, |view, cx| get_views(view, project_id, cx).log_err())
138                            .flatten()
139                    }),
140                    update_view: Box::new(move |view, leader_id, message, cx| {
141                        let view = view.clone().downcast::<V>().unwrap();
142                        view.update(cx, |view, cx| {
143                            update_view(view, leader_id, message, cx).log_err()
144                        });
145                    }),
146                },
147            );
148        }
149    }
150
151    async fn handle_incoming_call(
152        this: ModelHandle<Self>,
153        envelope: TypedEnvelope<proto::IncomingCall>,
154        _: Arc<Client>,
155        mut cx: AsyncAppContext,
156    ) -> Result<proto::Ack> {
157        let user_store = this.read_with(&cx, |this, _| this.user_store.clone());
158        let call = IncomingCall {
159            room_id: envelope.payload.room_id,
160            participants: user_store
161                .update(&mut cx, |user_store, cx| {
162                    user_store.get_users(envelope.payload.participant_user_ids, cx)
163                })
164                .await?,
165            calling_user: user_store
166                .update(&mut cx, |user_store, cx| {
167                    user_store.get_user(envelope.payload.calling_user_id, cx)
168                })
169                .await?,
170            initial_project: envelope.payload.initial_project,
171        };
172        this.update(&mut cx, |this, _| {
173            *this.incoming_call.0.borrow_mut() = Some(call);
174        });
175
176        Ok(proto::Ack {})
177    }
178
179    async fn handle_call_canceled(
180        this: ModelHandle<Self>,
181        envelope: TypedEnvelope<proto::CallCanceled>,
182        _: Arc<Client>,
183        mut cx: AsyncAppContext,
184    ) -> Result<()> {
185        this.update(&mut cx, |this, _| {
186            let mut incoming_call = this.incoming_call.0.borrow_mut();
187            if incoming_call
188                .as_ref()
189                .map_or(false, |call| call.room_id == envelope.payload.room_id)
190            {
191                incoming_call.take();
192            }
193        });
194        Ok(())
195    }
196
197    async fn handle_follow(
198        this: ModelHandle<Self>,
199        envelope: TypedEnvelope<proto::Follow>,
200        _: Arc<Client>,
201        mut cx: AsyncAppContext,
202    ) -> Result<proto::FollowResponse> {
203        this.update(&mut cx, |this, cx| {
204            let follower = Follower {
205                project_id: envelope.payload.project_id,
206                peer_id: envelope.original_sender_id()?,
207            };
208            let active_project_id = this
209                .location
210                .as_ref()
211                .and_then(|project| project.upgrade(cx)?.read(cx).remote_id());
212
213            let mut response = proto::FollowResponse::default();
214            for handler in &this.follow_handlers {
215                if follower.project_id != handler.project_id && follower.project_id.is_some() {
216                    continue;
217                }
218
219                let Some(root_view) = handler.root_view.upgrade(cx) else {
220                    continue;
221                };
222
223                let Some(handler_response) =
224                    (handler.get_views)(&root_view, follower.project_id, cx)
225                else {
226                    continue;
227                };
228
229                if response.views.is_empty() {
230                    response.views = handler_response.views;
231                } else {
232                    response.views.extend_from_slice(&handler_response.views);
233                }
234
235                if let Some(active_view_id) = handler_response.active_view_id.clone() {
236                    if response.active_view_id.is_none() || handler.project_id == active_project_id
237                    {
238                        response.active_view_id = Some(active_view_id);
239                    }
240                }
241            }
242
243            if let Err(ix) = this.followers.binary_search(&follower) {
244                this.followers.insert(ix, follower);
245            }
246
247            Ok(response)
248        })
249    }
250
251    async fn handle_unfollow(
252        this: ModelHandle<Self>,
253        envelope: TypedEnvelope<proto::Unfollow>,
254        _: Arc<Client>,
255        mut cx: AsyncAppContext,
256    ) -> Result<()> {
257        this.update(&mut cx, |this, _| {
258            let follower = Follower {
259                project_id: envelope.payload.project_id,
260                peer_id: envelope.original_sender_id()?,
261            };
262            if let Ok(ix) = this.followers.binary_search(&follower) {
263                this.followers.remove(ix);
264            }
265            Ok(())
266        })
267    }
268
269    async fn handle_update_from_leader(
270        this: ModelHandle<Self>,
271        envelope: TypedEnvelope<proto::UpdateFollowers>,
272        _: Arc<Client>,
273        mut cx: AsyncAppContext,
274    ) -> Result<()> {
275        let leader_id = envelope.original_sender_id()?;
276        let update = envelope.payload;
277        this.update(&mut cx, |this, cx| {
278            for handler in &this.follow_handlers {
279                if update.project_id != handler.project_id && update.project_id.is_some() {
280                    continue;
281                }
282                let Some(root_view) = handler.root_view.upgrade(cx) else {
283                    continue;
284                };
285                (handler.update_view)(&root_view, leader_id, update.clone(), cx);
286            }
287            Ok(())
288        })
289    }
290
291    pub fn update_followers(
292        &self,
293        project_id: Option<u64>,
294        update: proto::update_followers::Variant,
295        cx: &AppContext,
296    ) -> Option<()> {
297        let room_id = self.room()?.read(cx).id();
298        let follower_ids: Vec<_> = self
299            .followers
300            .iter()
301            .filter_map(|follower| {
302                (follower.project_id == project_id).then_some(follower.peer_id.into())
303            })
304            .collect();
305        if follower_ids.is_empty() {
306            return None;
307        }
308        self.client
309            .send(proto::UpdateFollowers {
310                room_id,
311                project_id,
312                follower_ids,
313                variant: Some(update),
314            })
315            .log_err()
316    }
317
318    pub fn global(cx: &AppContext) -> ModelHandle<Self> {
319        cx.global::<ModelHandle<Self>>().clone()
320    }
321
322    pub fn invite(
323        &mut self,
324        called_user_id: u64,
325        initial_project: Option<ModelHandle<Project>>,
326        cx: &mut ModelContext<Self>,
327    ) -> Task<Result<()>> {
328        if !self.pending_invites.insert(called_user_id) {
329            return Task::ready(Err(anyhow!("user was already invited")));
330        }
331        cx.notify();
332
333        let room = if let Some(room) = self.room().cloned() {
334            Some(Task::ready(Ok(room)).shared())
335        } else {
336            self.pending_room_creation.clone()
337        };
338
339        let invite = if let Some(room) = room {
340            cx.spawn_weak(|_, mut cx| async move {
341                let room = room.await.map_err(|err| anyhow!("{:?}", err))?;
342
343                let initial_project_id = if let Some(initial_project) = initial_project {
344                    Some(
345                        room.update(&mut cx, |room, cx| room.share_project(initial_project, cx))
346                            .await?,
347                    )
348                } else {
349                    None
350                };
351
352                room.update(&mut cx, |room, cx| {
353                    room.call(called_user_id, initial_project_id, cx)
354                })
355                .await?;
356
357                anyhow::Ok(())
358            })
359        } else {
360            let client = self.client.clone();
361            let user_store = self.user_store.clone();
362            let room = cx
363                .spawn(|this, mut cx| async move {
364                    let create_room = async {
365                        let room = cx
366                            .update(|cx| {
367                                Room::create(
368                                    called_user_id,
369                                    initial_project,
370                                    client,
371                                    user_store,
372                                    cx,
373                                )
374                            })
375                            .await?;
376
377                        this.update(&mut cx, |this, cx| this.set_room(Some(room.clone()), cx))
378                            .await?;
379
380                        anyhow::Ok(room)
381                    };
382
383                    let room = create_room.await;
384                    this.update(&mut cx, |this, _| this.pending_room_creation = None);
385                    room.map_err(Arc::new)
386                })
387                .shared();
388            self.pending_room_creation = Some(room.clone());
389            cx.foreground().spawn(async move {
390                room.await.map_err(|err| anyhow!("{:?}", err))?;
391                anyhow::Ok(())
392            })
393        };
394
395        cx.spawn(|this, mut cx| async move {
396            let result = invite.await;
397            if result.is_ok() {
398                this.update(&mut cx, |this, cx| this.report_call_event("invite", cx));
399            } else {
400                // TODO: Resport collaboration error
401            }
402
403            this.update(&mut cx, |this, cx| {
404                this.pending_invites.remove(&called_user_id);
405                cx.notify();
406            });
407            result
408        })
409    }
410
411    pub fn cancel_invite(
412        &mut self,
413        called_user_id: u64,
414        cx: &mut ModelContext<Self>,
415    ) -> Task<Result<()>> {
416        let room_id = if let Some(room) = self.room() {
417            room.read(cx).id()
418        } else {
419            return Task::ready(Err(anyhow!("no active call")));
420        };
421
422        let client = self.client.clone();
423        cx.foreground().spawn(async move {
424            client
425                .request(proto::CancelCall {
426                    room_id,
427                    called_user_id,
428                })
429                .await?;
430            anyhow::Ok(())
431        })
432    }
433
434    pub fn incoming(&self) -> watch::Receiver<Option<IncomingCall>> {
435        self.incoming_call.1.clone()
436    }
437
438    pub fn accept_incoming(&mut self, cx: &mut ModelContext<Self>) -> Task<Result<()>> {
439        if self.room.is_some() {
440            return Task::ready(Err(anyhow!("cannot join while on another call")));
441        }
442
443        let call = if let Some(call) = self.incoming_call.1.borrow().clone() {
444            call
445        } else {
446            return Task::ready(Err(anyhow!("no incoming call")));
447        };
448
449        let join = Room::join(&call, self.client.clone(), self.user_store.clone(), cx);
450
451        cx.spawn(|this, mut cx| async move {
452            let room = join.await?;
453            this.update(&mut cx, |this, cx| this.set_room(Some(room.clone()), cx))
454                .await?;
455            this.update(&mut cx, |this, cx| {
456                this.report_call_event("accept incoming", cx)
457            });
458            Ok(())
459        })
460    }
461
462    pub fn decline_incoming(&mut self, cx: &mut ModelContext<Self>) -> Result<()> {
463        let call = self
464            .incoming_call
465            .0
466            .borrow_mut()
467            .take()
468            .ok_or_else(|| anyhow!("no incoming call"))?;
469        report_call_event_for_room("decline incoming", call.room_id, None, &self.client, cx);
470        self.client.send(proto::DeclineCall {
471            room_id: call.room_id,
472        })?;
473        Ok(())
474    }
475
476    pub fn join_channel(
477        &mut self,
478        channel_id: u64,
479        cx: &mut ModelContext<Self>,
480    ) -> Task<Result<()>> {
481        if let Some(room) = self.room().cloned() {
482            if room.read(cx).channel_id() == Some(channel_id) {
483                return Task::ready(Ok(()));
484            } else {
485                room.update(cx, |room, cx| room.clear_state(cx));
486            }
487        }
488
489        let join = Room::join_channel(channel_id, self.client.clone(), self.user_store.clone(), cx);
490
491        cx.spawn(|this, mut cx| async move {
492            let room = join.await?;
493            this.update(&mut cx, |this, cx| this.set_room(Some(room.clone()), cx))
494                .await?;
495            this.update(&mut cx, |this, cx| {
496                this.report_call_event("join channel", cx)
497            });
498            Ok(())
499        })
500    }
501
502    pub fn hang_up(&mut self, cx: &mut ModelContext<Self>) -> Task<Result<()>> {
503        cx.notify();
504        self.report_call_event("hang up", cx);
505        Audio::end_call(cx);
506        if let Some((room, _)) = self.room.take() {
507            room.update(cx, |room, cx| room.leave(cx))
508        } else {
509            Task::ready(Ok(()))
510        }
511    }
512
513    pub fn share_project(
514        &mut self,
515        project: ModelHandle<Project>,
516        cx: &mut ModelContext<Self>,
517    ) -> Task<Result<u64>> {
518        if let Some((room, _)) = self.room.as_ref() {
519            self.report_call_event("share project", cx);
520            room.update(cx, |room, cx| room.share_project(project, cx))
521        } else {
522            Task::ready(Err(anyhow!("no active call")))
523        }
524    }
525
526    pub fn unshare_project(
527        &mut self,
528        project: ModelHandle<Project>,
529        cx: &mut ModelContext<Self>,
530    ) -> Result<()> {
531        if let Some((room, _)) = self.room.as_ref() {
532            self.report_call_event("unshare project", cx);
533            room.update(cx, |room, cx| room.unshare_project(project, cx))
534        } else {
535            Err(anyhow!("no active call"))
536        }
537    }
538
539    pub fn set_location(
540        &mut self,
541        project: Option<&ModelHandle<Project>>,
542        cx: &mut ModelContext<Self>,
543    ) -> Task<Result<()>> {
544        self.location = project.map(|project| project.downgrade());
545        if let Some((room, _)) = self.room.as_ref() {
546            room.update(cx, |room, cx| room.set_location(project, cx))
547        } else {
548            Task::ready(Ok(()))
549        }
550    }
551
552    fn set_room(
553        &mut self,
554        room: Option<ModelHandle<Room>>,
555        cx: &mut ModelContext<Self>,
556    ) -> Task<Result<()>> {
557        if room.as_ref() != self.room.as_ref().map(|room| &room.0) {
558            cx.notify();
559            if let Some(room) = room {
560                if room.read(cx).status().is_offline() {
561                    self.room = None;
562                    Task::ready(Ok(()))
563                } else {
564                    let subscriptions = vec![
565                        cx.observe(&room, |this, room, cx| {
566                            if room.read(cx).status().is_offline() {
567                                this.set_room(None, cx).detach_and_log_err(cx);
568                            }
569
570                            cx.notify();
571                        }),
572                        cx.subscribe(&room, |_, _, event, cx| cx.emit(event.clone())),
573                    ];
574                    self.room = Some((room.clone(), subscriptions));
575                    let location = self.location.and_then(|location| location.upgrade(cx));
576                    room.update(cx, |room, cx| room.set_location(location.as_ref(), cx))
577                }
578            } else {
579                self.room = None;
580                Task::ready(Ok(()))
581            }
582        } else {
583            Task::ready(Ok(()))
584        }
585    }
586
587    pub fn room(&self) -> Option<&ModelHandle<Room>> {
588        self.room.as_ref().map(|(room, _)| room)
589    }
590
591    pub fn client(&self) -> Arc<Client> {
592        self.client.clone()
593    }
594
595    pub fn pending_invites(&self) -> &HashSet<u64> {
596        &self.pending_invites
597    }
598
599    pub fn report_call_event(&self, operation: &'static str, cx: &AppContext) {
600        if let Some(room) = self.room() {
601            let room = room.read(cx);
602            report_call_event_for_room(operation, room.id(), room.channel_id(), &self.client, cx);
603        }
604    }
605}
606
607pub fn report_call_event_for_room(
608    operation: &'static str,
609    room_id: u64,
610    channel_id: Option<u64>,
611    client: &Arc<Client>,
612    cx: &AppContext,
613) {
614    let telemetry = client.telemetry();
615    let telemetry_settings = *settings::get::<TelemetrySettings>(cx);
616    let event = ClickhouseEvent::Call {
617        operation,
618        room_id: Some(room_id),
619        channel_id,
620    };
621    telemetry.report_clickhouse_event(event, telemetry_settings);
622}
623
624pub fn report_call_event_for_channel(
625    operation: &'static str,
626    channel_id: u64,
627    client: &Arc<Client>,
628    cx: &AppContext,
629) {
630    let room = ActiveCall::global(cx).read(cx).room();
631
632    let telemetry = client.telemetry();
633    let telemetry_settings = *settings::get::<TelemetrySettings>(cx);
634
635    let event = ClickhouseEvent::Call {
636        operation,
637        room_id: room.map(|r| r.read(cx).id()),
638        channel_id: Some(channel_id),
639    };
640    telemetry.report_clickhouse_event(event, telemetry_settings);
641}