call.rs

  1pub mod call_settings;
  2pub mod participant;
  3pub mod room;
  4
  5use std::sync::Arc;
  6
  7use anyhow::{anyhow, Result};
  8use audio::Audio;
  9use call_settings::CallSettings;
 10use channel::ChannelId;
 11use client::{proto, ClickhouseEvent, Client, TelemetrySettings, TypedEnvelope, User, UserStore};
 12use collections::HashSet;
 13use futures::{future::Shared, FutureExt};
 14use postage::watch;
 15
 16use gpui::{
 17    AppContext, AsyncAppContext, Entity, ModelContext, ModelHandle, Subscription, Task,
 18    WeakModelHandle,
 19};
 20use project::Project;
 21
 22pub use participant::ParticipantLocation;
 23pub use room::Room;
 24
 25pub fn init(client: Arc<Client>, user_store: ModelHandle<UserStore>, cx: &mut AppContext) {
 26    settings::register::<CallSettings>(cx);
 27
 28    let active_call = cx.add_model(|cx| ActiveCall::new(client, user_store, cx));
 29    cx.set_global(active_call);
 30}
 31
/// A call invitation built from a `proto::IncomingCall` message, with the user ids it
/// carries resolved to [`User`] records.
#[derive(Clone)]
pub struct IncomingCall {
    /// Id of the room the invitation refers to.
    pub room_id: u64,
    /// User resolved from the message's `calling_user_id`.
    pub calling_user: Arc<User>,
    /// Users resolved from the message's `participant_user_ids`.
    pub participants: Vec<Arc<User>>,
    /// Project attached to the invitation, if the message carried one.
    pub initial_project: Option<proto::ParticipantProject>,
}
 39
/// Singleton global maintaining the user's participation in a room across workspaces.
pub struct ActiveCall {
    /// The current room, paired with the subscriptions that observe it and forward its
    /// events (both are created in `set_room`).
    room: Option<(ModelHandle<Room>, Vec<Subscription>)>,
    /// Shared task for a room that `invite` is currently creating; reset to `None` once
    /// creation finishes, whether it succeeded or failed.
    pending_room_creation: Option<Shared<Task<Result<ModelHandle<Room>, Arc<anyhow::Error>>>>>,
    /// Weak handle to the project last passed to `set_location`; forwarded to a newly
    /// installed room by `set_room`.
    location: Option<WeakModelHandle<Project>>,
    /// Ids of users for whom an `invite` task is still in flight.
    pending_invites: HashSet<u64>,
    /// Sender/receiver pair of the watch channel holding the pending incoming call.
    incoming_call: (
        watch::Sender<Option<IncomingCall>>,
        watch::Receiver<Option<IncomingCall>>,
    ),
    client: Arc<Client>,
    user_store: ModelHandle<UserStore>,
    /// Handler registrations made in `new`.
    // NOTE(review): presumably dropping these unregisters the handlers — confirm in `client`.
    _subscriptions: Vec<client::Subscription>,
}
 54
