call.rs

  1pub mod participant;
  2pub mod room;
  3
  4use std::sync::Arc;
  5
  6use anyhow::{anyhow, Result};
  7use client::{proto, ClickhouseEvent, Client, TelemetrySettings, TypedEnvelope, User, UserStore};
  8use collections::HashSet;
  9use futures::{future::Shared, FutureExt};
 10use postage::watch;
 11
 12use gpui::{
 13    AppContext, AsyncAppContext, Entity, ModelContext, ModelHandle, Subscription, Task,
 14    WeakModelHandle,
 15};
 16use project::Project;
 17
 18pub use participant::ParticipantLocation;
 19pub use room::Room;
 20
 21pub fn init(client: Arc<Client>, user_store: ModelHandle<UserStore>, cx: &mut AppContext) {
 22    let active_call = cx.add_model(|cx| ActiveCall::new(client, user_store, cx));
 23    cx.set_global(active_call);
 24}
 25
/// A call invitation built from a `proto::IncomingCall` message, with the user ids it carried
/// resolved to [`User`] records through the `UserStore`.
#[derive(Clone)]
pub struct IncomingCall {
    /// Id of the room the invitation refers to.
    pub room_id: u64,
    /// The user identified by the message's `calling_user_id`.
    pub calling_user: Arc<User>,
    /// The users identified by the message's `participant_user_ids`.
    pub participants: Vec<Arc<User>>,
    /// Project attached to the invitation, if the message carried one.
    pub initial_project: Option<proto::ParticipantProject>,
}
 33
/// Singleton global maintaining the user's participation in a room across workspaces.
pub struct ActiveCall {
    /// The room currently joined, paired with the subscriptions that observe it and forward
    /// its events. `None` when not in a call.
    room: Option<(ModelHandle<Room>, Vec<Subscription>)>,
    /// Set while `invite` is creating a room, so that further invites can await the same
    /// creation instead of starting another one. Cleared when the creation finishes.
    pending_room_creation: Option<Shared<Task<Result<ModelHandle<Room>, Arc<anyhow::Error>>>>>,
    /// Weak handle to the project last passed to `set_location`, re-applied when a room is set.
    location: Option<WeakModelHandle<Project>>,
    /// Ids of users with an invitation currently in flight.
    pending_invites: HashSet<u64>,
    /// Watch channel holding the latest incoming call, if any: the sender is written by the
    /// message handlers and `decline_incoming`, the receiver is handed out by `incoming`.
    incoming_call: (
        watch::Sender<Option<IncomingCall>>,
        watch::Receiver<Option<IncomingCall>>,
    ),
    /// Client used for requests, messages and telemetry.
    client: Arc<Client>,
    /// Store used to resolve user ids and passed on to `Room::create` / `Room::join`.
    user_store: ModelHandle<UserStore>,
    /// Handler registrations made in `new`; held for the lifetime of the model.
    _subscriptions: Vec<client::Subscription>,
}
 48
