call.rs

  1pub mod call_settings;
  2pub mod participant;
  3pub mod room;
  4
  5use std::sync::Arc;
  6
  7use anyhow::{anyhow, Result};
  8use call_settings::CallSettings;
  9use client::{proto, ClickhouseEvent, Client, TelemetrySettings, TypedEnvelope, User, UserStore};
 10use collections::HashSet;
 11use futures::{future::Shared, FutureExt};
 12use postage::watch;
 13
 14use gpui::{
 15    AppContext, AsyncAppContext, Entity, ModelContext, ModelHandle, Subscription, Task,
 16    WeakModelHandle,
 17};
 18use project::Project;
 19
 20pub use participant::ParticipantLocation;
 21pub use room::Room;
 22
 23pub fn init(client: Arc<Client>, user_store: ModelHandle<UserStore>, cx: &mut AppContext) {
 24    settings::register::<CallSettings>(cx);
 25
 26    let active_call = cx.add_model(|cx| ActiveCall::new(client, user_store, cx));
 27    cx.set_global(active_call);
 28}
 29
/// An incoming call, built from a `proto::IncomingCall` message with its user ids
/// resolved to [`User`] values.
#[derive(Clone)]
pub struct IncomingCall {
    /// Id of the room the call belongs to.
    pub room_id: u64,
    /// The user who placed the call.
    pub calling_user: Arc<User>,
    /// Users resolved from the message's participant user ids.
    pub participants: Vec<Arc<User>>,
    /// The project carried by the incoming message, if any.
    pub initial_project: Option<proto::ParticipantProject>,
}
 37
/// Singleton global maintaining the user's participation in a room across workspaces.
pub struct ActiveCall {
    /// The current room together with the observation/subscription handles created for it.
    room: Option<(ModelHandle<Room>, Vec<Subscription>)>,
    /// Shared task for a room that `invite` is currently creating; reset to `None` once
    /// the creation finishes (successfully or not).
    pending_room_creation: Option<Shared<Task<Result<ModelHandle<Room>, Arc<anyhow::Error>>>>>,
    /// Weak handle to the project last passed to `set_location`; forwarded to a room when
    /// it becomes the current room.
    location: Option<WeakModelHandle<Project>>,
    /// Ids of users for whom an `invite` is still in flight.
    pending_invites: HashSet<u64>,
    /// Sender/receiver pair of the watch channel holding the latest incoming call, if any.
    incoming_call: (
        watch::Sender<Option<IncomingCall>>,
        watch::Receiver<Option<IncomingCall>>,
    ),
    client: Arc<Client>,
    user_store: ModelHandle<UserStore>,
    /// Handles returned when registering the message handlers on `client`.
    // NOTE(review): presumably dropping these unregisters the handlers — confirm in `client`.
    _subscriptions: Vec<client::Subscription>,
}
 52
