call.rs

  1pub mod call_settings;
  2pub mod participant;
  3pub mod room;
  4
  5use std::sync::Arc;
  6
  7use anyhow::{anyhow, Result};
  8use call_settings::CallSettings;
  9use client::{proto, ClickhouseEvent, Client, TelemetrySettings, TypedEnvelope, User, UserStore};
 10use collections::HashSet;
 11use futures::{future::Shared, FutureExt};
 12use postage::watch;
 13
 14use gpui::{
 15    AppContext, AsyncAppContext, Entity, ModelContext, ModelHandle, Subscription, Task,
 16    WeakModelHandle,
 17};
 18use project::Project;
 19
 20pub use participant::ParticipantLocation;
 21pub use room::Room;
 22
 23pub fn init(client: Arc<Client>, user_store: ModelHandle<UserStore>, cx: &mut AppContext) {
 24    settings::register::<CallSettings>(cx);
 25
 26    let active_call = cx.add_model(|cx| ActiveCall::new(client, user_store, cx));
 27    cx.set_global(active_call);
 28}
 29
 30#[derive(Clone)]
 31pub struct IncomingCall {
 32    pub room_id: u64,
 33    pub calling_user: Arc<User>,
 34    pub participants: Vec<Arc<User>>,
 35    pub initial_project: Option<proto::ParticipantProject>,
 36}
 37
 38/// Singleton global maintaining the user's participation in a room across workspaces.
 39pub struct ActiveCall {
 40    room: Option<(ModelHandle<Room>, Vec<Subscription>)>,
 41    pending_room_creation: Option<Shared<Task<Result<ModelHandle<Room>, Arc<anyhow::Error>>>>>,
 42    location: Option<WeakModelHandle<Project>>,
 43    pending_invites: HashSet<u64>,
 44    incoming_call: (
 45        watch::Sender<Option<IncomingCall>>,
 46        watch::Receiver<Option<IncomingCall>>,
 47    ),
 48    client: Arc<Client>,
 49    user_store: ModelHandle<UserStore>,
 50    _subscriptions: Vec<client::Subscription>,
 51}
 52
 53impl Entity for ActiveCall {
 54    type Event = room::Event;
 55}
 56
 57impl ActiveCall {
 58    fn new(
 59        client: Arc<Client>,
 60        user_store: ModelHandle<UserStore>,
 61        cx: &mut ModelContext<Self>,
 62    ) -> Self {
 63        Self {
 64            room: None,
 65            pending_room_creation: None,
 66            location: None,
 67            pending_invites: Default::default(),
 68            incoming_call: watch::channel(),
 69            _subscriptions: vec![
 70                client.add_request_handler(cx.handle(), Self::handle_incoming_call),
 71                client.add_message_handler(cx.handle(), Self::handle_call_canceled),
 72            ],
 73            client,
 74            user_store,
 75        }
 76    }
 77
 78    async fn handle_incoming_call(
 79        this: ModelHandle<Self>,
 80        envelope: TypedEnvelope<proto::IncomingCall>,
 81        _: Arc<Client>,
 82        mut cx: AsyncAppContext,
 83    ) -> Result<proto::Ack> {
 84        let user_store = this.read_with(&cx, |this, _| this.user_store.clone());
 85        let call = IncomingCall {
 86            room_id: envelope.payload.room_id,
 87            participants: user_store
 88                .update(&mut cx, |user_store, cx| {
 89                    user_store.get_users(envelope.payload.participant_user_ids, cx)
 90                })
 91                .await?,
 92            calling_user: user_store
 93                .update(&mut cx, |user_store, cx| {
 94                    user_store.get_user(envelope.payload.calling_user_id, cx)
 95                })
 96                .await?,
 97            initial_project: envelope.payload.initial_project,
 98        };
 99        this.update(&mut cx, |this, _| {
100            *this.incoming_call.0.borrow_mut() = Some(call);
101        });
102
103        Ok(proto::Ack {})
104    }
105
106    async fn handle_call_canceled(
107        this: ModelHandle<Self>,
108        envelope: TypedEnvelope<proto::CallCanceled>,
109        _: Arc<Client>,
110        mut cx: AsyncAppContext,
111    ) -> Result<()> {
112        this.update(&mut cx, |this, _| {
113            let mut incoming_call = this.incoming_call.0.borrow_mut();
114            if incoming_call
115                .as_ref()
116                .map_or(false, |call| call.room_id == envelope.payload.room_id)
117            {
118                incoming_call.take();
119            }
120        });
121        Ok(())
122    }
123
124    pub fn global(cx: &AppContext) -> ModelHandle<Self> {
125        cx.global::<ModelHandle<Self>>().clone()
126    }
127
128    pub fn invite(
129        &mut self,
130        called_user_id: u64,
131        initial_project: Option<ModelHandle<Project>>,
132        cx: &mut ModelContext<Self>,
133    ) -> Task<Result<()>> {
134        if !self.pending_invites.insert(called_user_id) {
135            return Task::ready(Err(anyhow!("user was already invited")));
136        }
137        cx.notify();
138
139        let room = if let Some(room) = self.room().cloned() {
140            Some(Task::ready(Ok(room)).shared())
141        } else {
142            self.pending_room_creation.clone()
143        };
144
145        let invite = if let Some(room) = room {
146            cx.spawn_weak(|_, mut cx| async move {
147                let room = room.await.map_err(|err| anyhow!("{:?}", err))?;
148
149                let initial_project_id = if let Some(initial_project) = initial_project {
150                    Some(
151                        room.update(&mut cx, |room, cx| room.share_project(initial_project, cx))
152                            .await?,
153                    )
154                } else {
155                    None
156                };
157
158                room.update(&mut cx, |room, cx| {
159                    room.call(called_user_id, initial_project_id, cx)
160                })
161                .await?;
162
163                anyhow::Ok(())
164            })
165        } else {
166            let client = self.client.clone();
167            let user_store = self.user_store.clone();
168            let room = cx
169                .spawn(|this, mut cx| async move {
170                    let create_room = async {
171                        let room = cx
172                            .update(|cx| {
173                                Room::create(
174                                    called_user_id,
175                                    initial_project,
176                                    client,
177                                    user_store,
178                                    cx,
179                                )
180                            })
181                            .await?;
182
183                        this.update(&mut cx, |this, cx| this.set_room(Some(room.clone()), cx))
184                            .await?;
185
186                        anyhow::Ok(room)
187                    };
188
189                    let room = create_room.await;
190                    this.update(&mut cx, |this, _| this.pending_room_creation = None);
191                    room.map_err(Arc::new)
192                })
193                .shared();
194            self.pending_room_creation = Some(room.clone());
195            cx.foreground().spawn(async move {
196                room.await.map_err(|err| anyhow!("{:?}", err))?;
197                anyhow::Ok(())
198            })
199        };
200
201        cx.spawn(|this, mut cx| async move {
202            let result = invite.await;
203            this.update(&mut cx, |this, cx| {
204                this.pending_invites.remove(&called_user_id);
205                this.report_call_event("invite", cx);
206                cx.notify();
207            });
208            result
209        })
210    }
211
212    pub fn cancel_invite(
213        &mut self,
214        called_user_id: u64,
215        cx: &mut ModelContext<Self>,
216    ) -> Task<Result<()>> {
217        let room_id = if let Some(room) = self.room() {
218            room.read(cx).id()
219        } else {
220            return Task::ready(Err(anyhow!("no active call")));
221        };
222
223        let client = self.client.clone();
224        cx.foreground().spawn(async move {
225            client
226                .request(proto::CancelCall {
227                    room_id,
228                    called_user_id,
229                })
230                .await?;
231            anyhow::Ok(())
232        })
233    }
234
235    pub fn incoming(&self) -> watch::Receiver<Option<IncomingCall>> {
236        self.incoming_call.1.clone()
237    }
238
239    pub fn accept_incoming(&mut self, cx: &mut ModelContext<Self>) -> Task<Result<()>> {
240        if self.room.is_some() {
241            return Task::ready(Err(anyhow!("cannot join while on another call")));
242        }
243
244        let call = if let Some(call) = self.incoming_call.1.borrow().clone() {
245            call
246        } else {
247            return Task::ready(Err(anyhow!("no incoming call")));
248        };
249
250        let join = Room::join(&call, self.client.clone(), self.user_store.clone(), cx);
251
252        cx.spawn(|this, mut cx| async move {
253            let room = join.await?;
254            this.update(&mut cx, |this, cx| this.set_room(Some(room.clone()), cx))
255                .await?;
256            this.update(&mut cx, |this, cx| {
257                this.report_call_event("accept incoming", cx)
258            });
259            Ok(())
260        })
261    }
262
263    pub fn decline_incoming(&mut self, cx: &mut ModelContext<Self>) -> Result<()> {
264        let call = self
265            .incoming_call
266            .0
267            .borrow_mut()
268            .take()
269            .ok_or_else(|| anyhow!("no incoming call"))?;
270        Self::report_call_event_for_room("decline incoming", call.room_id, &self.client, cx);
271        self.client.send(proto::DeclineCall {
272            room_id: call.room_id,
273        })?;
274        Ok(())
275    }
276
277    pub fn hang_up(&mut self, cx: &mut ModelContext<Self>) -> Task<Result<()>> {
278        cx.notify();
279        self.report_call_event("hang up", cx);
280        if let Some((room, _)) = self.room.take() {
281            room.update(cx, |room, cx| room.leave(cx))
282        } else {
283            Task::ready(Ok(()))
284        }
285    }
286
287    pub fn share_project(
288        &mut self,
289        project: ModelHandle<Project>,
290        cx: &mut ModelContext<Self>,
291    ) -> Task<Result<u64>> {
292        if let Some((room, _)) = self.room.as_ref() {
293            self.report_call_event("share project", cx);
294            room.update(cx, |room, cx| room.share_project(project, cx))
295        } else {
296            Task::ready(Err(anyhow!("no active call")))
297        }
298    }
299
300    pub fn unshare_project(
301        &mut self,
302        project: ModelHandle<Project>,
303        cx: &mut ModelContext<Self>,
304    ) -> Result<()> {
305        if let Some((room, _)) = self.room.as_ref() {
306            self.report_call_event("unshare project", cx);
307            room.update(cx, |room, cx| room.unshare_project(project, cx))
308        } else {
309            Err(anyhow!("no active call"))
310        }
311    }
312
313    pub fn set_location(
314        &mut self,
315        project: Option<&ModelHandle<Project>>,
316        cx: &mut ModelContext<Self>,
317    ) -> Task<Result<()>> {
318        self.location = project.map(|project| project.downgrade());
319        if let Some((room, _)) = self.room.as_ref() {
320            room.update(cx, |room, cx| room.set_location(project, cx))
321        } else {
322            Task::ready(Ok(()))
323        }
324    }
325
326    fn set_room(
327        &mut self,
328        room: Option<ModelHandle<Room>>,
329        cx: &mut ModelContext<Self>,
330    ) -> Task<Result<()>> {
331        if room.as_ref() != self.room.as_ref().map(|room| &room.0) {
332            cx.notify();
333            if let Some(room) = room {
334                if room.read(cx).status().is_offline() {
335                    self.room = None;
336                    Task::ready(Ok(()))
337                } else {
338                    let subscriptions = vec![
339                        cx.observe(&room, |this, room, cx| {
340                            if room.read(cx).status().is_offline() {
341                                this.set_room(None, cx).detach_and_log_err(cx);
342                            }
343
344                            cx.notify();
345                        }),
346                        cx.subscribe(&room, |_, _, event, cx| cx.emit(event.clone())),
347                    ];
348                    self.room = Some((room.clone(), subscriptions));
349                    let location = self.location.and_then(|location| location.upgrade(cx));
350                    room.update(cx, |room, cx| room.set_location(location.as_ref(), cx))
351                }
352            } else {
353                self.room = None;
354                Task::ready(Ok(()))
355            }
356        } else {
357            Task::ready(Ok(()))
358        }
359    }
360
361    pub fn room(&self) -> Option<&ModelHandle<Room>> {
362        self.room.as_ref().map(|(room, _)| room)
363    }
364
365    pub fn client(&self) -> Arc<Client> {
366        self.client.clone()
367    }
368
369    pub fn pending_invites(&self) -> &HashSet<u64> {
370        &self.pending_invites
371    }
372
373    fn report_call_event(&self, operation: &'static str, cx: &AppContext) {
374        if let Some(room) = self.room() {
375            Self::report_call_event_for_room(operation, room.read(cx).id(), &self.client, cx)
376        }
377    }
378
379    pub fn report_call_event_for_room(
380        operation: &'static str,
381        room_id: u64,
382        client: &Arc<Client>,
383        cx: &AppContext,
384    ) {
385        let telemetry = client.telemetry();
386        let telemetry_settings = *settings::get::<TelemetrySettings>(cx);
387        let event = ClickhouseEvent::Call { operation, room_id };
388        telemetry.report_clickhouse_event(event, telemetry_settings);
389    }
390}