// `ActiveCall` emits `room::Event`s: `set_room` subscribes to the current room and re-emits
// each of its events from this model.
impl Entity for ActiveCall {
    type Event = room::Event;
}
 52
 53impl ActiveCall {
 54    fn new(
 55        client: Arc<Client>,
 56        user_store: ModelHandle<UserStore>,
 57        cx: &mut ModelContext<Self>,
 58    ) -> Self {
 59        Self {
 60            room: None,
 61            pending_room_creation: None,
 62            location: None,
 63            pending_invites: Default::default(),
 64            incoming_call: watch::channel(),
 65            _subscriptions: vec![
 66                client.add_request_handler(cx.handle(), Self::handle_incoming_call),
 67                client.add_message_handler(cx.handle(), Self::handle_call_canceled),
 68            ],
 69            client,
 70            user_store,
 71        }
 72    }
 73
 74    async fn handle_incoming_call(
 75        this: ModelHandle<Self>,
 76        envelope: TypedEnvelope<proto::IncomingCall>,
 77        _: Arc<Client>,
 78        mut cx: AsyncAppContext,
 79    ) -> Result<proto::Ack> {
 80        let user_store = this.read_with(&cx, |this, _| this.user_store.clone());
 81        let call = IncomingCall {
 82            room_id: envelope.payload.room_id,
 83            participants: user_store
 84                .update(&mut cx, |user_store, cx| {
 85                    user_store.get_users(envelope.payload.participant_user_ids, cx)
 86                })
 87                .await?,
 88            calling_user: user_store
 89                .update(&mut cx, |user_store, cx| {
 90                    user_store.get_user(envelope.payload.calling_user_id, cx)
 91                })
 92                .await?,
 93            initial_project: envelope.payload.initial_project,
 94        };
 95        this.update(&mut cx, |this, _| {
 96            *this.incoming_call.0.borrow_mut() = Some(call);
 97        });
 98
 99        Ok(proto::Ack {})
100    }
101
102    async fn handle_call_canceled(
103        this: ModelHandle<Self>,
104        envelope: TypedEnvelope<proto::CallCanceled>,
105        _: Arc<Client>,
106        mut cx: AsyncAppContext,
107    ) -> Result<()> {
108        this.update(&mut cx, |this, _| {
109            let mut incoming_call = this.incoming_call.0.borrow_mut();
110            if incoming_call
111                .as_ref()
112                .map_or(false, |call| call.room_id == envelope.payload.room_id)
113            {
114                incoming_call.take();
115            }
116        });
117        Ok(())
118    }
119
120    pub fn global(cx: &AppContext) -> ModelHandle<Self> {
121        cx.global::<ModelHandle<Self>>().clone()
122    }
123
124    pub fn invite(
125        &mut self,
126        called_user_id: u64,
127        initial_project: Option<ModelHandle<Project>>,
128        cx: &mut ModelContext<Self>,
129    ) -> Task<Result<()>> {
130        if !self.pending_invites.insert(called_user_id) {
131            return Task::ready(Err(anyhow!("user was already invited")));
132        }
133        cx.notify();
134
135        let room = if let Some(room) = self.room().cloned() {
136            Some(Task::ready(Ok(room)).shared())
137        } else {
138            self.pending_room_creation.clone()
139        };
140
141        let invite = if let Some(room) = room {
142            cx.spawn_weak(|_, mut cx| async move {
143                let room = room.await.map_err(|err| anyhow!("{:?}", err))?;
144
145                let initial_project_id = if let Some(initial_project) = initial_project {
146                    Some(
147                        room.update(&mut cx, |room, cx| room.share_project(initial_project, cx))
148                            .await?,
149                    )
150                } else {
151                    None
152                };
153
154                room.update(&mut cx, |room, cx| {
155                    room.call(called_user_id, initial_project_id, cx)
156                })
157                .await?;
158
159                anyhow::Ok(())
160            })
161        } else {
162            let client = self.client.clone();
163            let user_store = self.user_store.clone();
164            let room = cx
165                .spawn(|this, mut cx| async move {
166                    let create_room = async {
167                        let room = cx
168                            .update(|cx| {
169                                Room::create(
170                                    called_user_id,
171                                    initial_project,
172                                    client,
173                                    user_store,
174                                    cx,
175                                )
176                            })
177                            .await?;
178
179                        this.update(&mut cx, |this, cx| this.set_room(Some(room.clone()), cx))
180                            .await?;
181
182                        anyhow::Ok(room)
183                    };
184
185                    let room = create_room.await;
186                    this.update(&mut cx, |this, _| this.pending_room_creation = None);
187                    room.map_err(Arc::new)
188                })
189                .shared();
190            self.pending_room_creation = Some(room.clone());
191            cx.foreground().spawn(async move {
192                room.await.map_err(|err| anyhow!("{:?}", err))?;
193                anyhow::Ok(())
194            })
195        };
196
197        cx.spawn(|this, mut cx| async move {
198            let result = invite.await;
199            this.update(&mut cx, |this, cx| {
200                this.pending_invites.remove(&called_user_id);
201                this.report_call_event("invite", cx);
202                cx.notify();
203            });
204            result
205        })
206    }
207
208    pub fn cancel_invite(
209        &mut self,
210        called_user_id: u64,
211        cx: &mut ModelContext<Self>,
212    ) -> Task<Result<()>> {
213        let room_id = if let Some(room) = self.room() {
214            room.read(cx).id()
215        } else {
216            return Task::ready(Err(anyhow!("no active call")));
217        };
218
219        let client = self.client.clone();
220        cx.foreground().spawn(async move {
221            client
222                .request(proto::CancelCall {
223                    room_id,
224                    called_user_id,
225                })
226                .await?;
227            anyhow::Ok(())
228        })
229    }
230
231    pub fn incoming(&self) -> watch::Receiver<Option<IncomingCall>> {
232        self.incoming_call.1.clone()
233    }
234
235    pub fn accept_incoming(&mut self, cx: &mut ModelContext<Self>) -> Task<Result<()>> {
236        if self.room.is_some() {
237            return Task::ready(Err(anyhow!("cannot join while on another call")));
238        }
239
240        let call = if let Some(call) = self.incoming_call.1.borrow().clone() {
241            call
242        } else {
243            return Task::ready(Err(anyhow!("no incoming call")));
244        };
245
246        let join = Room::join(&call, self.client.clone(), self.user_store.clone(), cx);
247
248        cx.spawn(|this, mut cx| async move {
249            let room = join.await?;
250            this.update(&mut cx, |this, cx| this.set_room(Some(room.clone()), cx))
251                .await?;
252            this.update(&mut cx, |this, cx| {
253                this.report_call_event("accept incoming", cx)
254            });
255            Ok(())
256        })
257    }
258
259    pub fn decline_incoming(&mut self, cx: &mut ModelContext<Self>) -> Result<()> {
260        let call = self
261            .incoming_call
262            .0
263            .borrow_mut()
264            .take()
265            .ok_or_else(|| anyhow!("no incoming call"))?;
266        self.report_call_event_for_room("decline incoming", call.room_id, cx);
267        self.client.send(proto::DeclineCall {
268            room_id: call.room_id,
269        })?;
270        Ok(())
271    }
272
273    pub fn hang_up(&mut self, cx: &mut ModelContext<Self>) -> Task<Result<()>> {
274        cx.notify();
275        self.report_call_event("hang up", cx);
276        if let Some((room, _)) = self.room.take() {
277            room.update(cx, |room, cx| room.leave(cx))
278        } else {
279            Task::ready(Ok(()))
280        }
281    }
282
283    pub fn toggle_screen_sharing(&self, cx: &mut AppContext) {
284        if let Some(room) = self.room().cloned() {
285            let toggle_screen_sharing = room.update(cx, |room, cx| {
286                if room.is_screen_sharing() {
287                    self.report_call_event("disable screen share", cx);
288                    Task::ready(room.unshare_screen(cx))
289                } else {
290                    self.report_call_event("enable screen share", cx);
291                    room.share_screen(cx)
292                }
293            });
294            toggle_screen_sharing.detach_and_log_err(cx);
295        }
296    }
297
298    pub fn share_project(
299        &mut self,
300        project: ModelHandle<Project>,
301        cx: &mut ModelContext<Self>,
302    ) -> Task<Result<u64>> {
303        if let Some((room, _)) = self.room.as_ref() {
304            self.report_call_event("share project", cx);
305            room.update(cx, |room, cx| room.share_project(project, cx))
306        } else {
307            Task::ready(Err(anyhow!("no active call")))
308        }
309    }
310
311    pub fn unshare_project(
312        &mut self,
313        project: ModelHandle<Project>,
314        cx: &mut ModelContext<Self>,
315    ) -> Result<()> {
316        if let Some((room, _)) = self.room.as_ref() {
317            self.report_call_event("unshare project", cx);
318            room.update(cx, |room, cx| room.unshare_project(project, cx))
319        } else {
320            Err(anyhow!("no active call"))
321        }
322    }
323
324    pub fn set_location(
325        &mut self,
326        project: Option<&ModelHandle<Project>>,
327        cx: &mut ModelContext<Self>,
328    ) -> Task<Result<()>> {
329        self.location = project.map(|project| project.downgrade());
330        if let Some((room, _)) = self.room.as_ref() {
331            room.update(cx, |room, cx| room.set_location(project, cx))
332        } else {
333            Task::ready(Ok(()))
334        }
335    }
336
337    fn set_room(
338        &mut self,
339        room: Option<ModelHandle<Room>>,
340        cx: &mut ModelContext<Self>,
341    ) -> Task<Result<()>> {
342        if room.as_ref() != self.room.as_ref().map(|room| &room.0) {
343            cx.notify();
344            if let Some(room) = room {
345                if room.read(cx).status().is_offline() {
346                    self.room = None;
347                    Task::ready(Ok(()))
348                } else {
349                    let subscriptions = vec![
350                        cx.observe(&room, |this, room, cx| {
351                            if room.read(cx).status().is_offline() {
352                                this.set_room(None, cx).detach_and_log_err(cx);
353                            }
354
355                            cx.notify();
356                        }),
357                        cx.subscribe(&room, |_, _, event, cx| cx.emit(event.clone())),
358                    ];
359                    self.room = Some((room.clone(), subscriptions));
360                    let location = self.location.and_then(|location| location.upgrade(cx));
361                    room.update(cx, |room, cx| room.set_location(location.as_ref(), cx))
362                }
363            } else {
364                self.room = None;
365                Task::ready(Ok(()))
366            }
367        } else {
368            Task::ready(Ok(()))
369        }
370    }
371
372    pub fn room(&self) -> Option<&ModelHandle<Room>> {
373        self.room.as_ref().map(|(room, _)| room)
374    }
375
376    pub fn pending_invites(&self) -> &HashSet<u64> {
377        &self.pending_invites
378    }
379
380    fn report_call_event(&self, operation: &'static str, cx: &AppContext) {
381        if let Some(room) = self.room() {
382            self.report_call_event_for_room(operation, room.read(cx).id(), cx)
383        }
384    }
385
386    fn report_call_event_for_room(&self, operation: &'static str, room_id: u64, cx: &AppContext) {
387        let telemetry = self.client.telemetry();
388        let telemetry_settings = *settings::get::<TelemetrySettings>(cx);
389
390        let event = ClickhouseEvent::Call { operation, room_id };
391
392        telemetry.report_clickhouse_event(event, telemetry_settings);
393    }
394}