call.rs

  1mod indicator;
  2pub mod participant;
  3pub mod room;
  4
  5use std::sync::Arc;
  6
  7use anyhow::{anyhow, Result};
  8use client::{proto, Client, TypedEnvelope, User, UserStore};
  9use collections::HashSet;
 10use postage::watch;
 11
 12use gpui::{
 13    actions, AppContext, AsyncAppContext, Entity, ModelContext, ModelHandle, MutableAppContext,
 14    Subscription, Task, ViewHandle, WeakModelHandle,
 15};
 16use project::Project;
 17use settings::Settings;
 18
 19use indicator::SharingStatusIndicator;
 20pub use participant::ParticipantLocation;
 21pub use room::Room;
 22
// Declares the `ToggleScreenSharing` action (grouped under `collab`); it is
// handled by `toggle_screen_sharing`, which `init` registers as a global action.
actions!(collab, [ToggleScreenSharing]);
 24
 25pub fn init(client: Arc<Client>, user_store: ModelHandle<UserStore>, cx: &mut MutableAppContext) {
 26    let active_call = cx.add_model(|cx| ActiveCall::new(client, user_store, cx));
 27    cx.set_global(active_call);
 28    cx.add_global_action(toggle_screen_sharing);
 29}
 30
/// A call that another user has placed to the current user and that has not
/// yet been accepted, declined or canceled.
#[derive(Clone)]
pub struct IncomingCall {
    /// Id of the room the call belongs to, as carried by the incoming-call message.
    pub room_id: u64,
    /// The user resolved from the message's `calling_user_id`.
    pub calling_user: Arc<User>,
    /// The users resolved from the message's `participant_user_ids`.
    pub participants: Vec<Arc<User>>,
    /// Project attached to the invitation, if any.
    /// NOTE(review): presumably the project the caller shared when inviting — confirm.
    pub initial_project: Option<proto::ParticipantProject>,
}
 38
/// Model tracking the current user's call state: the room they are in (if
/// any), invitations they are sending, and the call currently ringing for them.
pub struct ActiveCall {
    /// The room currently joined, together with the subscriptions that observe
    /// it and forward its events; both are replaced or dropped as a unit.
    room: Option<(ModelHandle<Room>, Vec<Subscription>)>,
    /// Weak handle to the project last passed to `set_location`; it is sent
    /// again to a room when one is set.
    location: Option<WeakModelHandle<Project>>,
    /// Ids of users for whom an `invite` is currently in flight.
    pending_invites: HashSet<u64>,
    /// Watch channel (sender, receiver) holding the call currently ringing for
    /// this user, or `None` when there is none.
    incoming_call: (
        watch::Sender<Option<IncomingCall>>,
        watch::Receiver<Option<IncomingCall>>,
    ),
    client: Arc<Client>,
    user_store: ModelHandle<UserStore>,
    /// Status bar item shown while the screen is being shared, as returned by
    /// `add_status_bar_item`; the first element is the id later passed to
    /// `remove_status_bar_item`.
    sharing_status_indicator: Option<(usize, ViewHandle<SharingStatusIndicator>)>,
    /// Registrations of the client message handlers installed in `new`.
    _subscriptions: Vec<client::Subscription>,
}
 52