/// `ActiveCall` emits [`room::Event`]s; `set_room` re-emits the events of the current room.
impl Entity for ActiveCall {
    type Event = room::Event;
}
 56
 57impl ActiveCall {
 58    fn new(
 59        client: Arc<Client>,
 60        user_store: ModelHandle<UserStore>,
 61        cx: &mut ModelContext<Self>,
 62    ) -> Self {
 63        Self {
 64            room: None,
 65            pending_room_creation: None,
 66            location: None,
 67            pending_invites: Default::default(),
 68            incoming_call: watch::channel(),
 69            _subscriptions: vec![
 70                client.add_request_handler(cx.handle(), Self::handle_incoming_call),
 71                client.add_message_handler(cx.handle(), Self::handle_call_canceled),
 72            ],
 73            client,
 74            user_store,
 75        }
 76    }
 77
 78    async fn handle_incoming_call(
 79        this: ModelHandle<Self>,
 80        envelope: TypedEnvelope<proto::IncomingCall>,
 81        _: Arc<Client>,
 82        mut cx: AsyncAppContext,
 83    ) -> Result<proto::Ack> {
 84        let user_store = this.read_with(&cx, |this, _| this.user_store.clone());
 85        let call = IncomingCall {
 86            room_id: envelope.payload.room_id,
 87            participants: user_store
 88                .update(&mut cx, |user_store, cx| {
 89                    user_store.get_users(envelope.payload.participant_user_ids, cx)
 90                })
 91                .await?,
 92            calling_user: user_store
 93                .update(&mut cx, |user_store, cx| {
 94                    user_store.get_user(envelope.payload.calling_user_id, cx)
 95                })
 96                .await?,
 97            initial_project: envelope.payload.initial_project,
 98        };
 99        this.update(&mut cx, |this, _| {
100            *this.incoming_call.0.borrow_mut() = Some(call);
101        });
102
103        Ok(proto::Ack {})
104    }
105
106    async fn handle_call_canceled(
107        this: ModelHandle<Self>,
108        envelope: TypedEnvelope<proto::CallCanceled>,
109        _: Arc<Client>,
110        mut cx: AsyncAppContext,
111    ) -> Result<()> {
112        this.update(&mut cx, |this, _| {
113            let mut incoming_call = this.incoming_call.0.borrow_mut();
114            if incoming_call
115                .as_ref()
116                .map_or(false, |call| call.room_id == envelope.payload.room_id)
117            {
118                incoming_call.take();
119            }
120        });
121        Ok(())
122    }
123
124    pub fn global(cx: &AppContext) -> ModelHandle<Self> {
125        cx.global::<ModelHandle<Self>>().clone()
126    }
127
128    pub fn invite(
129        &mut self,
130        called_user_id: u64,
131        initial_project: Option<ModelHandle<Project>>,
132        cx: &mut ModelContext<Self>,
133    ) -> Task<Result<()>> {
134        if !self.pending_invites.insert(called_user_id) {
135            return Task::ready(Err(anyhow!("user was already invited")));
136        }
137        cx.notify();
138
139        let room = if let Some(room) = self.room().cloned() {
140            Some(Task::ready(Ok(room)).shared())
141        } else {
142            self.pending_room_creation.clone()
143        };
144
145        let invite = if let Some(room) = room {
146            cx.spawn_weak(|_, mut cx| async move {
147                let room = room.await.map_err(|err| anyhow!("{:?}", err))?;
148
149                let initial_project_id = if let Some(initial_project) = initial_project {
150                    Some(
151                        room.update(&mut cx, |room, cx| room.share_project(initial_project, cx))
152                            .await?,
153                    )
154                } else {
155                    None
156                };
157
158                room.update(&mut cx, |room, cx| {
159                    room.call(called_user_id, initial_project_id, cx)
160                })
161                .await?;
162
163                anyhow::Ok(())
164            })
165        } else {
166            let client = self.client.clone();
167            let user_store = self.user_store.clone();
168            let room = cx
169                .spawn(|this, mut cx| async move {
170                    let create_room = async {
171                        let room = cx
172                            .update(|cx| {
173                                Room::create(
174                                    called_user_id,
175                                    initial_project,
176                                    client,
177                                    user_store,
178                                    cx,
179                                )
180                            })
181                            .await?;
182
183                        this.update(&mut cx, |this, cx| this.set_room(Some(room.clone()), cx))
184                            .await?;
185
186                        anyhow::Ok(room)
187                    };
188
189                    let room = create_room.await;
190                    this.update(&mut cx, |this, _| this.pending_room_creation = None);
191                    room.map_err(Arc::new)
192                })
193                .shared();
194            self.pending_room_creation = Some(room.clone());
195            cx.foreground().spawn(async move {
196                room.await.map_err(|err| anyhow!("{:?}", err))?;
197                anyhow::Ok(())
198            })
199        };
200
201        cx.spawn(|this, mut cx| async move {
202            let result = invite.await;
203            this.update(&mut cx, |this, cx| {
204                this.pending_invites.remove(&called_user_id);
205                this.report_call_event("invite", cx);
206                cx.notify();
207            });
208            result
209        })
210    }
211
212    pub fn cancel_invite(
213        &mut self,
214        called_user_id: u64,
215        cx: &mut ModelContext<Self>,
216    ) -> Task<Result<()>> {
217        let room_id = if let Some(room) = self.room() {
218            room.read(cx).id()
219        } else {
220            return Task::ready(Err(anyhow!("no active call")));
221        };
222
223        let client = self.client.clone();
224        cx.foreground().spawn(async move {
225            client
226                .request(proto::CancelCall {
227                    room_id,
228                    called_user_id,
229                })
230                .await?;
231            anyhow::Ok(())
232        })
233    }
234
235    pub fn incoming(&self) -> watch::Receiver<Option<IncomingCall>> {
236        self.incoming_call.1.clone()
237    }
238
239    pub fn accept_incoming(&mut self, cx: &mut ModelContext<Self>) -> Task<Result<()>> {
240        if self.room.is_some() {
241            return Task::ready(Err(anyhow!("cannot join while on another call")));
242        }
243
244        let call = if let Some(call) = self.incoming_call.1.borrow().clone() {
245            call
246        } else {
247            return Task::ready(Err(anyhow!("no incoming call")));
248        };
249
250        let join = Room::join(&call, self.client.clone(), self.user_store.clone(), cx);
251
252        cx.spawn(|this, mut cx| async move {
253            let room = join.await?;
254            this.update(&mut cx, |this, cx| this.set_room(Some(room.clone()), cx))
255                .await?;
256            this.update(&mut cx, |this, cx| {
257                this.report_call_event("accept incoming", cx)
258            });
259            Ok(())
260        })
261    }
262
263    pub fn decline_incoming(&mut self, cx: &mut ModelContext<Self>) -> Result<()> {
264        let call = self
265            .incoming_call
266            .0
267            .borrow_mut()
268            .take()
269            .ok_or_else(|| anyhow!("no incoming call"))?;
270        Self::report_call_event_for_room("decline incoming", call.room_id, &self.client, cx);
271        self.client.send(proto::DeclineCall {
272            room_id: call.room_id,
273        })?;
274        Ok(())
275    }
276
277    pub fn join_channel(
278        &mut self,
279        channel_id: u64,
280        cx: &mut ModelContext<Self>,
281    ) -> Task<Result<()>> {
282        let leave_room;
283        if let Some(room) = self.room().cloned() {
284            if room.read(cx).channel_id() == Some(channel_id) {
285                return Task::ready(Ok(()));
286            } else {
287                leave_room = room.update(cx, |room, cx| room.leave(cx));
288            }
289        } else {
290            leave_room = Task::ready(Ok(()));
291        }
292
293        let join = Room::join_channel(channel_id, self.client.clone(), self.user_store.clone(), cx);
294
295        cx.spawn(|this, mut cx| async move {
296            leave_room.await?;
297            let room = join.await?;
298            this.update(&mut cx, |this, cx| this.set_room(Some(room.clone()), cx))
299                .await?;
300            this.update(&mut cx, |this, cx| {
301                this.report_call_event("join channel", cx)
302            });
303            Ok(())
304        })
305    }
306
307    pub fn hang_up(&mut self, cx: &mut ModelContext<Self>) -> Task<Result<()>> {
308        cx.notify();
309        self.report_call_event("hang up", cx);
310        if let Some((room, _)) = self.room.take() {
311            room.update(cx, |room, cx| room.leave(cx))
312        } else {
313            Task::ready(Ok(()))
314        }
315    }
316
317    pub fn share_project(
318        &mut self,
319        project: ModelHandle<Project>,
320        cx: &mut ModelContext<Self>,
321    ) -> Task<Result<u64>> {
322        if let Some((room, _)) = self.room.as_ref() {
323            self.report_call_event("share project", cx);
324            room.update(cx, |room, cx| room.share_project(project, cx))
325        } else {
326            Task::ready(Err(anyhow!("no active call")))
327        }
328    }
329
330    pub fn unshare_project(
331        &mut self,
332        project: ModelHandle<Project>,
333        cx: &mut ModelContext<Self>,
334    ) -> Result<()> {
335        if let Some((room, _)) = self.room.as_ref() {
336            self.report_call_event("unshare project", cx);
337            room.update(cx, |room, cx| room.unshare_project(project, cx))
338        } else {
339            Err(anyhow!("no active call"))
340        }
341    }
342
343    pub fn set_location(
344        &mut self,
345        project: Option<&ModelHandle<Project>>,
346        cx: &mut ModelContext<Self>,
347    ) -> Task<Result<()>> {
348        self.location = project.map(|project| project.downgrade());
349        if let Some((room, _)) = self.room.as_ref() {
350            room.update(cx, |room, cx| room.set_location(project, cx))
351        } else {
352            Task::ready(Ok(()))
353        }
354    }
355
356    fn set_room(
357        &mut self,
358        room: Option<ModelHandle<Room>>,
359        cx: &mut ModelContext<Self>,
360    ) -> Task<Result<()>> {
361        if room.as_ref() != self.room.as_ref().map(|room| &room.0) {
362            cx.notify();
363            if let Some(room) = room {
364                if room.read(cx).status().is_offline() {
365                    self.room = None;
366                    Task::ready(Ok(()))
367                } else {
368                    let subscriptions = vec![
369                        cx.observe(&room, |this, room, cx| {
370                            if room.read(cx).status().is_offline() {
371                                this.set_room(None, cx).detach_and_log_err(cx);
372                            }
373
374                            cx.notify();
375                        }),
376                        cx.subscribe(&room, |_, _, event, cx| cx.emit(event.clone())),
377                    ];
378                    self.room = Some((room.clone(), subscriptions));
379                    let location = self.location.and_then(|location| location.upgrade(cx));
380                    room.update(cx, |room, cx| room.set_location(location.as_ref(), cx))
381                }
382            } else {
383                self.room = None;
384                Task::ready(Ok(()))
385            }
386        } else {
387            Task::ready(Ok(()))
388        }
389    }
390
391    pub fn room(&self) -> Option<&ModelHandle<Room>> {
392        self.room.as_ref().map(|(room, _)| room)
393    }
394
395    pub fn client(&self) -> Arc<Client> {
396        self.client.clone()
397    }
398
    /// Returns the ids of users for whom an `invite` is still in flight.
    pub fn pending_invites(&self) -> &HashSet<u64> {
        &self.pending_invites
    }
402
403    fn report_call_event(&self, operation: &'static str, cx: &AppContext) {
404        if let Some(room) = self.room() {
405            Self::report_call_event_for_room(operation, room.read(cx).id(), &self.client, cx)
406        }
407    }
408
409    pub fn report_call_event_for_room(
410        operation: &'static str,
411        room_id: u64,
412        client: &Arc<Client>,
413        cx: &AppContext,
414    ) {
415        let telemetry = client.telemetry();
416        let telemetry_settings = *settings::get::<TelemetrySettings>(cx);
417        let event = ClickhouseEvent::Call { operation, room_id };
418        telemetry.report_clickhouse_event(event, telemetry_settings);
419    }
420}