call.rs

  1pub mod call_settings;
  2pub mod participant;
  3pub mod room;
  4
  5use anyhow::{anyhow, Result};
  6use audio::Audio;
  7use call_settings::CallSettings;
  8use client::{
  9    proto, ClickhouseEvent, Client, TelemetrySettings, TypedEnvelope, User, UserStore,
 10    ZED_ALWAYS_ACTIVE,
 11};
 12use collections::HashSet;
 13use futures::{channel::oneshot, future::Shared, Future, FutureExt};
 14use gpui::{
 15    AppContext, AsyncAppContext, Entity, ModelContext, ModelHandle, Subscription, Task,
 16    WeakModelHandle,
 17};
 18use postage::watch;
 19use project::Project;
 20use std::sync::Arc;
 21
 22pub use participant::ParticipantLocation;
 23pub use room::Room;
 24
 25pub fn init(client: Arc<Client>, user_store: ModelHandle<UserStore>, cx: &mut AppContext) {
 26    settings::register::<CallSettings>(cx);
 27
 28    let active_call = cx.add_model(|cx| ActiveCall::new(client, user_store, cx));
 29    cx.set_global(active_call);
 30}
 31
 32#[derive(Clone)]
 33pub struct IncomingCall {
 34    pub room_id: u64,
 35    pub calling_user: Arc<User>,
 36    pub participants: Vec<Arc<User>>,
 37    pub initial_project: Option<proto::ParticipantProject>,
 38}
 39
 40pub struct OneAtATime {
 41    cancel: Option<oneshot::Sender<()>>,
 42}
 43
 44impl OneAtATime {
 45    /// spawn a task in the given context.
 46    /// if another task is spawned before that resolves, or if the OneAtATime itself is dropped, the first task will be cancelled and return Ok(None)
 47    /// otherwise you'll see the result of the task.
 48    fn spawn<F, Fut, R>(&mut self, cx: &mut AppContext, f: F) -> Task<Result<Option<R>>>
 49    where
 50        F: 'static + FnOnce(AsyncAppContext) -> Fut,
 51        Fut: Future<Output = Result<R>>,
 52        R: 'static,
 53    {
 54        let (tx, rx) = oneshot::channel();
 55        self.cancel.replace(tx);
 56        cx.spawn(|cx| async move {
 57            futures::select_biased! {
 58                _ = rx.fuse() => Ok(None),
 59                result = f(cx).fuse() => result.map(Some),
 60            }
 61        })
 62    }
 63}
 64
 65/// Singleton global maintaining the user's participation in a room across workspaces.
 66pub struct ActiveCall {
 67    room: Option<(ModelHandle<Room>, Vec<Subscription>)>,
 68    pending_room_creation: Option<Shared<Task<Result<ModelHandle<Room>, Arc<anyhow::Error>>>>>,
 69    _join_debouncer: OneAtATime,
 70    location: Option<WeakModelHandle<Project>>,
 71    pending_invites: HashSet<u64>,
 72    incoming_call: (
 73        watch::Sender<Option<IncomingCall>>,
 74        watch::Receiver<Option<IncomingCall>>,
 75    ),
 76    client: Arc<Client>,
 77    user_store: ModelHandle<UserStore>,
 78    _subscriptions: Vec<client::Subscription>,
 79}
 80
 81impl Entity for ActiveCall {
 82    type Event = room::Event;
 83}
 84
 85impl ActiveCall {
 86    fn new(
 87        client: Arc<Client>,
 88        user_store: ModelHandle<UserStore>,
 89        cx: &mut ModelContext<Self>,
 90    ) -> Self {
 91        Self {
 92            room: None,
 93            pending_room_creation: None,
 94            location: None,
 95            pending_invites: Default::default(),
 96            incoming_call: watch::channel(),
 97
 98            _join_debouncer: OneAtATime { cancel: None },
 99            _subscriptions: vec![
100                client.add_request_handler(cx.handle(), Self::handle_incoming_call),
101                client.add_message_handler(cx.handle(), Self::handle_call_canceled),
102            ],
103            client,
104            user_store,
105        }
106    }
107
108    pub fn channel_id(&self, cx: &AppContext) -> Option<u64> {
109        self.room()?.read(cx).channel_id()
110    }
111
112    async fn handle_incoming_call(
113        this: ModelHandle<Self>,
114        envelope: TypedEnvelope<proto::IncomingCall>,
115        _: Arc<Client>,
116        mut cx: AsyncAppContext,
117    ) -> Result<proto::Ack> {
118        let user_store = this.read_with(&cx, |this, _| this.user_store.clone());
119        let call = IncomingCall {
120            room_id: envelope.payload.room_id,
121            participants: user_store
122                .update(&mut cx, |user_store, cx| {
123                    user_store.get_users(envelope.payload.participant_user_ids, cx)
124                })
125                .await?,
126            calling_user: user_store
127                .update(&mut cx, |user_store, cx| {
128                    user_store.get_user(envelope.payload.calling_user_id, cx)
129                })
130                .await?,
131            initial_project: envelope.payload.initial_project,
132        };
133        this.update(&mut cx, |this, _| {
134            *this.incoming_call.0.borrow_mut() = Some(call);
135        });
136
137        Ok(proto::Ack {})
138    }
139
140    async fn handle_call_canceled(
141        this: ModelHandle<Self>,
142        envelope: TypedEnvelope<proto::CallCanceled>,
143        _: Arc<Client>,
144        mut cx: AsyncAppContext,
145    ) -> Result<()> {
146        this.update(&mut cx, |this, _| {
147            let mut incoming_call = this.incoming_call.0.borrow_mut();
148            if incoming_call
149                .as_ref()
150                .map_or(false, |call| call.room_id == envelope.payload.room_id)
151            {
152                incoming_call.take();
153            }
154        });
155        Ok(())
156    }
157
158    pub fn global(cx: &AppContext) -> ModelHandle<Self> {
159        cx.global::<ModelHandle<Self>>().clone()
160    }
161
162    pub fn invite(
163        &mut self,
164        called_user_id: u64,
165        initial_project: Option<ModelHandle<Project>>,
166        cx: &mut ModelContext<Self>,
167    ) -> Task<Result<()>> {
168        if !self.pending_invites.insert(called_user_id) {
169            return Task::ready(Err(anyhow!("user was already invited")));
170        }
171        cx.notify();
172
173        let room = if let Some(room) = self.room().cloned() {
174            Some(Task::ready(Ok(room)).shared())
175        } else {
176            self.pending_room_creation.clone()
177        };
178
179        let invite = if let Some(room) = room {
180            cx.spawn_weak(|_, mut cx| async move {
181                let room = room.await.map_err(|err| anyhow!("{:?}", err))?;
182
183                let initial_project_id = if let Some(initial_project) = initial_project {
184                    Some(
185                        room.update(&mut cx, |room, cx| room.share_project(initial_project, cx))
186                            .await?,
187                    )
188                } else {
189                    None
190                };
191
192                room.update(&mut cx, |room, cx| {
193                    room.call(called_user_id, initial_project_id, cx)
194                })
195                .await?;
196
197                anyhow::Ok(())
198            })
199        } else {
200            let client = self.client.clone();
201            let user_store = self.user_store.clone();
202            let room = cx
203                .spawn(|this, mut cx| async move {
204                    let create_room = async {
205                        let room = cx
206                            .update(|cx| {
207                                Room::create(
208                                    called_user_id,
209                                    initial_project,
210                                    client,
211                                    user_store,
212                                    cx,
213                                )
214                            })
215                            .await?;
216
217                        this.update(&mut cx, |this, cx| this.set_room(Some(room.clone()), cx))
218                            .await?;
219
220                        anyhow::Ok(room)
221                    };
222
223                    let room = create_room.await;
224                    this.update(&mut cx, |this, _| this.pending_room_creation = None);
225                    room.map_err(Arc::new)
226                })
227                .shared();
228            self.pending_room_creation = Some(room.clone());
229            cx.foreground().spawn(async move {
230                room.await.map_err(|err| anyhow!("{:?}", err))?;
231                anyhow::Ok(())
232            })
233        };
234
235        cx.spawn(|this, mut cx| async move {
236            let result = invite.await;
237            if result.is_ok() {
238                this.update(&mut cx, |this, cx| this.report_call_event("invite", cx));
239            } else {
240                // TODO: Resport collaboration error
241            }
242
243            this.update(&mut cx, |this, cx| {
244                this.pending_invites.remove(&called_user_id);
245                cx.notify();
246            });
247            result
248        })
249    }
250
251    pub fn cancel_invite(
252        &mut self,
253        called_user_id: u64,
254        cx: &mut ModelContext<Self>,
255    ) -> Task<Result<()>> {
256        let room_id = if let Some(room) = self.room() {
257            room.read(cx).id()
258        } else {
259            return Task::ready(Err(anyhow!("no active call")));
260        };
261
262        let client = self.client.clone();
263        cx.foreground().spawn(async move {
264            client
265                .request(proto::CancelCall {
266                    room_id,
267                    called_user_id,
268                })
269                .await?;
270            anyhow::Ok(())
271        })
272    }
273
274    pub fn incoming(&self) -> watch::Receiver<Option<IncomingCall>> {
275        self.incoming_call.1.clone()
276    }
277
278    pub fn accept_incoming(&mut self, cx: &mut ModelContext<Self>) -> Task<Result<()>> {
279        if self.room.is_some() {
280            return Task::ready(Err(anyhow!("cannot join while on another call")));
281        }
282
283        let call = if let Some(call) = self.incoming_call.1.borrow().clone() {
284            call
285        } else {
286            return Task::ready(Err(anyhow!("no incoming call")));
287        };
288
289        let room_id = call.room_id.clone();
290        let client = self.client.clone();
291        let user_store = self.user_store.clone();
292        let join = self
293            ._join_debouncer
294            .spawn(cx, move |cx| Room::join(room_id, client, user_store, cx));
295
296        cx.spawn(|this, mut cx| async move {
297            let room = join.await?;
298            this.update(&mut cx, |this, cx| this.set_room(room.clone(), cx))
299                .await?;
300            this.update(&mut cx, |this, cx| {
301                this.report_call_event("accept incoming", cx)
302            });
303            Ok(())
304        })
305    }
306
307    pub fn decline_incoming(&mut self, cx: &mut ModelContext<Self>) -> Result<()> {
308        let call = self
309            .incoming_call
310            .0
311            .borrow_mut()
312            .take()
313            .ok_or_else(|| anyhow!("no incoming call"))?;
314        report_call_event_for_room("decline incoming", call.room_id, None, &self.client, cx);
315        self.client.send(proto::DeclineCall {
316            room_id: call.room_id,
317        })?;
318        Ok(())
319    }
320
321    pub fn join_channel(
322        &mut self,
323        channel_id: u64,
324        cx: &mut ModelContext<Self>,
325    ) -> Task<Result<Option<ModelHandle<Room>>>> {
326        if let Some(room) = self.room().cloned() {
327            if room.read(cx).channel_id() == Some(channel_id) {
328                return Task::ready(Ok(Some(room)));
329            } else {
330                room.update(cx, |room, cx| room.clear_state(cx));
331            }
332        }
333
334        let client = self.client.clone();
335        let user_store = self.user_store.clone();
336        let join = self._join_debouncer.spawn(cx, move |cx| async move {
337            Room::join_channel(channel_id, client, user_store, cx).await
338        });
339
340        cx.spawn(move |this, mut cx| async move {
341            let room = join.await?;
342            this.update(&mut cx, |this, cx| this.set_room(room.clone(), cx))
343                .await?;
344            this.update(&mut cx, |this, cx| {
345                this.report_call_event("join channel", cx)
346            });
347            Ok(room)
348        })
349    }
350
351    pub fn hang_up(&mut self, cx: &mut ModelContext<Self>) -> Task<Result<()>> {
352        cx.notify();
353        self.report_call_event("hang up", cx);
354        Audio::end_call(cx);
355        if let Some((room, _)) = self.room.take() {
356            room.update(cx, |room, cx| room.leave(cx))
357        } else {
358            Task::ready(Ok(()))
359        }
360    }
361
362    pub fn share_project(
363        &mut self,
364        project: ModelHandle<Project>,
365        cx: &mut ModelContext<Self>,
366    ) -> Task<Result<u64>> {
367        if let Some((room, _)) = self.room.as_ref() {
368            self.report_call_event("share project", cx);
369            room.update(cx, |room, cx| room.share_project(project, cx))
370        } else {
371            Task::ready(Err(anyhow!("no active call")))
372        }
373    }
374
375    pub fn unshare_project(
376        &mut self,
377        project: ModelHandle<Project>,
378        cx: &mut ModelContext<Self>,
379    ) -> Result<()> {
380        if let Some((room, _)) = self.room.as_ref() {
381            self.report_call_event("unshare project", cx);
382            room.update(cx, |room, cx| room.unshare_project(project, cx))
383        } else {
384            Err(anyhow!("no active call"))
385        }
386    }
387
388    pub fn location(&self) -> Option<&WeakModelHandle<Project>> {
389        self.location.as_ref()
390    }
391
392    pub fn set_location(
393        &mut self,
394        project: Option<&ModelHandle<Project>>,
395        cx: &mut ModelContext<Self>,
396    ) -> Task<Result<()>> {
397        if project.is_some() || !*ZED_ALWAYS_ACTIVE {
398            self.location = project.map(|project| project.downgrade());
399            if let Some((room, _)) = self.room.as_ref() {
400                return room.update(cx, |room, cx| room.set_location(project, cx));
401            }
402        }
403        Task::ready(Ok(()))
404    }
405
406    fn set_room(
407        &mut self,
408        room: Option<ModelHandle<Room>>,
409        cx: &mut ModelContext<Self>,
410    ) -> Task<Result<()>> {
411        if room.as_ref() != self.room.as_ref().map(|room| &room.0) {
412            cx.notify();
413            if let Some(room) = room {
414                if room.read(cx).status().is_offline() {
415                    self.room = None;
416                    Task::ready(Ok(()))
417                } else {
418                    let subscriptions = vec![
419                        cx.observe(&room, |this, room, cx| {
420                            if room.read(cx).status().is_offline() {
421                                this.set_room(None, cx).detach_and_log_err(cx);
422                            }
423
424                            cx.notify();
425                        }),
426                        cx.subscribe(&room, |_, _, event, cx| cx.emit(event.clone())),
427                    ];
428                    self.room = Some((room.clone(), subscriptions));
429                    let location = self.location.and_then(|location| location.upgrade(cx));
430                    room.update(cx, |room, cx| room.set_location(location.as_ref(), cx))
431                }
432            } else {
433                self.room = None;
434                Task::ready(Ok(()))
435            }
436        } else {
437            Task::ready(Ok(()))
438        }
439    }
440
441    pub fn room(&self) -> Option<&ModelHandle<Room>> {
442        self.room.as_ref().map(|(room, _)| room)
443    }
444
445    pub fn client(&self) -> Arc<Client> {
446        self.client.clone()
447    }
448
449    pub fn pending_invites(&self) -> &HashSet<u64> {
450        &self.pending_invites
451    }
452
453    pub fn report_call_event(&self, operation: &'static str, cx: &AppContext) {
454        if let Some(room) = self.room() {
455            let room = room.read(cx);
456            report_call_event_for_room(operation, room.id(), room.channel_id(), &self.client, cx);
457        }
458    }
459}
460
461pub fn report_call_event_for_room(
462    operation: &'static str,
463    room_id: u64,
464    channel_id: Option<u64>,
465    client: &Arc<Client>,
466    cx: &AppContext,
467) {
468    let telemetry = client.telemetry();
469    let telemetry_settings = *settings::get::<TelemetrySettings>(cx);
470    let event = ClickhouseEvent::Call {
471        operation,
472        room_id: Some(room_id),
473        channel_id,
474    };
475    telemetry.report_clickhouse_event(event, telemetry_settings);
476}
477
478pub fn report_call_event_for_channel(
479    operation: &'static str,
480    channel_id: u64,
481    client: &Arc<Client>,
482    cx: &AppContext,
483) {
484    let room = ActiveCall::global(cx).read(cx).room();
485
486    let telemetry = client.telemetry();
487    let telemetry_settings = *settings::get::<TelemetrySettings>(cx);
488
489    let event = ClickhouseEvent::Call {
490        operation,
491        room_id: room.map(|r| r.read(cx).id()),
492        channel_id: Some(channel_id),
493    };
494    telemetry.report_clickhouse_event(event, telemetry_settings);
495}
496
#[cfg(test)]
mod test {
    use gpui::TestAppContext;

    use crate::OneAtATime;

    /// Checks the three outcomes of `OneAtATime::spawn`: a lone task
    /// completes, a superseded task yields `None`, and a task whose owner is
    /// dropped yields `None`.
    #[gpui::test]
    async fn test_one_at_a_time(cx: &mut TestAppContext) {
        let mut debouncer = OneAtATime { cancel: None };

        // A lone task runs to completion.
        let solo = cx.update(|cx| debouncer.spawn(cx, |_| async { Ok(1) }));
        assert_eq!(solo.await.unwrap(), Some(1));

        // Spawning twice back to back cancels the first task before its body runs.
        let (superseded, latest) = cx.update(|cx| {
            let first = debouncer.spawn(cx, |_| async {
                assert!(false);
                Ok(2)
            });
            let second = debouncer.spawn(cx, |_| async { Ok(3) });
            (first, second)
        });

        assert_eq!(superseded.await.unwrap(), None);
        assert_eq!(latest.await.unwrap(), Some(3));

        // Dropping the owner cancels the task that is still outstanding.
        let orphaned = cx.update(|cx| debouncer.spawn(cx, |_| async { Ok(4) }));
        drop(debouncer);

        assert_eq!(orphaned.await.unwrap(), None);
    }
}