impl Entity for ActiveCall {
    // `ActiveCall` re-emits the events of the room it currently holds (see the
    // `subscribe` call in `set_room`), so it shares the room's event type.
    type Event = room::Event;
}
 56
 57impl ActiveCall {
 58    fn new(
 59        client: Arc<Client>,
 60        user_store: ModelHandle<UserStore>,
 61        cx: &mut ModelContext<Self>,
 62    ) -> Self {
 63        Self {
 64            room: None,
 65            location: None,
 66            pending_invites: Default::default(),
 67            incoming_call: watch::channel(),
 68            _subscriptions: vec![
 69                client.add_request_handler(cx.handle(), Self::handle_incoming_call),
 70                client.add_message_handler(cx.handle(), Self::handle_call_canceled),
 71            ],
 72            client,
 73            user_store,
 74            sharing_status_indicator: None,
 75        }
 76    }
 77
 78    async fn handle_incoming_call(
 79        this: ModelHandle<Self>,
 80        envelope: TypedEnvelope<proto::IncomingCall>,
 81        _: Arc<Client>,
 82        mut cx: AsyncAppContext,
 83    ) -> Result<proto::Ack> {
 84        let user_store = this.read_with(&cx, |this, _| this.user_store.clone());
 85        let call = IncomingCall {
 86            room_id: envelope.payload.room_id,
 87            participants: user_store
 88                .update(&mut cx, |user_store, cx| {
 89                    user_store.get_users(envelope.payload.participant_user_ids, cx)
 90                })
 91                .await?,
 92            calling_user: user_store
 93                .update(&mut cx, |user_store, cx| {
 94                    user_store.get_user(envelope.payload.calling_user_id, cx)
 95                })
 96                .await?,
 97            initial_project: envelope.payload.initial_project,
 98        };
 99        this.update(&mut cx, |this, _| {
100            *this.incoming_call.0.borrow_mut() = Some(call);
101        });
102
103        Ok(proto::Ack {})
104    }
105
106    async fn handle_call_canceled(
107        this: ModelHandle<Self>,
108        envelope: TypedEnvelope<proto::CallCanceled>,
109        _: Arc<Client>,
110        mut cx: AsyncAppContext,
111    ) -> Result<()> {
112        this.update(&mut cx, |this, _| {
113            let mut incoming_call = this.incoming_call.0.borrow_mut();
114            if incoming_call
115                .as_ref()
116                .map_or(false, |call| call.room_id == envelope.payload.room_id)
117            {
118                incoming_call.take();
119            }
120        });
121        Ok(())
122    }
123
124    pub fn global(cx: &AppContext) -> ModelHandle<Self> {
125        cx.global::<ModelHandle<Self>>().clone()
126    }
127
128    pub fn invite(
129        &mut self,
130        called_user_id: u64,
131        initial_project: Option<ModelHandle<Project>>,
132        cx: &mut ModelContext<Self>,
133    ) -> Task<Result<()>> {
134        let client = self.client.clone();
135        let user_store = self.user_store.clone();
136        if !self.pending_invites.insert(called_user_id) {
137            return Task::ready(Err(anyhow!("user was already invited")));
138        }
139
140        cx.notify();
141        cx.spawn(|this, mut cx| async move {
142            let invite = async {
143                if let Some(room) = this.read_with(&cx, |this, _| this.room().cloned()) {
144                    let initial_project_id = if let Some(initial_project) = initial_project {
145                        Some(
146                            room.update(&mut cx, |room, cx| {
147                                room.share_project(initial_project, cx)
148                            })
149                            .await?,
150                        )
151                    } else {
152                        None
153                    };
154
155                    room.update(&mut cx, |room, cx| {
156                        room.call(called_user_id, initial_project_id, cx)
157                    })
158                    .await?;
159                } else {
160                    let room = cx
161                        .update(|cx| {
162                            Room::create(called_user_id, initial_project, client, user_store, cx)
163                        })
164                        .await?;
165
166                    this.update(&mut cx, |this, cx| this.set_room(Some(room), cx))
167                        .await?;
168                };
169
170                Ok(())
171            };
172
173            let result = invite.await;
174            this.update(&mut cx, |this, cx| {
175                this.pending_invites.remove(&called_user_id);
176                cx.notify();
177            });
178            result
179        })
180    }
181
182    pub fn cancel_invite(
183        &mut self,
184        called_user_id: u64,
185        cx: &mut ModelContext<Self>,
186    ) -> Task<Result<()>> {
187        let room_id = if let Some(room) = self.room() {
188            room.read(cx).id()
189        } else {
190            return Task::ready(Err(anyhow!("no active call")));
191        };
192
193        let client = self.client.clone();
194        cx.foreground().spawn(async move {
195            client
196                .request(proto::CancelCall {
197                    room_id,
198                    called_user_id,
199                })
200                .await?;
201            anyhow::Ok(())
202        })
203    }
204
205    pub fn incoming(&self) -> watch::Receiver<Option<IncomingCall>> {
206        self.incoming_call.1.clone()
207    }
208
209    pub fn accept_incoming(&mut self, cx: &mut ModelContext<Self>) -> Task<Result<()>> {
210        if self.room.is_some() {
211            return Task::ready(Err(anyhow!("cannot join while on another call")));
212        }
213
214        let call = if let Some(call) = self.incoming_call.1.borrow().clone() {
215            call
216        } else {
217            return Task::ready(Err(anyhow!("no incoming call")));
218        };
219
220        let join = Room::join(&call, self.client.clone(), self.user_store.clone(), cx);
221        cx.spawn(|this, mut cx| async move {
222            let room = join.await?;
223            this.update(&mut cx, |this, cx| this.set_room(Some(room.clone()), cx))
224                .await?;
225            Ok(())
226        })
227    }
228
229    pub fn decline_incoming(&mut self) -> Result<()> {
230        let call = self
231            .incoming_call
232            .0
233            .borrow_mut()
234            .take()
235            .ok_or_else(|| anyhow!("no incoming call"))?;
236        self.client.send(proto::DeclineCall {
237            room_id: call.room_id,
238        })?;
239        Ok(())
240    }
241
242    pub fn hang_up(&mut self, cx: &mut ModelContext<Self>) -> Result<()> {
243        if let Some((room, _)) = self.room.take() {
244            room.update(cx, |room, cx| room.leave(cx))?;
245            cx.notify();
246        }
247        Ok(())
248    }
249
250    pub fn share_project(
251        &mut self,
252        project: ModelHandle<Project>,
253        cx: &mut ModelContext<Self>,
254    ) -> Task<Result<u64>> {
255        if let Some((room, _)) = self.room.as_ref() {
256            room.update(cx, |room, cx| room.share_project(project, cx))
257        } else {
258            Task::ready(Err(anyhow!("no active call")))
259        }
260    }
261
262    pub fn set_location(
263        &mut self,
264        project: Option<&ModelHandle<Project>>,
265        cx: &mut ModelContext<Self>,
266    ) -> Task<Result<()>> {
267        self.location = project.map(|project| project.downgrade());
268        if let Some((room, _)) = self.room.as_ref() {
269            room.update(cx, |room, cx| room.set_location(project, cx))
270        } else {
271            Task::ready(Ok(()))
272        }
273    }
274
275    fn set_room(
276        &mut self,
277        room: Option<ModelHandle<Room>>,
278        cx: &mut ModelContext<Self>,
279    ) -> Task<Result<()>> {
280        if room.as_ref() != self.room.as_ref().map(|room| &room.0) {
281            cx.notify();
282            if let Some(room) = room {
283                if room.read(cx).status().is_offline() {
284                    self.room = None;
285                    Task::ready(Ok(()))
286                } else {
287                    let subscriptions = vec![
288                        cx.observe(&room, |this, room, cx| {
289                            if room.read(cx).status().is_offline() {
290                                this.set_room(None, cx).detach_and_log_err(cx);
291                            }
292
293                            this.set_sharing_status(room.read(cx).is_screen_sharing(), cx);
294
295                            cx.notify();
296                        }),
297                        cx.subscribe(&room, |_, _, event, cx| cx.emit(event.clone())),
298                    ];
299                    self.room = Some((room.clone(), subscriptions));
300                    let location = self.location.and_then(|location| location.upgrade(cx));
301                    room.update(cx, |room, cx| room.set_location(location.as_ref(), cx))
302                }
303            } else {
304                self.room = None;
305                Task::ready(Ok(()))
306            }
307        } else {
308            Task::ready(Ok(()))
309        }
310    }
311
312    pub fn room(&self) -> Option<&ModelHandle<Room>> {
313        self.room.as_ref().map(|(room, _)| room)
314    }
315
316    pub fn pending_invites(&self) -> &HashSet<u64> {
317        &self.pending_invites
318    }
319
320    pub fn set_sharing_status(&mut self, is_screen_sharing: bool, cx: &mut MutableAppContext) {
321        if is_screen_sharing {
322            if self.sharing_status_indicator.is_none()
323                && cx.global::<Settings>().show_call_status_icon
324            {
325                self.sharing_status_indicator =
326                    Some(cx.add_status_bar_item(|_| SharingStatusIndicator));
327            }
328        } else if let Some((window_id, _)) = self.sharing_status_indicator.take() {
329            cx.remove_status_bar_item(window_id);
330        }
331    }
332}
333
334pub fn toggle_screen_sharing(_: &ToggleScreenSharing, cx: &mut MutableAppContext) {
335    if let Some(room) = ActiveCall::global(cx).read(cx).room().cloned() {
336        let toggle_screen_sharing = room.update(cx, |room, cx| {
337            if room.is_screen_sharing() {
338                Task::ready(room.unshare_screen(cx))
339            } else {
340                room.share_screen(cx)
341            }
342        });
343        toggle_screen_sharing.detach_and_log_err(cx);
344    }
345}