call2.rs

  1pub mod call_settings;
  2pub mod participant;
  3pub mod room;
  4
  5use anyhow::{anyhow, Result};
  6use audio::Audio;
  7use call_settings::CallSettings;
  8use client::{
  9    proto, ClickhouseEvent, Client, TelemetrySettings, TypedEnvelope, User, UserStore,
 10    ZED_ALWAYS_ACTIVE,
 11};
 12use collections::HashSet;
 13use futures::{channel::oneshot, future::Shared, Future, FutureExt};
 14use gpui::{
 15    AppContext, AsyncAppContext, Context, EventEmitter, Model, ModelContext, Subscription, Task,
 16    WeakModel,
 17};
 18use postage::watch;
 19use project::Project;
 20use room::Event;
 21use settings::Settings;
 22use std::sync::Arc;
 23
 24pub use participant::ParticipantLocation;
 25pub use room::Room;
 26
 27pub fn init(client: Arc<Client>, user_store: Model<UserStore>, cx: &mut AppContext) {
 28    CallSettings::register(cx);
 29
 30    let active_call = cx.build_model(|cx| ActiveCall::new(client, user_store, cx));
 31    cx.set_global(active_call);
 32}
 33
 34pub struct OneAtATime {
 35    cancel: Option<oneshot::Sender<()>>,
 36}
 37
 38impl OneAtATime {
 39    /// spawn a task in the given context.
 40    /// if another task is spawned before that resolves, or if the OneAtATime itself is dropped, the first task will be cancelled and return Ok(None)
 41    /// otherwise you'll see the result of the task.
 42    fn spawn<F, Fut, R>(&mut self, cx: &mut AppContext, f: F) -> Task<Result<Option<R>>>
 43    where
 44        F: 'static + FnOnce(AsyncAppContext) -> Fut,
 45        Fut: Future<Output = Result<R>>,
 46        R: 'static,
 47    {
 48        let (tx, rx) = oneshot::channel();
 49        self.cancel.replace(tx);
 50        cx.spawn(|cx| async move {
 51            futures::select_biased! {
 52                _ = rx.fuse() => Ok(None),
 53                result = f(cx).fuse() => result.map(Some),
 54            }
 55        })
 56    }
 57
 58    fn running(&self) -> bool {
 59        self.cancel
 60            .as_ref()
 61            .is_some_and(|cancel| !cancel.is_canceled())
 62    }
 63}
 64
 65#[derive(Clone)]
 66pub struct IncomingCall {
 67    pub room_id: u64,
 68    pub calling_user: Arc<User>,
 69    pub participants: Vec<Arc<User>>,
 70    pub initial_project: Option<proto::ParticipantProject>,
 71}
 72
 73/// Singleton global maintaining the user's participation in a room across workspaces.
 74pub struct ActiveCall {
 75    room: Option<(Model<Room>, Vec<Subscription>)>,
 76    pending_room_creation: Option<Shared<Task<Result<Model<Room>, Arc<anyhow::Error>>>>>,
 77    location: Option<WeakModel<Project>>,
 78    _join_debouncer: OneAtATime,
 79    pending_invites: HashSet<u64>,
 80    incoming_call: (
 81        watch::Sender<Option<IncomingCall>>,
 82        watch::Receiver<Option<IncomingCall>>,
 83    ),
 84    client: Arc<Client>,
 85    user_store: Model<UserStore>,
 86    _subscriptions: Vec<client::Subscription>,
 87}
 88
 89impl EventEmitter<Event> for ActiveCall {}
 90
 91impl ActiveCall {
 92    fn new(client: Arc<Client>, user_store: Model<UserStore>, cx: &mut ModelContext<Self>) -> Self {
 93        Self {
 94            room: None,
 95            pending_room_creation: None,
 96            location: None,
 97            pending_invites: Default::default(),
 98            incoming_call: watch::channel(),
 99            _join_debouncer: OneAtATime { cancel: None },
100            _subscriptions: vec![
101                client.add_request_handler(cx.weak_model(), Self::handle_incoming_call),
102                client.add_message_handler(cx.weak_model(), Self::handle_call_canceled),
103            ],
104            client,
105            user_store,
106        }
107    }
108
109    pub fn channel_id(&self, cx: &AppContext) -> Option<u64> {
110        self.room()?.read(cx).channel_id()
111    }
112
113    async fn handle_incoming_call(
114        this: Model<Self>,
115        envelope: TypedEnvelope<proto::IncomingCall>,
116        _: Arc<Client>,
117        mut cx: AsyncAppContext,
118    ) -> Result<proto::Ack> {
119        let user_store = this.update(&mut cx, |this, _| this.user_store.clone())?;
120        let call = IncomingCall {
121            room_id: envelope.payload.room_id,
122            participants: user_store
123                .update(&mut cx, |user_store, cx| {
124                    user_store.get_users(envelope.payload.participant_user_ids, cx)
125                })?
126                .await?,
127            calling_user: user_store
128                .update(&mut cx, |user_store, cx| {
129                    user_store.get_user(envelope.payload.calling_user_id, cx)
130                })?
131                .await?,
132            initial_project: envelope.payload.initial_project,
133        };
134        this.update(&mut cx, |this, _| {
135            *this.incoming_call.0.borrow_mut() = Some(call);
136        })?;
137
138        Ok(proto::Ack {})
139    }
140
141    async fn handle_call_canceled(
142        this: Model<Self>,
143        envelope: TypedEnvelope<proto::CallCanceled>,
144        _: Arc<Client>,
145        mut cx: AsyncAppContext,
146    ) -> Result<()> {
147        this.update(&mut cx, |this, _| {
148            let mut incoming_call = this.incoming_call.0.borrow_mut();
149            if incoming_call
150                .as_ref()
151                .map_or(false, |call| call.room_id == envelope.payload.room_id)
152            {
153                incoming_call.take();
154            }
155        })?;
156        Ok(())
157    }
158
159    pub fn global(cx: &AppContext) -> Model<Self> {
160        cx.global::<Model<Self>>().clone()
161    }
162
163    pub fn invite(
164        &mut self,
165        called_user_id: u64,
166        initial_project: Option<Model<Project>>,
167        cx: &mut ModelContext<Self>,
168    ) -> Task<Result<()>> {
169        if !self.pending_invites.insert(called_user_id) {
170            return Task::ready(Err(anyhow!("user was already invited")));
171        }
172        cx.notify();
173
174        if self._join_debouncer.running() {
175            return Task::ready(Ok(()));
176        }
177
178        let room = if let Some(room) = self.room().cloned() {
179            Some(Task::ready(Ok(room)).shared())
180        } else {
181            self.pending_room_creation.clone()
182        };
183
184        let invite = if let Some(room) = room {
185            cx.spawn(move |_, mut cx| async move {
186                let room = room.await.map_err(|err| anyhow!("{:?}", err))?;
187
188                let initial_project_id = if let Some(initial_project) = initial_project {
189                    Some(
190                        room.update(&mut cx, |room, cx| room.share_project(initial_project, cx))?
191                            .await?,
192                    )
193                } else {
194                    None
195                };
196
197                room.update(&mut cx, move |room, cx| {
198                    room.call(called_user_id, initial_project_id, cx)
199                })?
200                .await?;
201
202                anyhow::Ok(())
203            })
204        } else {
205            let client = self.client.clone();
206            let user_store = self.user_store.clone();
207            let room = cx
208                .spawn(move |this, mut cx| async move {
209                    let create_room = async {
210                        let room = cx
211                            .update(|cx| {
212                                Room::create(
213                                    called_user_id,
214                                    initial_project,
215                                    client,
216                                    user_store,
217                                    cx,
218                                )
219                            })?
220                            .await?;
221
222                        this.update(&mut cx, |this, cx| this.set_room(Some(room.clone()), cx))?
223                            .await?;
224
225                        anyhow::Ok(room)
226                    };
227
228                    let room = create_room.await;
229                    this.update(&mut cx, |this, _| this.pending_room_creation = None)?;
230                    room.map_err(Arc::new)
231                })
232                .shared();
233            self.pending_room_creation = Some(room.clone());
234            cx.background_executor().spawn(async move {
235                room.await.map_err(|err| anyhow!("{:?}", err))?;
236                anyhow::Ok(())
237            })
238        };
239
240        cx.spawn(move |this, mut cx| async move {
241            let result = invite.await;
242            if result.is_ok() {
243                this.update(&mut cx, |this, cx| this.report_call_event("invite", cx))?;
244            } else {
245                // TODO: Resport collaboration error
246            }
247
248            this.update(&mut cx, |this, cx| {
249                this.pending_invites.remove(&called_user_id);
250                cx.notify();
251            })?;
252            result
253        })
254    }
255
256    pub fn cancel_invite(
257        &mut self,
258        called_user_id: u64,
259        cx: &mut ModelContext<Self>,
260    ) -> Task<Result<()>> {
261        let room_id = if let Some(room) = self.room() {
262            room.read(cx).id()
263        } else {
264            return Task::ready(Err(anyhow!("no active call")));
265        };
266
267        let client = self.client.clone();
268        cx.background_executor().spawn(async move {
269            client
270                .request(proto::CancelCall {
271                    room_id,
272                    called_user_id,
273                })
274                .await?;
275            anyhow::Ok(())
276        })
277    }
278
279    pub fn incoming(&self) -> watch::Receiver<Option<IncomingCall>> {
280        self.incoming_call.1.clone()
281    }
282
283    pub fn accept_incoming(&mut self, cx: &mut ModelContext<Self>) -> Task<Result<()>> {
284        if self.room.is_some() {
285            return Task::ready(Err(anyhow!("cannot join while on another call")));
286        }
287
288        let call = if let Some(call) = self.incoming_call.1.borrow().clone() {
289            call
290        } else {
291            return Task::ready(Err(anyhow!("no incoming call")));
292        };
293
294        if self.pending_room_creation.is_some() {
295            return Task::ready(Ok(()));
296        }
297
298        let room_id = call.room_id.clone();
299        let client = self.client.clone();
300        let user_store = self.user_store.clone();
301        let join = self
302            ._join_debouncer
303            .spawn(cx, move |cx| Room::join(room_id, client, user_store, cx));
304
305        cx.spawn(|this, mut cx| async move {
306            let room = join.await?;
307            this.update(&mut cx, |this, cx| this.set_room(room.clone(), cx))?
308                .await?;
309            this.update(&mut cx, |this, cx| {
310                this.report_call_event("accept incoming", cx)
311            })?;
312            Ok(())
313        })
314    }
315
316    pub fn decline_incoming(&mut self, cx: &mut ModelContext<Self>) -> Result<()> {
317        let call = self
318            .incoming_call
319            .0
320            .borrow_mut()
321            .take()
322            .ok_or_else(|| anyhow!("no incoming call"))?;
323        report_call_event_for_room("decline incoming", call.room_id, None, &self.client, cx);
324        self.client.send(proto::DeclineCall {
325            room_id: call.room_id,
326        })?;
327        Ok(())
328    }
329
330    pub fn join_channel(
331        &mut self,
332        channel_id: u64,
333        cx: &mut ModelContext<Self>,
334    ) -> Task<Result<Option<Model<Room>>>> {
335        if let Some(room) = self.room().cloned() {
336            if room.read(cx).channel_id() == Some(channel_id) {
337                return Task::ready(Ok(Some(room)));
338            } else {
339                room.update(cx, |room, cx| room.clear_state(cx));
340            }
341        }
342
343        if self.pending_room_creation.is_some() {
344            return Task::ready(Ok(None));
345        }
346
347        let client = self.client.clone();
348        let user_store = self.user_store.clone();
349        let join = self._join_debouncer.spawn(cx, move |cx| async move {
350            Room::join_channel(channel_id, client, user_store, cx).await
351        });
352
353        cx.spawn(|this, mut cx| async move {
354            let room = join.await?;
355            this.update(&mut cx, |this, cx| this.set_room(room.clone(), cx))?
356                .await?;
357            this.update(&mut cx, |this, cx| {
358                this.report_call_event("join channel", cx)
359            })?;
360            Ok(room)
361        })
362    }
363
364    pub fn hang_up(&mut self, cx: &mut ModelContext<Self>) -> Task<Result<()>> {
365        cx.notify();
366        self.report_call_event("hang up", cx);
367
368        Audio::end_call(cx);
369        if let Some((room, _)) = self.room.take() {
370            room.update(cx, |room, cx| room.leave(cx))
371        } else {
372            Task::ready(Ok(()))
373        }
374    }
375
376    pub fn share_project(
377        &mut self,
378        project: Model<Project>,
379        cx: &mut ModelContext<Self>,
380    ) -> Task<Result<u64>> {
381        if let Some((room, _)) = self.room.as_ref() {
382            self.report_call_event("share project", cx);
383            room.update(cx, |room, cx| room.share_project(project, cx))
384        } else {
385            Task::ready(Err(anyhow!("no active call")))
386        }
387    }
388
389    pub fn unshare_project(
390        &mut self,
391        project: Model<Project>,
392        cx: &mut ModelContext<Self>,
393    ) -> Result<()> {
394        if let Some((room, _)) = self.room.as_ref() {
395            self.report_call_event("unshare project", cx);
396            room.update(cx, |room, cx| room.unshare_project(project, cx))
397        } else {
398            Err(anyhow!("no active call"))
399        }
400    }
401
402    pub fn location(&self) -> Option<&WeakModel<Project>> {
403        self.location.as_ref()
404    }
405
406    pub fn set_location(
407        &mut self,
408        project: Option<&Model<Project>>,
409        cx: &mut ModelContext<Self>,
410    ) -> Task<Result<()>> {
411        if project.is_some() || !*ZED_ALWAYS_ACTIVE {
412            self.location = project.map(|project| project.downgrade());
413            if let Some((room, _)) = self.room.as_ref() {
414                return room.update(cx, |room, cx| room.set_location(project, cx));
415            }
416        }
417        Task::ready(Ok(()))
418    }
419
420    fn set_room(
421        &mut self,
422        room: Option<Model<Room>>,
423        cx: &mut ModelContext<Self>,
424    ) -> Task<Result<()>> {
425        if room.as_ref() != self.room.as_ref().map(|room| &room.0) {
426            cx.notify();
427            if let Some(room) = room {
428                if room.read(cx).status().is_offline() {
429                    self.room = None;
430                    Task::ready(Ok(()))
431                } else {
432                    let subscriptions = vec![
433                        cx.observe(&room, |this, room, cx| {
434                            if room.read(cx).status().is_offline() {
435                                this.set_room(None, cx).detach_and_log_err(cx);
436                            }
437
438                            cx.notify();
439                        }),
440                        cx.subscribe(&room, |_, _, event, cx| cx.emit(event.clone())),
441                    ];
442                    self.room = Some((room.clone(), subscriptions));
443                    let location = self
444                        .location
445                        .as_ref()
446                        .and_then(|location| location.upgrade());
447                    room.update(cx, |room, cx| room.set_location(location.as_ref(), cx))
448                }
449            } else {
450                self.room = None;
451                Task::ready(Ok(()))
452            }
453        } else {
454            Task::ready(Ok(()))
455        }
456    }
457
458    pub fn room(&self) -> Option<&Model<Room>> {
459        self.room.as_ref().map(|(room, _)| room)
460    }
461
462    pub fn client(&self) -> Arc<Client> {
463        self.client.clone()
464    }
465
466    pub fn pending_invites(&self) -> &HashSet<u64> {
467        &self.pending_invites
468    }
469
470    pub fn report_call_event(&self, operation: &'static str, cx: &AppContext) {
471        if let Some(room) = self.room() {
472            let room = room.read(cx);
473            report_call_event_for_room(operation, room.id(), room.channel_id(), &self.client, cx);
474        }
475    }
476}
477
478pub fn report_call_event_for_room(
479    operation: &'static str,
480    room_id: u64,
481    channel_id: Option<u64>,
482    client: &Arc<Client>,
483    cx: &AppContext,
484) {
485    let telemetry = client.telemetry();
486    let telemetry_settings = *TelemetrySettings::get_global(cx);
487    let event = ClickhouseEvent::Call {
488        operation,
489        room_id: Some(room_id),
490        channel_id,
491    };
492    telemetry.report_clickhouse_event(event, telemetry_settings);
493}
494
495pub fn report_call_event_for_channel(
496    operation: &'static str,
497    channel_id: u64,
498    client: &Arc<Client>,
499    cx: &AppContext,
500) {
501    let room = ActiveCall::global(cx).read(cx).room();
502
503    let telemetry = client.telemetry();
504
505    let telemetry_settings = *TelemetrySettings::get_global(cx);
506
507    let event = ClickhouseEvent::Call {
508        operation,
509        room_id: room.map(|r| r.read(cx).id()),
510        channel_id: Some(channel_id),
511    };
512    telemetry.report_clickhouse_event(event, telemetry_settings);
513}
514
515#[cfg(test)]
516mod test {
517    use gpui::TestAppContext;
518
519    use crate::OneAtATime;
520
521    #[gpui::test]
522    async fn test_one_at_a_time(cx: &mut TestAppContext) {
523        let mut one_at_a_time = OneAtATime { cancel: None };
524
525        assert_eq!(
526            cx.update(|cx| one_at_a_time.spawn(cx, |_| async { Ok(1) }))
527                .await
528                .unwrap(),
529            Some(1)
530        );
531
532        let (a, b) = cx.update(|cx| {
533            (
534                one_at_a_time.spawn(cx, |_| async {
535                    assert!(false);
536                    Ok(2)
537                }),
538                one_at_a_time.spawn(cx, |_| async { Ok(3) }),
539            )
540        });
541
542        assert_eq!(a.await.unwrap(), None);
543        assert_eq!(b.await.unwrap(), Some(3));
544
545        let promise = cx.update(|cx| one_at_a_time.spawn(cx, |_| async { Ok(4) }));
546        drop(one_at_a_time);
547
548        assert_eq!(promise.await.unwrap(), None);
549    }
550}