// `ActiveCall` emits `room::Event`s: `set_room` subscribes to the current room and
// re-emits each of its events unchanged.
impl Entity for ActiveCall {
    type Event = room::Event;
}
 58
 59impl ActiveCall {
 60    fn new(
 61        client: Arc<Client>,
 62        user_store: ModelHandle<UserStore>,
 63        cx: &mut ModelContext<Self>,
 64    ) -> Self {
 65        Self {
 66            room: None,
 67            pending_room_creation: None,
 68            location: None,
 69            pending_invites: Default::default(),
 70            incoming_call: watch::channel(),
 71            _subscriptions: vec![
 72                client.add_request_handler(cx.handle(), Self::handle_incoming_call),
 73                client.add_message_handler(cx.handle(), Self::handle_call_canceled),
 74            ],
 75            client,
 76            user_store,
 77        }
 78    }
 79
 80    pub fn channel_id(&self, cx: &AppContext) -> Option<ChannelId> {
 81        self.room()?.read(cx).channel_id()
 82    }
 83
 84    async fn handle_incoming_call(
 85        this: ModelHandle<Self>,
 86        envelope: TypedEnvelope<proto::IncomingCall>,
 87        _: Arc<Client>,
 88        mut cx: AsyncAppContext,
 89    ) -> Result<proto::Ack> {
 90        let user_store = this.read_with(&cx, |this, _| this.user_store.clone());
 91        let call = IncomingCall {
 92            room_id: envelope.payload.room_id,
 93            participants: user_store
 94                .update(&mut cx, |user_store, cx| {
 95                    user_store.get_users(envelope.payload.participant_user_ids, cx)
 96                })
 97                .await?,
 98            calling_user: user_store
 99                .update(&mut cx, |user_store, cx| {
100                    user_store.get_user(envelope.payload.calling_user_id, cx)
101                })
102                .await?,
103            initial_project: envelope.payload.initial_project,
104        };
105        this.update(&mut cx, |this, _| {
106            *this.incoming_call.0.borrow_mut() = Some(call);
107        });
108
109        Ok(proto::Ack {})
110    }
111
112    async fn handle_call_canceled(
113        this: ModelHandle<Self>,
114        envelope: TypedEnvelope<proto::CallCanceled>,
115        _: Arc<Client>,
116        mut cx: AsyncAppContext,
117    ) -> Result<()> {
118        this.update(&mut cx, |this, _| {
119            let mut incoming_call = this.incoming_call.0.borrow_mut();
120            if incoming_call
121                .as_ref()
122                .map_or(false, |call| call.room_id == envelope.payload.room_id)
123            {
124                incoming_call.take();
125            }
126        });
127        Ok(())
128    }
129
130    pub fn global(cx: &AppContext) -> ModelHandle<Self> {
131        cx.global::<ModelHandle<Self>>().clone()
132    }
133
134    pub fn invite(
135        &mut self,
136        called_user_id: u64,
137        initial_project: Option<ModelHandle<Project>>,
138        cx: &mut ModelContext<Self>,
139    ) -> Task<Result<()>> {
140        if !self.pending_invites.insert(called_user_id) {
141            return Task::ready(Err(anyhow!("user was already invited")));
142        }
143        cx.notify();
144
145        let room = if let Some(room) = self.room().cloned() {
146            Some(Task::ready(Ok(room)).shared())
147        } else {
148            self.pending_room_creation.clone()
149        };
150
151        let invite = if let Some(room) = room {
152            cx.spawn_weak(|_, mut cx| async move {
153                let room = room.await.map_err(|err| anyhow!("{:?}", err))?;
154
155                let initial_project_id = if let Some(initial_project) = initial_project {
156                    Some(
157                        room.update(&mut cx, |room, cx| room.share_project(initial_project, cx))
158                            .await?,
159                    )
160                } else {
161                    None
162                };
163
164                room.update(&mut cx, |room, cx| {
165                    room.call(called_user_id, initial_project_id, cx)
166                })
167                .await?;
168
169                anyhow::Ok(())
170            })
171        } else {
172            let client = self.client.clone();
173            let user_store = self.user_store.clone();
174            let room = cx
175                .spawn(|this, mut cx| async move {
176                    let create_room = async {
177                        let room = cx
178                            .update(|cx| {
179                                Room::create(
180                                    called_user_id,
181                                    initial_project,
182                                    client,
183                                    user_store,
184                                    cx,
185                                )
186                            })
187                            .await?;
188
189                        this.update(&mut cx, |this, cx| this.set_room(Some(room.clone()), cx))
190                            .await?;
191
192                        anyhow::Ok(room)
193                    };
194
195                    let room = create_room.await;
196                    this.update(&mut cx, |this, _| this.pending_room_creation = None);
197                    room.map_err(Arc::new)
198                })
199                .shared();
200            self.pending_room_creation = Some(room.clone());
201            cx.foreground().spawn(async move {
202                room.await.map_err(|err| anyhow!("{:?}", err))?;
203                anyhow::Ok(())
204            })
205        };
206
207        cx.spawn(|this, mut cx| async move {
208            let result = invite.await;
209            if result.is_ok() {
210                this.update(&mut cx, |this, cx| this.report_call_event("invite", cx));
211            } else {
212                // TODO: Resport collaboration error
213            }
214
215            this.update(&mut cx, |this, cx| {
216                this.pending_invites.remove(&called_user_id);
217                cx.notify();
218            });
219            result
220        })
221    }
222
223    pub fn cancel_invite(
224        &mut self,
225        called_user_id: u64,
226        cx: &mut ModelContext<Self>,
227    ) -> Task<Result<()>> {
228        let room_id = if let Some(room) = self.room() {
229            room.read(cx).id()
230        } else {
231            return Task::ready(Err(anyhow!("no active call")));
232        };
233
234        let client = self.client.clone();
235        cx.foreground().spawn(async move {
236            client
237                .request(proto::CancelCall {
238                    room_id,
239                    called_user_id,
240                })
241                .await?;
242            anyhow::Ok(())
243        })
244    }
245
246    pub fn incoming(&self) -> watch::Receiver<Option<IncomingCall>> {
247        self.incoming_call.1.clone()
248    }
249
250    pub fn accept_incoming(&mut self, cx: &mut ModelContext<Self>) -> Task<Result<()>> {
251        if self.room.is_some() {
252            return Task::ready(Err(anyhow!("cannot join while on another call")));
253        }
254
255        let call = if let Some(call) = self.incoming_call.1.borrow().clone() {
256            call
257        } else {
258            return Task::ready(Err(anyhow!("no incoming call")));
259        };
260
261        let join = Room::join(&call, self.client.clone(), self.user_store.clone(), cx);
262
263        cx.spawn(|this, mut cx| async move {
264            let room = join.await?;
265            this.update(&mut cx, |this, cx| this.set_room(Some(room.clone()), cx))
266                .await?;
267            this.update(&mut cx, |this, cx| {
268                this.report_call_event("accept incoming", cx)
269            });
270            Ok(())
271        })
272    }
273
274    pub fn decline_incoming(&mut self, cx: &mut ModelContext<Self>) -> Result<()> {
275        let call = self
276            .incoming_call
277            .0
278            .borrow_mut()
279            .take()
280            .ok_or_else(|| anyhow!("no incoming call"))?;
281        report_call_event_for_room("decline incoming", call.room_id, None, &self.client, cx);
282        self.client.send(proto::DeclineCall {
283            room_id: call.room_id,
284        })?;
285        Ok(())
286    }
287
288    pub fn join_channel(
289        &mut self,
290        channel_id: u64,
291        cx: &mut ModelContext<Self>,
292    ) -> Task<Result<()>> {
293        if let Some(room) = self.room().cloned() {
294            if room.read(cx).channel_id() == Some(channel_id) {
295                return Task::ready(Ok(()));
296            } else {
297                room.update(cx, |room, cx| room.clear_state(cx));
298            }
299        }
300
301        let join = Room::join_channel(channel_id, self.client.clone(), self.user_store.clone(), cx);
302
303        cx.spawn(|this, mut cx| async move {
304            let room = join.await?;
305            this.update(&mut cx, |this, cx| this.set_room(Some(room.clone()), cx))
306                .await?;
307            this.update(&mut cx, |this, cx| {
308                this.report_call_event("join channel", cx)
309            });
310            Ok(())
311        })
312    }
313
314    pub fn hang_up(&mut self, cx: &mut ModelContext<Self>) -> Task<Result<()>> {
315        cx.notify();
316        self.report_call_event("hang up", cx);
317        Audio::end_call(cx);
318        if let Some((room, _)) = self.room.take() {
319            room.update(cx, |room, cx| room.leave(cx))
320        } else {
321            Task::ready(Ok(()))
322        }
323    }
324
325    pub fn share_project(
326        &mut self,
327        project: ModelHandle<Project>,
328        cx: &mut ModelContext<Self>,
329    ) -> Task<Result<u64>> {
330        if let Some((room, _)) = self.room.as_ref() {
331            self.report_call_event("share project", cx);
332            room.update(cx, |room, cx| room.share_project(project, cx))
333        } else {
334            Task::ready(Err(anyhow!("no active call")))
335        }
336    }
337
338    pub fn unshare_project(
339        &mut self,
340        project: ModelHandle<Project>,
341        cx: &mut ModelContext<Self>,
342    ) -> Result<()> {
343        if let Some((room, _)) = self.room.as_ref() {
344            self.report_call_event("unshare project", cx);
345            room.update(cx, |room, cx| room.unshare_project(project, cx))
346        } else {
347            Err(anyhow!("no active call"))
348        }
349    }
350
351    pub fn set_location(
352        &mut self,
353        project: Option<&ModelHandle<Project>>,
354        cx: &mut ModelContext<Self>,
355    ) -> Task<Result<()>> {
356        self.location = project.map(|project| project.downgrade());
357        if let Some((room, _)) = self.room.as_ref() {
358            room.update(cx, |room, cx| room.set_location(project, cx))
359        } else {
360            Task::ready(Ok(()))
361        }
362    }
363
364    fn set_room(
365        &mut self,
366        room: Option<ModelHandle<Room>>,
367        cx: &mut ModelContext<Self>,
368    ) -> Task<Result<()>> {
369        if room.as_ref() != self.room.as_ref().map(|room| &room.0) {
370            cx.notify();
371            if let Some(room) = room {
372                if room.read(cx).status().is_offline() {
373                    self.room = None;
374                    Task::ready(Ok(()))
375                } else {
376                    let subscriptions = vec![
377                        cx.observe(&room, |this, room, cx| {
378                            if room.read(cx).status().is_offline() {
379                                this.set_room(None, cx).detach_and_log_err(cx);
380                            }
381
382                            cx.notify();
383                        }),
384                        cx.subscribe(&room, |_, _, event, cx| cx.emit(event.clone())),
385                    ];
386                    self.room = Some((room.clone(), subscriptions));
387                    let location = self.location.and_then(|location| location.upgrade(cx));
388                    room.update(cx, |room, cx| room.set_location(location.as_ref(), cx))
389                }
390            } else {
391                self.room = None;
392                Task::ready(Ok(()))
393            }
394        } else {
395            Task::ready(Ok(()))
396        }
397    }
398
399    pub fn room(&self) -> Option<&ModelHandle<Room>> {
400        self.room.as_ref().map(|(room, _)| room)
401    }
402
403    pub fn client(&self) -> Arc<Client> {
404        self.client.clone()
405    }
406
    /// Ids of users whose `invite` task has not finished yet.
    pub fn pending_invites(&self) -> &HashSet<u64> {
        &self.pending_invites
    }
410
411    pub fn report_call_event(&self, operation: &'static str, cx: &AppContext) {
412        if let Some(room) = self.room() {
413            let room = room.read(cx);
414            report_call_event_for_room(operation, room.id(), room.channel_id(), &self.client, cx);
415        }
416    }
417}
418
419pub fn report_call_event_for_room(
420    operation: &'static str,
421    room_id: u64,
422    channel_id: Option<u64>,
423    client: &Arc<Client>,
424    cx: &AppContext,
425) {
426    let telemetry = client.telemetry();
427    let telemetry_settings = *settings::get::<TelemetrySettings>(cx);
428    let event = ClickhouseEvent::Call {
429        operation,
430        room_id: Some(room_id),
431        channel_id,
432    };
433    telemetry.report_clickhouse_event(event, telemetry_settings);
434}
435
436pub fn report_call_event_for_channel(
437    operation: &'static str,
438    channel_id: u64,
439    client: &Arc<Client>,
440    cx: &AppContext,
441) {
442    let room = ActiveCall::global(cx).read(cx).room();
443
444    let telemetry = client.telemetry();
445    let telemetry_settings = *settings::get::<TelemetrySettings>(cx);
446
447    let event = ClickhouseEvent::Call {
448        operation,
449        room_id: room.map(|r| r.read(cx).id()),
450        channel_id: Some(channel_id),
451    };
452    telemetry.report_clickhouse_event(event, telemetry_settings);
453}