call.rs

  1pub mod call_settings;
  2pub mod participant;
  3pub mod room;
  4
  5use anyhow::{anyhow, Result};
  6use audio::Audio;
  7use call_settings::CallSettings;
  8use channel::ChannelId;
  9use client::{proto, ClickhouseEvent, Client, TelemetrySettings, TypedEnvelope, User, UserStore};
 10use collections::HashSet;
 11use futures::{future::Shared, FutureExt};
 12use gpui::{
 13    AppContext, AsyncAppContext, Entity, ModelContext, ModelHandle, Subscription, Task,
 14    WeakModelHandle,
 15};
 16use postage::watch;
 17use project::Project;
 18use std::sync::Arc;
 19
 20pub use participant::ParticipantLocation;
 21pub use room::Room;
 22
 23pub fn init(client: Arc<Client>, user_store: ModelHandle<UserStore>, cx: &mut AppContext) {
 24    settings::register::<CallSettings>(cx);
 25
 26    let active_call = cx.add_model(|cx| ActiveCall::new(client, user_store, cx));
 27    cx.set_global(active_call);
 28}
 29
 30#[derive(Clone)]
 31pub struct IncomingCall {
 32    pub room_id: u64,
 33    pub calling_user: Arc<User>,
 34    pub participants: Vec<Arc<User>>,
 35    pub initial_project: Option<proto::ParticipantProject>,
 36}
 37
 38/// Singleton global maintaining the user's participation in a room across workspaces.
 39pub struct ActiveCall {
 40    room: Option<(ModelHandle<Room>, Vec<Subscription>)>,
 41    pending_room_creation: Option<Shared<Task<Result<ModelHandle<Room>, Arc<anyhow::Error>>>>>,
 42    location: Option<WeakModelHandle<Project>>,
 43    pending_invites: HashSet<u64>,
 44    incoming_call: (
 45        watch::Sender<Option<IncomingCall>>,
 46        watch::Receiver<Option<IncomingCall>>,
 47    ),
 48    client: Arc<Client>,
 49    user_store: ModelHandle<UserStore>,
 50    _subscriptions: Vec<client::Subscription>,
 51}
 52
 53impl Entity for ActiveCall {
 54    type Event = room::Event;
 55}
 56
 57impl ActiveCall {
 58    fn new(
 59        client: Arc<Client>,
 60        user_store: ModelHandle<UserStore>,
 61        cx: &mut ModelContext<Self>,
 62    ) -> Self {
 63        Self {
 64            room: None,
 65            pending_room_creation: None,
 66            location: None,
 67            pending_invites: Default::default(),
 68            incoming_call: watch::channel(),
 69
 70            _subscriptions: vec![
 71                client.add_request_handler(cx.handle(), Self::handle_incoming_call),
 72                client.add_message_handler(cx.handle(), Self::handle_call_canceled),
 73            ],
 74            client,
 75            user_store,
 76        }
 77    }
 78
 79    pub fn channel_id(&self, cx: &AppContext) -> Option<ChannelId> {
 80        self.room()?.read(cx).channel_id()
 81    }
 82
 83    async fn handle_incoming_call(
 84        this: ModelHandle<Self>,
 85        envelope: TypedEnvelope<proto::IncomingCall>,
 86        _: Arc<Client>,
 87        mut cx: AsyncAppContext,
 88    ) -> Result<proto::Ack> {
 89        let user_store = this.read_with(&cx, |this, _| this.user_store.clone());
 90        let call = IncomingCall {
 91            room_id: envelope.payload.room_id,
 92            participants: user_store
 93                .update(&mut cx, |user_store, cx| {
 94                    user_store.get_users(envelope.payload.participant_user_ids, cx)
 95                })
 96                .await?,
 97            calling_user: user_store
 98                .update(&mut cx, |user_store, cx| {
 99                    user_store.get_user(envelope.payload.calling_user_id, cx)
100                })
101                .await?,
102            initial_project: envelope.payload.initial_project,
103        };
104        this.update(&mut cx, |this, _| {
105            *this.incoming_call.0.borrow_mut() = Some(call);
106        });
107
108        Ok(proto::Ack {})
109    }
110
111    async fn handle_call_canceled(
112        this: ModelHandle<Self>,
113        envelope: TypedEnvelope<proto::CallCanceled>,
114        _: Arc<Client>,
115        mut cx: AsyncAppContext,
116    ) -> Result<()> {
117        this.update(&mut cx, |this, _| {
118            let mut incoming_call = this.incoming_call.0.borrow_mut();
119            if incoming_call
120                .as_ref()
121                .map_or(false, |call| call.room_id == envelope.payload.room_id)
122            {
123                incoming_call.take();
124            }
125        });
126        Ok(())
127    }
128
129    pub fn global(cx: &AppContext) -> ModelHandle<Self> {
130        cx.global::<ModelHandle<Self>>().clone()
131    }
132
133    pub fn invite(
134        &mut self,
135        called_user_id: u64,
136        initial_project: Option<ModelHandle<Project>>,
137        cx: &mut ModelContext<Self>,
138    ) -> Task<Result<()>> {
139        if !self.pending_invites.insert(called_user_id) {
140            return Task::ready(Err(anyhow!("user was already invited")));
141        }
142        cx.notify();
143
144        let room = if let Some(room) = self.room().cloned() {
145            Some(Task::ready(Ok(room)).shared())
146        } else {
147            self.pending_room_creation.clone()
148        };
149
150        let invite = if let Some(room) = room {
151            cx.spawn_weak(|_, mut cx| async move {
152                let room = room.await.map_err(|err| anyhow!("{:?}", err))?;
153
154                let initial_project_id = if let Some(initial_project) = initial_project {
155                    Some(
156                        room.update(&mut cx, |room, cx| room.share_project(initial_project, cx))
157                            .await?,
158                    )
159                } else {
160                    None
161                };
162
163                room.update(&mut cx, |room, cx| {
164                    room.call(called_user_id, initial_project_id, cx)
165                })
166                .await?;
167
168                anyhow::Ok(())
169            })
170        } else {
171            let client = self.client.clone();
172            let user_store = self.user_store.clone();
173            let room = cx
174                .spawn(|this, mut cx| async move {
175                    let create_room = async {
176                        let room = cx
177                            .update(|cx| {
178                                Room::create(
179                                    called_user_id,
180                                    initial_project,
181                                    client,
182                                    user_store,
183                                    cx,
184                                )
185                            })
186                            .await?;
187
188                        this.update(&mut cx, |this, cx| this.set_room(Some(room.clone()), cx))
189                            .await?;
190
191                        anyhow::Ok(room)
192                    };
193
194                    let room = create_room.await;
195                    this.update(&mut cx, |this, _| this.pending_room_creation = None);
196                    room.map_err(Arc::new)
197                })
198                .shared();
199            self.pending_room_creation = Some(room.clone());
200            cx.foreground().spawn(async move {
201                room.await.map_err(|err| anyhow!("{:?}", err))?;
202                anyhow::Ok(())
203            })
204        };
205
206        cx.spawn(|this, mut cx| async move {
207            let result = invite.await;
208            if result.is_ok() {
209                this.update(&mut cx, |this, cx| this.report_call_event("invite", cx));
210            } else {
211                // TODO: Resport collaboration error
212            }
213
214            this.update(&mut cx, |this, cx| {
215                this.pending_invites.remove(&called_user_id);
216                cx.notify();
217            });
218            result
219        })
220    }
221
222    pub fn cancel_invite(
223        &mut self,
224        called_user_id: u64,
225        cx: &mut ModelContext<Self>,
226    ) -> Task<Result<()>> {
227        let room_id = if let Some(room) = self.room() {
228            room.read(cx).id()
229        } else {
230            return Task::ready(Err(anyhow!("no active call")));
231        };
232
233        let client = self.client.clone();
234        cx.foreground().spawn(async move {
235            client
236                .request(proto::CancelCall {
237                    room_id,
238                    called_user_id,
239                })
240                .await?;
241            anyhow::Ok(())
242        })
243    }
244
245    pub fn incoming(&self) -> watch::Receiver<Option<IncomingCall>> {
246        self.incoming_call.1.clone()
247    }
248
249    pub fn accept_incoming(&mut self, cx: &mut ModelContext<Self>) -> Task<Result<()>> {
250        if self.room.is_some() {
251            return Task::ready(Err(anyhow!("cannot join while on another call")));
252        }
253
254        let call = if let Some(call) = self.incoming_call.1.borrow().clone() {
255            call
256        } else {
257            return Task::ready(Err(anyhow!("no incoming call")));
258        };
259
260        let join = Room::join(&call, self.client.clone(), self.user_store.clone(), cx);
261
262        cx.spawn(|this, mut cx| async move {
263            let room = join.await?;
264            this.update(&mut cx, |this, cx| this.set_room(Some(room.clone()), cx))
265                .await?;
266            this.update(&mut cx, |this, cx| {
267                this.report_call_event("accept incoming", cx)
268            });
269            Ok(())
270        })
271    }
272
273    pub fn decline_incoming(&mut self, cx: &mut ModelContext<Self>) -> Result<()> {
274        let call = self
275            .incoming_call
276            .0
277            .borrow_mut()
278            .take()
279            .ok_or_else(|| anyhow!("no incoming call"))?;
280        report_call_event_for_room("decline incoming", call.room_id, None, &self.client, cx);
281        self.client.send(proto::DeclineCall {
282            room_id: call.room_id,
283        })?;
284        Ok(())
285    }
286
287    pub fn join_channel(
288        &mut self,
289        channel_id: u64,
290        cx: &mut ModelContext<Self>,
291    ) -> Task<Result<()>> {
292        if let Some(room) = self.room().cloned() {
293            if room.read(cx).channel_id() == Some(channel_id) {
294                return Task::ready(Ok(()));
295            } else {
296                room.update(cx, |room, cx| room.clear_state(cx));
297            }
298        }
299
300        let join = Room::join_channel(channel_id, self.client.clone(), self.user_store.clone(), cx);
301
302        cx.spawn(|this, mut cx| async move {
303            let room = join.await?;
304            this.update(&mut cx, |this, cx| this.set_room(Some(room.clone()), cx))
305                .await?;
306            this.update(&mut cx, |this, cx| {
307                this.report_call_event("join channel", cx)
308            });
309            Ok(())
310        })
311    }
312
313    pub fn hang_up(&mut self, cx: &mut ModelContext<Self>) -> Task<Result<()>> {
314        cx.notify();
315        self.report_call_event("hang up", cx);
316        Audio::end_call(cx);
317        if let Some((room, _)) = self.room.take() {
318            room.update(cx, |room, cx| room.leave(cx))
319        } else {
320            Task::ready(Ok(()))
321        }
322    }
323
324    pub fn share_project(
325        &mut self,
326        project: ModelHandle<Project>,
327        cx: &mut ModelContext<Self>,
328    ) -> Task<Result<u64>> {
329        if let Some((room, _)) = self.room.as_ref() {
330            self.report_call_event("share project", cx);
331            room.update(cx, |room, cx| room.share_project(project, cx))
332        } else {
333            Task::ready(Err(anyhow!("no active call")))
334        }
335    }
336
337    pub fn unshare_project(
338        &mut self,
339        project: ModelHandle<Project>,
340        cx: &mut ModelContext<Self>,
341    ) -> Result<()> {
342        if let Some((room, _)) = self.room.as_ref() {
343            self.report_call_event("unshare project", cx);
344            room.update(cx, |room, cx| room.unshare_project(project, cx))
345        } else {
346            Err(anyhow!("no active call"))
347        }
348    }
349
350    pub fn location(&self) -> Option<&WeakModelHandle<Project>> {
351        self.location.as_ref()
352    }
353
354    pub fn set_location(
355        &mut self,
356        project: Option<&ModelHandle<Project>>,
357        cx: &mut ModelContext<Self>,
358    ) -> Task<Result<()>> {
359        self.location = project.map(|project| project.downgrade());
360        if let Some((room, _)) = self.room.as_ref() {
361            room.update(cx, |room, cx| room.set_location(project, cx))
362        } else {
363            Task::ready(Ok(()))
364        }
365    }
366
367    fn set_room(
368        &mut self,
369        room: Option<ModelHandle<Room>>,
370        cx: &mut ModelContext<Self>,
371    ) -> Task<Result<()>> {
372        if room.as_ref() != self.room.as_ref().map(|room| &room.0) {
373            cx.notify();
374            if let Some(room) = room {
375                if room.read(cx).status().is_offline() {
376                    self.room = None;
377                    Task::ready(Ok(()))
378                } else {
379                    let subscriptions = vec![
380                        cx.observe(&room, |this, room, cx| {
381                            if room.read(cx).status().is_offline() {
382                                this.set_room(None, cx).detach_and_log_err(cx);
383                            }
384
385                            cx.notify();
386                        }),
387                        cx.subscribe(&room, |_, _, event, cx| cx.emit(event.clone())),
388                    ];
389                    self.room = Some((room.clone(), subscriptions));
390                    let location = self.location.and_then(|location| location.upgrade(cx));
391                    room.update(cx, |room, cx| room.set_location(location.as_ref(), cx))
392                }
393            } else {
394                self.room = None;
395                Task::ready(Ok(()))
396            }
397        } else {
398            Task::ready(Ok(()))
399        }
400    }
401
402    pub fn room(&self) -> Option<&ModelHandle<Room>> {
403        self.room.as_ref().map(|(room, _)| room)
404    }
405
406    pub fn client(&self) -> Arc<Client> {
407        self.client.clone()
408    }
409
410    pub fn pending_invites(&self) -> &HashSet<u64> {
411        &self.pending_invites
412    }
413
414    pub fn report_call_event(&self, operation: &'static str, cx: &AppContext) {
415        if let Some(room) = self.room() {
416            let room = room.read(cx);
417            report_call_event_for_room(operation, room.id(), room.channel_id(), &self.client, cx);
418        }
419    }
420}
421
422pub fn report_call_event_for_room(
423    operation: &'static str,
424    room_id: u64,
425    channel_id: Option<u64>,
426    client: &Arc<Client>,
427    cx: &AppContext,
428) {
429    let telemetry = client.telemetry();
430    let telemetry_settings = *settings::get::<TelemetrySettings>(cx);
431    let event = ClickhouseEvent::Call {
432        operation,
433        room_id: Some(room_id),
434        channel_id,
435    };
436    telemetry.report_clickhouse_event(event, telemetry_settings);
437}
438
439pub fn report_call_event_for_channel(
440    operation: &'static str,
441    channel_id: u64,
442    client: &Arc<Client>,
443    cx: &AppContext,
444) {
445    let room = ActiveCall::global(cx).read(cx).room();
446
447    let telemetry = client.telemetry();
448    let telemetry_settings = *settings::get::<TelemetrySettings>(cx);
449
450    let event = ClickhouseEvent::Call {
451        operation,
452        room_id: room.map(|r| r.read(cx).id()),
453        channel_id: Some(channel_id),
454    };
455    telemetry.report_clickhouse_event(event, telemetry_settings);
456}