call2.rs

  1pub mod call_settings;
  2pub mod participant;
  3pub mod room;
  4
  5use anyhow::{anyhow, Result};
  6use audio::Audio;
  7use call_settings::CallSettings;
  8use client::{
  9    proto, ClickhouseEvent, Client, TelemetrySettings, TypedEnvelope, User, UserStore,
 10    ZED_ALWAYS_ACTIVE,
 11};
 12use collections::HashSet;
 13use futures::{channel::oneshot, future::Shared, Future, FutureExt};
 14use gpui::{
 15    AppContext, AsyncAppContext, Context, EventEmitter, Model, ModelContext, Subscription, Task,
 16    WeakModel,
 17};
 18use postage::watch;
 19use project::Project;
 20use settings::Settings;
 21use std::sync::Arc;
 22
 23pub use participant::ParticipantLocation;
 24pub use room::Room;
 25
 26pub fn init(client: Arc<Client>, user_store: Model<UserStore>, cx: &mut AppContext) {
 27    CallSettings::register(cx);
 28
 29    let active_call = cx.build_model(|cx| ActiveCall::new(client, user_store, cx));
 30    cx.set_global(active_call);
 31}
 32
 33pub struct OneAtATime {
 34    cancel: Option<oneshot::Sender<()>>,
 35}
 36
 37impl OneAtATime {
 38    /// spawn a task in the given context.
 39    /// if another task is spawned before that resolves, or if the OneAtATime itself is dropped, the first task will be cancelled and return Ok(None)
 40    /// otherwise you'll see the result of the task.
 41    fn spawn<F, Fut, R>(&mut self, cx: &mut AppContext, f: F) -> Task<Result<Option<R>>>
 42    where
 43        F: 'static + FnOnce(AsyncAppContext) -> Fut,
 44        Fut: Future<Output = Result<R>>,
 45        R: 'static,
 46    {
 47        let (tx, rx) = oneshot::channel();
 48        self.cancel.replace(tx);
 49        cx.spawn(|cx| async move {
 50            futures::select_biased! {
 51                _ = rx.fuse() => Ok(None),
 52                result = f(cx).fuse() => result.map(Some),
 53            }
 54        })
 55    }
 56
 57    fn running(&self) -> bool {
 58        self.cancel
 59            .as_ref()
 60            .is_some_and(|cancel| !cancel.is_canceled())
 61    }
 62}
 63
 64#[derive(Clone)]
 65pub struct IncomingCall {
 66    pub room_id: u64,
 67    pub calling_user: Arc<User>,
 68    pub participants: Vec<Arc<User>>,
 69    pub initial_project: Option<proto::ParticipantProject>,
 70}
 71
 72/// Singleton global maintaining the user's participation in a room across workspaces.
 73pub struct ActiveCall {
 74    room: Option<(Model<Room>, Vec<Subscription>)>,
 75    pending_room_creation: Option<Shared<Task<Result<Model<Room>, Arc<anyhow::Error>>>>>,
 76    location: Option<WeakModel<Project>>,
 77    _join_debouncer: OneAtATime,
 78    pending_invites: HashSet<u64>,
 79    incoming_call: (
 80        watch::Sender<Option<IncomingCall>>,
 81        watch::Receiver<Option<IncomingCall>>,
 82    ),
 83    client: Arc<Client>,
 84    user_store: Model<UserStore>,
 85    _subscriptions: Vec<client::Subscription>,
 86}
 87
 88impl EventEmitter for ActiveCall {
 89    type Event = room::Event;
 90}
 91
 92impl ActiveCall {
 93    fn new(client: Arc<Client>, user_store: Model<UserStore>, cx: &mut ModelContext<Self>) -> Self {
 94        Self {
 95            room: None,
 96            pending_room_creation: None,
 97            location: None,
 98            pending_invites: Default::default(),
 99            incoming_call: watch::channel(),
100            _join_debouncer: OneAtATime { cancel: None },
101            _subscriptions: vec![
102                client.add_request_handler(cx.weak_model(), Self::handle_incoming_call),
103                client.add_message_handler(cx.weak_model(), Self::handle_call_canceled),
104            ],
105            client,
106            user_store,
107        }
108    }
109
110    pub fn channel_id(&self, cx: &AppContext) -> Option<u64> {
111        self.room()?.read(cx).channel_id()
112    }
113
114    async fn handle_incoming_call(
115        this: Model<Self>,
116        envelope: TypedEnvelope<proto::IncomingCall>,
117        _: Arc<Client>,
118        mut cx: AsyncAppContext,
119    ) -> Result<proto::Ack> {
120        let user_store = this.update(&mut cx, |this, _| this.user_store.clone())?;
121        let call = IncomingCall {
122            room_id: envelope.payload.room_id,
123            participants: user_store
124                .update(&mut cx, |user_store, cx| {
125                    user_store.get_users(envelope.payload.participant_user_ids, cx)
126                })?
127                .await?,
128            calling_user: user_store
129                .update(&mut cx, |user_store, cx| {
130                    user_store.get_user(envelope.payload.calling_user_id, cx)
131                })?
132                .await?,
133            initial_project: envelope.payload.initial_project,
134        };
135        this.update(&mut cx, |this, _| {
136            *this.incoming_call.0.borrow_mut() = Some(call);
137        })?;
138
139        Ok(proto::Ack {})
140    }
141
142    async fn handle_call_canceled(
143        this: Model<Self>,
144        envelope: TypedEnvelope<proto::CallCanceled>,
145        _: Arc<Client>,
146        mut cx: AsyncAppContext,
147    ) -> Result<()> {
148        this.update(&mut cx, |this, _| {
149            let mut incoming_call = this.incoming_call.0.borrow_mut();
150            if incoming_call
151                .as_ref()
152                .map_or(false, |call| call.room_id == envelope.payload.room_id)
153            {
154                incoming_call.take();
155            }
156        })?;
157        Ok(())
158    }
159
160    pub fn global(cx: &AppContext) -> Model<Self> {
161        cx.global::<Model<Self>>().clone()
162    }
163
164    pub fn invite(
165        &mut self,
166        called_user_id: u64,
167        initial_project: Option<Model<Project>>,
168        cx: &mut ModelContext<Self>,
169    ) -> Task<Result<()>> {
170        if !self.pending_invites.insert(called_user_id) {
171            return Task::ready(Err(anyhow!("user was already invited")));
172        }
173        cx.notify();
174
175        if self._join_debouncer.running() {
176            return Task::ready(Ok(()));
177        }
178
179        let room = if let Some(room) = self.room().cloned() {
180            Some(Task::ready(Ok(room)).shared())
181        } else {
182            self.pending_room_creation.clone()
183        };
184
185        let invite = if let Some(room) = room {
186            cx.spawn(move |_, mut cx| async move {
187                let room = room.await.map_err(|err| anyhow!("{:?}", err))?;
188
189                let initial_project_id = if let Some(initial_project) = initial_project {
190                    Some(
191                        room.update(&mut cx, |room, cx| room.share_project(initial_project, cx))?
192                            .await?,
193                    )
194                } else {
195                    None
196                };
197
198                room.update(&mut cx, move |room, cx| {
199                    room.call(called_user_id, initial_project_id, cx)
200                })?
201                .await?;
202
203                anyhow::Ok(())
204            })
205        } else {
206            let client = self.client.clone();
207            let user_store = self.user_store.clone();
208            let room = cx
209                .spawn(move |this, mut cx| async move {
210                    let create_room = async {
211                        let room = cx
212                            .update(|cx| {
213                                Room::create(
214                                    called_user_id,
215                                    initial_project,
216                                    client,
217                                    user_store,
218                                    cx,
219                                )
220                            })?
221                            .await?;
222
223                        this.update(&mut cx, |this, cx| this.set_room(Some(room.clone()), cx))?
224                            .await?;
225
226                        anyhow::Ok(room)
227                    };
228
229                    let room = create_room.await;
230                    this.update(&mut cx, |this, _| this.pending_room_creation = None)?;
231                    room.map_err(Arc::new)
232                })
233                .shared();
234            self.pending_room_creation = Some(room.clone());
235            cx.background_executor().spawn(async move {
236                room.await.map_err(|err| anyhow!("{:?}", err))?;
237                anyhow::Ok(())
238            })
239        };
240
241        cx.spawn(move |this, mut cx| async move {
242            let result = invite.await;
243            if result.is_ok() {
244                this.update(&mut cx, |this, cx| this.report_call_event("invite", cx))?;
245            } else {
246                // TODO: Resport collaboration error
247            }
248
249            this.update(&mut cx, |this, cx| {
250                this.pending_invites.remove(&called_user_id);
251                cx.notify();
252            })?;
253            result
254        })
255    }
256
257    pub fn cancel_invite(
258        &mut self,
259        called_user_id: u64,
260        cx: &mut ModelContext<Self>,
261    ) -> Task<Result<()>> {
262        let room_id = if let Some(room) = self.room() {
263            room.read(cx).id()
264        } else {
265            return Task::ready(Err(anyhow!("no active call")));
266        };
267
268        let client = self.client.clone();
269        cx.background_executor().spawn(async move {
270            client
271                .request(proto::CancelCall {
272                    room_id,
273                    called_user_id,
274                })
275                .await?;
276            anyhow::Ok(())
277        })
278    }
279
280    pub fn incoming(&self) -> watch::Receiver<Option<IncomingCall>> {
281        self.incoming_call.1.clone()
282    }
283
284    pub fn accept_incoming(&mut self, cx: &mut ModelContext<Self>) -> Task<Result<()>> {
285        if self.room.is_some() {
286            return Task::ready(Err(anyhow!("cannot join while on another call")));
287        }
288
289        let call = if let Some(call) = self.incoming_call.1.borrow().clone() {
290            call
291        } else {
292            return Task::ready(Err(anyhow!("no incoming call")));
293        };
294
295        if self.pending_room_creation.is_some() {
296            return Task::ready(Ok(()));
297        }
298
299        let room_id = call.room_id.clone();
300        let client = self.client.clone();
301        let user_store = self.user_store.clone();
302        let join = self
303            ._join_debouncer
304            .spawn(cx, move |cx| Room::join(room_id, client, user_store, cx));
305
306        cx.spawn(|this, mut cx| async move {
307            let room = join.await?;
308            this.update(&mut cx, |this, cx| this.set_room(room.clone(), cx))?
309                .await?;
310            this.update(&mut cx, |this, cx| {
311                this.report_call_event("accept incoming", cx)
312            })?;
313            Ok(())
314        })
315    }
316
317    pub fn decline_incoming(&mut self, cx: &mut ModelContext<Self>) -> Result<()> {
318        let call = self
319            .incoming_call
320            .0
321            .borrow_mut()
322            .take()
323            .ok_or_else(|| anyhow!("no incoming call"))?;
324        report_call_event_for_room("decline incoming", call.room_id, None, &self.client, cx);
325        self.client.send(proto::DeclineCall {
326            room_id: call.room_id,
327        })?;
328        Ok(())
329    }
330
331    pub fn join_channel(
332        &mut self,
333        channel_id: u64,
334        cx: &mut ModelContext<Self>,
335    ) -> Task<Result<Option<Model<Room>>>> {
336        if let Some(room) = self.room().cloned() {
337            if room.read(cx).channel_id() == Some(channel_id) {
338                return Task::ready(Ok(Some(room)));
339            } else {
340                room.update(cx, |room, cx| room.clear_state(cx));
341            }
342        }
343
344        if self.pending_room_creation.is_some() {
345            return Task::ready(Ok(None));
346        }
347
348        let client = self.client.clone();
349        let user_store = self.user_store.clone();
350        let join = self._join_debouncer.spawn(cx, move |cx| async move {
351            Room::join_channel(channel_id, client, user_store, cx).await
352        });
353
354        cx.spawn(|this, mut cx| async move {
355            let room = join.await?;
356            this.update(&mut cx, |this, cx| this.set_room(room.clone(), cx))?
357                .await?;
358            this.update(&mut cx, |this, cx| {
359                this.report_call_event("join channel", cx)
360            })?;
361            Ok(room)
362        })
363    }
364
365    pub fn hang_up(&mut self, cx: &mut ModelContext<Self>) -> Task<Result<()>> {
366        cx.notify();
367        self.report_call_event("hang up", cx);
368
369        Audio::end_call(cx);
370        if let Some((room, _)) = self.room.take() {
371            room.update(cx, |room, cx| room.leave(cx))
372        } else {
373            Task::ready(Ok(()))
374        }
375    }
376
377    pub fn share_project(
378        &mut self,
379        project: Model<Project>,
380        cx: &mut ModelContext<Self>,
381    ) -> Task<Result<u64>> {
382        if let Some((room, _)) = self.room.as_ref() {
383            self.report_call_event("share project", cx);
384            room.update(cx, |room, cx| room.share_project(project, cx))
385        } else {
386            Task::ready(Err(anyhow!("no active call")))
387        }
388    }
389
390    pub fn unshare_project(
391        &mut self,
392        project: Model<Project>,
393        cx: &mut ModelContext<Self>,
394    ) -> Result<()> {
395        if let Some((room, _)) = self.room.as_ref() {
396            self.report_call_event("unshare project", cx);
397            room.update(cx, |room, cx| room.unshare_project(project, cx))
398        } else {
399            Err(anyhow!("no active call"))
400        }
401    }
402
403    pub fn location(&self) -> Option<&WeakModel<Project>> {
404        self.location.as_ref()
405    }
406
407    pub fn set_location(
408        &mut self,
409        project: Option<&Model<Project>>,
410        cx: &mut ModelContext<Self>,
411    ) -> Task<Result<()>> {
412        if project.is_some() || !*ZED_ALWAYS_ACTIVE {
413            self.location = project.map(|project| project.downgrade());
414            if let Some((room, _)) = self.room.as_ref() {
415                return room.update(cx, |room, cx| room.set_location(project, cx));
416            }
417        }
418        Task::ready(Ok(()))
419    }
420
421    fn set_room(
422        &mut self,
423        room: Option<Model<Room>>,
424        cx: &mut ModelContext<Self>,
425    ) -> Task<Result<()>> {
426        if room.as_ref() != self.room.as_ref().map(|room| &room.0) {
427            cx.notify();
428            if let Some(room) = room {
429                if room.read(cx).status().is_offline() {
430                    self.room = None;
431                    Task::ready(Ok(()))
432                } else {
433                    let subscriptions = vec![
434                        cx.observe(&room, |this, room, cx| {
435                            if room.read(cx).status().is_offline() {
436                                this.set_room(None, cx).detach_and_log_err(cx);
437                            }
438
439                            cx.notify();
440                        }),
441                        cx.subscribe(&room, |_, _, event, cx| cx.emit(event.clone())),
442                    ];
443                    self.room = Some((room.clone(), subscriptions));
444                    let location = self
445                        .location
446                        .as_ref()
447                        .and_then(|location| location.upgrade());
448                    room.update(cx, |room, cx| room.set_location(location.as_ref(), cx))
449                }
450            } else {
451                self.room = None;
452                Task::ready(Ok(()))
453            }
454        } else {
455            Task::ready(Ok(()))
456        }
457    }
458
459    pub fn room(&self) -> Option<&Model<Room>> {
460        self.room.as_ref().map(|(room, _)| room)
461    }
462
463    pub fn client(&self) -> Arc<Client> {
464        self.client.clone()
465    }
466
467    pub fn pending_invites(&self) -> &HashSet<u64> {
468        &self.pending_invites
469    }
470
471    pub fn report_call_event(&self, operation: &'static str, cx: &AppContext) {
472        if let Some(room) = self.room() {
473            let room = room.read(cx);
474            report_call_event_for_room(operation, room.id(), room.channel_id(), &self.client, cx);
475        }
476    }
477}
478
479pub fn report_call_event_for_room(
480    operation: &'static str,
481    room_id: u64,
482    channel_id: Option<u64>,
483    client: &Arc<Client>,
484    cx: &AppContext,
485) {
486    let telemetry = client.telemetry();
487    let telemetry_settings = *TelemetrySettings::get_global(cx);
488    let event = ClickhouseEvent::Call {
489        operation,
490        room_id: Some(room_id),
491        channel_id,
492    };
493    telemetry.report_clickhouse_event(event, telemetry_settings);
494}
495
496pub fn report_call_event_for_channel(
497    operation: &'static str,
498    channel_id: u64,
499    client: &Arc<Client>,
500    cx: &AppContext,
501) {
502    let room = ActiveCall::global(cx).read(cx).room();
503
504    let telemetry = client.telemetry();
505
506    let telemetry_settings = *TelemetrySettings::get_global(cx);
507
508    let event = ClickhouseEvent::Call {
509        operation,
510        room_id: room.map(|r| r.read(cx).id()),
511        channel_id: Some(channel_id),
512    };
513    telemetry.report_clickhouse_event(event, telemetry_settings);
514}
515
#[cfg(test)]
mod test {
    use gpui::TestAppContext;

    use crate::OneAtATime;

    /// Exercises the three outcomes of `OneAtATime::spawn`: running to completion, being
    /// superseded by a later spawn, and being cancelled by dropping the `OneAtATime`.
    #[gpui::test]
    async fn test_one_at_a_time(cx: &mut TestAppContext) {
        let mut debouncer = OneAtATime { cancel: None };

        // A lone task runs to completion and yields its value.
        let lone = cx.update(|cx| debouncer.spawn(cx, |_| async { Ok(1) }));
        assert_eq!(lone.await.unwrap(), Some(1));

        // Two spawns in the same update: the first is superseded and its body must not run.
        let (superseded, latest) = cx.update(|cx| {
            let first = debouncer.spawn(cx, |_| async {
                assert!(false);
                Ok(2)
            });
            let second = debouncer.spawn(cx, |_| async { Ok(3) });
            (first, second)
        });

        assert_eq!(superseded.await.unwrap(), None);
        assert_eq!(latest.await.unwrap(), Some(3));

        // Dropping the debouncer cancels the task it spawned last.
        let orphaned = cx.update(|cx| debouncer.spawn(cx, |_| async { Ok(4) }));
        drop(debouncer);

        assert_eq!(orphaned.await.unwrap(), None);
    }
}