call.rs

  1pub mod call_settings;
  2pub mod participant;
  3pub mod room;
  4
  5use anyhow::{anyhow, Result};
  6use audio::Audio;
  7use call_settings::CallSettings;
  8use client::{proto, Client, TelemetrySettings, TypedEnvelope, User, UserStore, ZED_ALWAYS_ACTIVE};
  9use collections::HashSet;
 10use futures::{channel::oneshot, future::Shared, Future, FutureExt};
 11use gpui::{
 12    AppContext, AsyncAppContext, Entity, ModelContext, ModelHandle, Subscription, Task,
 13    WeakModelHandle,
 14};
 15use postage::watch;
 16use project::Project;
 17use std::sync::Arc;
 18
 19pub use participant::ParticipantLocation;
 20pub use room::Room;
 21
 22pub fn init(client: Arc<Client>, user_store: ModelHandle<UserStore>, cx: &mut AppContext) {
 23    settings::register::<CallSettings>(cx);
 24
 25    let active_call = cx.add_model(|cx| ActiveCall::new(client, user_store, cx));
 26    cx.set_global(active_call);
 27}
 28
 29#[derive(Clone)]
 30pub struct IncomingCall {
 31    pub room_id: u64,
 32    pub calling_user: Arc<User>,
 33    pub participants: Vec<Arc<User>>,
 34    pub initial_project: Option<proto::ParticipantProject>,
 35}
 36
 37pub struct OneAtATime {
 38    cancel: Option<oneshot::Sender<()>>,
 39}
 40
 41impl OneAtATime {
 42    /// spawn a task in the given context.
 43    /// if another task is spawned before that resolves, or if the OneAtATime itself is dropped, the first task will be cancelled and return Ok(None)
 44    /// otherwise you'll see the result of the task.
 45    fn spawn<F, Fut, R>(&mut self, cx: &mut AppContext, f: F) -> Task<Result<Option<R>>>
 46    where
 47        F: 'static + FnOnce(AsyncAppContext) -> Fut,
 48        Fut: Future<Output = Result<R>>,
 49        R: 'static,
 50    {
 51        let (tx, rx) = oneshot::channel();
 52        self.cancel.replace(tx);
 53        cx.spawn(|cx| async move {
 54            futures::select_biased! {
 55                _ = rx.fuse() => Ok(None),
 56                result = f(cx).fuse() => result.map(Some),
 57            }
 58        })
 59    }
 60
 61    fn running(&self) -> bool {
 62        self.cancel
 63            .as_ref()
 64            .is_some_and(|cancel| !cancel.is_canceled())
 65    }
 66}
 67
 68/// Singleton global maintaining the user's participation in a room across workspaces.
 69pub struct ActiveCall {
 70    room: Option<(ModelHandle<Room>, Vec<Subscription>)>,
 71    pending_room_creation: Option<Shared<Task<Result<ModelHandle<Room>, Arc<anyhow::Error>>>>>,
 72    _join_debouncer: OneAtATime,
 73    location: Option<WeakModelHandle<Project>>,
 74    pending_invites: HashSet<u64>,
 75    incoming_call: (
 76        watch::Sender<Option<IncomingCall>>,
 77        watch::Receiver<Option<IncomingCall>>,
 78    ),
 79    client: Arc<Client>,
 80    user_store: ModelHandle<UserStore>,
 81    _subscriptions: Vec<client::Subscription>,
 82}
 83
 84impl Entity for ActiveCall {
 85    type Event = room::Event;
 86}
 87
 88impl ActiveCall {
 89    fn new(
 90        client: Arc<Client>,
 91        user_store: ModelHandle<UserStore>,
 92        cx: &mut ModelContext<Self>,
 93    ) -> Self {
 94        Self {
 95            room: None,
 96            pending_room_creation: None,
 97            location: None,
 98            pending_invites: Default::default(),
 99            incoming_call: watch::channel(),
100
101            _join_debouncer: OneAtATime { cancel: None },
102            _subscriptions: vec![
103                client.add_request_handler(cx.handle(), Self::handle_incoming_call),
104                client.add_message_handler(cx.handle(), Self::handle_call_canceled),
105            ],
106            client,
107            user_store,
108        }
109    }
110
111    pub fn channel_id(&self, cx: &AppContext) -> Option<u64> {
112        self.room()?.read(cx).channel_id()
113    }
114
115    async fn handle_incoming_call(
116        this: ModelHandle<Self>,
117        envelope: TypedEnvelope<proto::IncomingCall>,
118        _: Arc<Client>,
119        mut cx: AsyncAppContext,
120    ) -> Result<proto::Ack> {
121        let user_store = this.read_with(&cx, |this, _| this.user_store.clone());
122        let call = IncomingCall {
123            room_id: envelope.payload.room_id,
124            participants: user_store
125                .update(&mut cx, |user_store, cx| {
126                    user_store.get_users(envelope.payload.participant_user_ids, cx)
127                })
128                .await?,
129            calling_user: user_store
130                .update(&mut cx, |user_store, cx| {
131                    user_store.get_user(envelope.payload.calling_user_id, cx)
132                })
133                .await?,
134            initial_project: envelope.payload.initial_project,
135        };
136        this.update(&mut cx, |this, _| {
137            *this.incoming_call.0.borrow_mut() = Some(call);
138        });
139
140        Ok(proto::Ack {})
141    }
142
143    async fn handle_call_canceled(
144        this: ModelHandle<Self>,
145        envelope: TypedEnvelope<proto::CallCanceled>,
146        _: Arc<Client>,
147        mut cx: AsyncAppContext,
148    ) -> Result<()> {
149        this.update(&mut cx, |this, _| {
150            let mut incoming_call = this.incoming_call.0.borrow_mut();
151            if incoming_call
152                .as_ref()
153                .map_or(false, |call| call.room_id == envelope.payload.room_id)
154            {
155                incoming_call.take();
156            }
157        });
158        Ok(())
159    }
160
161    pub fn global(cx: &AppContext) -> ModelHandle<Self> {
162        cx.global::<ModelHandle<Self>>().clone()
163    }
164
165    pub fn invite(
166        &mut self,
167        called_user_id: u64,
168        initial_project: Option<ModelHandle<Project>>,
169        cx: &mut ModelContext<Self>,
170    ) -> Task<Result<()>> {
171        if !self.pending_invites.insert(called_user_id) {
172            return Task::ready(Err(anyhow!("user was already invited")));
173        }
174        cx.notify();
175
176        if self._join_debouncer.running() {
177            return Task::ready(Ok(()));
178        }
179
180        let room = if let Some(room) = self.room().cloned() {
181            Some(Task::ready(Ok(room)).shared())
182        } else {
183            self.pending_room_creation.clone()
184        };
185
186        let invite = if let Some(room) = room {
187            cx.spawn_weak(|_, mut cx| async move {
188                let room = room.await.map_err(|err| anyhow!("{:?}", err))?;
189
190                let initial_project_id = if let Some(initial_project) = initial_project {
191                    Some(
192                        room.update(&mut cx, |room, cx| room.share_project(initial_project, cx))
193                            .await?,
194                    )
195                } else {
196                    None
197                };
198
199                room.update(&mut cx, |room, cx| {
200                    room.call(called_user_id, initial_project_id, cx)
201                })
202                .await?;
203
204                anyhow::Ok(())
205            })
206        } else {
207            let client = self.client.clone();
208            let user_store = self.user_store.clone();
209            let room = cx
210                .spawn(|this, mut cx| async move {
211                    let create_room = async {
212                        let room = cx
213                            .update(|cx| {
214                                Room::create(
215                                    called_user_id,
216                                    initial_project,
217                                    client,
218                                    user_store,
219                                    cx,
220                                )
221                            })
222                            .await?;
223
224                        this.update(&mut cx, |this, cx| this.set_room(Some(room.clone()), cx))
225                            .await?;
226
227                        anyhow::Ok(room)
228                    };
229
230                    let room = create_room.await;
231                    this.update(&mut cx, |this, _| this.pending_room_creation = None);
232                    room.map_err(Arc::new)
233                })
234                .shared();
235            self.pending_room_creation = Some(room.clone());
236            cx.foreground().spawn(async move {
237                room.await.map_err(|err| anyhow!("{:?}", err))?;
238                anyhow::Ok(())
239            })
240        };
241
242        cx.spawn(|this, mut cx| async move {
243            let result = invite.await;
244            if result.is_ok() {
245                this.update(&mut cx, |this, cx| this.report_call_event("invite", cx));
246            } else {
247                // TODO: Resport collaboration error
248            }
249
250            this.update(&mut cx, |this, cx| {
251                this.pending_invites.remove(&called_user_id);
252                cx.notify();
253            });
254            result
255        })
256    }
257
258    pub fn cancel_invite(
259        &mut self,
260        called_user_id: u64,
261        cx: &mut ModelContext<Self>,
262    ) -> Task<Result<()>> {
263        let room_id = if let Some(room) = self.room() {
264            room.read(cx).id()
265        } else {
266            return Task::ready(Err(anyhow!("no active call")));
267        };
268
269        let client = self.client.clone();
270        cx.foreground().spawn(async move {
271            client
272                .request(proto::CancelCall {
273                    room_id,
274                    called_user_id,
275                })
276                .await?;
277            anyhow::Ok(())
278        })
279    }
280
281    pub fn incoming(&self) -> watch::Receiver<Option<IncomingCall>> {
282        self.incoming_call.1.clone()
283    }
284
285    pub fn accept_incoming(&mut self, cx: &mut ModelContext<Self>) -> Task<Result<()>> {
286        if self.room.is_some() {
287            return Task::ready(Err(anyhow!("cannot join while on another call")));
288        }
289
290        let call = if let Some(call) = self.incoming_call.1.borrow().clone() {
291            call
292        } else {
293            return Task::ready(Err(anyhow!("no incoming call")));
294        };
295
296        if self.pending_room_creation.is_some() {
297            return Task::ready(Ok(()));
298        }
299
300        let room_id = call.room_id.clone();
301        let client = self.client.clone();
302        let user_store = self.user_store.clone();
303        let join = self
304            ._join_debouncer
305            .spawn(cx, move |cx| Room::join(room_id, client, user_store, cx));
306
307        cx.spawn(|this, mut cx| async move {
308            let room = join.await?;
309            this.update(&mut cx, |this, cx| this.set_room(room.clone(), cx))
310                .await?;
311            this.update(&mut cx, |this, cx| {
312                this.report_call_event("accept incoming", cx)
313            });
314            Ok(())
315        })
316    }
317
318    pub fn decline_incoming(&mut self, cx: &mut ModelContext<Self>) -> Result<()> {
319        let call = self
320            .incoming_call
321            .0
322            .borrow_mut()
323            .take()
324            .ok_or_else(|| anyhow!("no incoming call"))?;
325        report_call_event_for_room("decline incoming", call.room_id, None, &self.client, cx);
326        self.client.send(proto::DeclineCall {
327            room_id: call.room_id,
328        })?;
329        Ok(())
330    }
331
332    pub fn join_channel(
333        &mut self,
334        channel_id: u64,
335        cx: &mut ModelContext<Self>,
336    ) -> Task<Result<Option<ModelHandle<Room>>>> {
337        if let Some(room) = self.room().cloned() {
338            if room.read(cx).channel_id() == Some(channel_id) {
339                return Task::ready(Ok(Some(room)));
340            } else {
341                room.update(cx, |room, cx| room.clear_state(cx));
342            }
343        }
344
345        if self.pending_room_creation.is_some() {
346            return Task::ready(Ok(None));
347        }
348
349        let client = self.client.clone();
350        let user_store = self.user_store.clone();
351        let join = self._join_debouncer.spawn(cx, move |cx| async move {
352            Room::join_channel(channel_id, client, user_store, cx).await
353        });
354
355        cx.spawn(move |this, mut cx| async move {
356            let room = join.await?;
357            this.update(&mut cx, |this, cx| this.set_room(room.clone(), cx))
358                .await?;
359            this.update(&mut cx, |this, cx| {
360                this.report_call_event("join channel", cx)
361            });
362            Ok(room)
363        })
364    }
365
366    pub fn hang_up(&mut self, cx: &mut ModelContext<Self>) -> Task<Result<()>> {
367        cx.notify();
368        self.report_call_event("hang up", cx);
369        Audio::end_call(cx);
370        if let Some((room, _)) = self.room.take() {
371            room.update(cx, |room, cx| room.leave(cx))
372        } else {
373            Task::ready(Ok(()))
374        }
375    }
376
377    pub fn share_project(
378        &mut self,
379        project: ModelHandle<Project>,
380        cx: &mut ModelContext<Self>,
381    ) -> Task<Result<u64>> {
382        if let Some((room, _)) = self.room.as_ref() {
383            self.report_call_event("share project", cx);
384            room.update(cx, |room, cx| room.share_project(project, cx))
385        } else {
386            Task::ready(Err(anyhow!("no active call")))
387        }
388    }
389
390    pub fn unshare_project(
391        &mut self,
392        project: ModelHandle<Project>,
393        cx: &mut ModelContext<Self>,
394    ) -> Result<()> {
395        if let Some((room, _)) = self.room.as_ref() {
396            self.report_call_event("unshare project", cx);
397            room.update(cx, |room, cx| room.unshare_project(project, cx))
398        } else {
399            Err(anyhow!("no active call"))
400        }
401    }
402
403    pub fn location(&self) -> Option<&WeakModelHandle<Project>> {
404        self.location.as_ref()
405    }
406
407    pub fn set_location(
408        &mut self,
409        project: Option<&ModelHandle<Project>>,
410        cx: &mut ModelContext<Self>,
411    ) -> Task<Result<()>> {
412        if project.is_some() || !*ZED_ALWAYS_ACTIVE {
413            self.location = project.map(|project| project.downgrade());
414            if let Some((room, _)) = self.room.as_ref() {
415                return room.update(cx, |room, cx| room.set_location(project, cx));
416            }
417        }
418        Task::ready(Ok(()))
419    }
420
421    fn set_room(
422        &mut self,
423        room: Option<ModelHandle<Room>>,
424        cx: &mut ModelContext<Self>,
425    ) -> Task<Result<()>> {
426        if room.as_ref() != self.room.as_ref().map(|room| &room.0) {
427            cx.notify();
428            if let Some(room) = room {
429                if room.read(cx).status().is_offline() {
430                    self.room = None;
431                    Task::ready(Ok(()))
432                } else {
433                    let subscriptions = vec![
434                        cx.observe(&room, |this, room, cx| {
435                            if room.read(cx).status().is_offline() {
436                                this.set_room(None, cx).detach_and_log_err(cx);
437                            }
438
439                            cx.notify();
440                        }),
441                        cx.subscribe(&room, |_, _, event, cx| cx.emit(event.clone())),
442                    ];
443                    self.room = Some((room.clone(), subscriptions));
444                    let location = self.location.and_then(|location| location.upgrade(cx));
445                    room.update(cx, |room, cx| room.set_location(location.as_ref(), cx))
446                }
447            } else {
448                self.room = None;
449                Task::ready(Ok(()))
450            }
451        } else {
452            Task::ready(Ok(()))
453        }
454    }
455
456    pub fn room(&self) -> Option<&ModelHandle<Room>> {
457        self.room.as_ref().map(|(room, _)| room)
458    }
459
460    pub fn client(&self) -> Arc<Client> {
461        self.client.clone()
462    }
463
464    pub fn pending_invites(&self) -> &HashSet<u64> {
465        &self.pending_invites
466    }
467
468    pub fn report_call_event(&self, operation: &'static str, cx: &AppContext) {
469        if let Some(room) = self.room() {
470            let room = room.read(cx);
471            report_call_event_for_room(operation, room.id(), room.channel_id(), &self.client, cx);
472        }
473    }
474}
475
476pub fn report_call_event_for_room(
477    operation: &'static str,
478    room_id: u64,
479    channel_id: Option<u64>,
480    client: &Arc<Client>,
481    cx: &AppContext,
482) {
483    let telemetry = client.telemetry();
484    let telemetry_settings = *settings::get::<TelemetrySettings>(cx);
485
486    telemetry.report_call_event(telemetry_settings, operation, Some(room_id), channel_id)
487}
488
489pub fn report_call_event_for_channel(
490    operation: &'static str,
491    channel_id: u64,
492    client: &Arc<Client>,
493    cx: &AppContext,
494) {
495    let room = ActiveCall::global(cx).read(cx).room();
496
497    let telemetry = client.telemetry();
498    let telemetry_settings = *settings::get::<TelemetrySettings>(cx);
499
500    telemetry.report_call_event(
501        telemetry_settings,
502        operation,
503        room.map(|r| r.read(cx).id()),
504        Some(channel_id),
505    )
506}
507
508#[cfg(test)]
509mod test {
510    use gpui::TestAppContext;
511
512    use crate::OneAtATime;
513
514    #[gpui::test]
515    async fn test_one_at_a_time(cx: &mut TestAppContext) {
516        let mut one_at_a_time = OneAtATime { cancel: None };
517
518        assert_eq!(
519            cx.update(|cx| one_at_a_time.spawn(cx, |_| async { Ok(1) }))
520                .await
521                .unwrap(),
522            Some(1)
523        );
524
525        let (a, b) = cx.update(|cx| {
526            (
527                one_at_a_time.spawn(cx, |_| async {
528                    assert!(false);
529                    Ok(2)
530                }),
531                one_at_a_time.spawn(cx, |_| async { Ok(3) }),
532            )
533        });
534
535        assert_eq!(a.await.unwrap(), None);
536        assert_eq!(b.await.unwrap(), Some(3));
537
538        let promise = cx.update(|cx| one_at_a_time.spawn(cx, |_| async { Ok(4) }));
539        drop(one_at_a_time);
540
541        assert_eq!(promise.await.unwrap(), None);
542    }
543}