call.rs

  1mod indicator;
  2pub mod participant;
  3pub mod room;
  4
  5use anyhow::{anyhow, Result};
  6use client::{proto, Client, TypedEnvelope, User, UserStore};
  7use collections::HashSet;
  8use gpui::{
  9    actions, AppContext, AsyncAppContext, Entity, ModelContext, ModelHandle, MutableAppContext,
 10    Subscription, Task, ViewHandle, WeakModelHandle,
 11};
 12use indicator::SharingStatusIndicator;
 13pub use participant::ParticipantLocation;
 14use postage::watch;
 15use project::Project;
 16pub use room::Room;
 17use std::sync::Arc;
 18
 19actions!(collab, [ToggleScreenSharing]);
 20
 21pub fn init(client: Arc<Client>, user_store: ModelHandle<UserStore>, cx: &mut MutableAppContext) {
 22    let active_call = cx.add_model(|cx| ActiveCall::new(client, user_store, cx));
 23    cx.set_global(active_call);
 24    cx.add_global_action(toggle_screen_sharing);
 25}
 26
 27#[derive(Clone)]
 28pub struct IncomingCall {
 29    pub room_id: u64,
 30    pub calling_user: Arc<User>,
 31    pub participants: Vec<Arc<User>>,
 32    pub initial_project: Option<proto::ParticipantProject>,
 33}
 34
 35pub struct ActiveCall {
 36    room: Option<(ModelHandle<Room>, Vec<Subscription>)>,
 37    location: Option<WeakModelHandle<Project>>,
 38    pending_invites: HashSet<u64>,
 39    incoming_call: (
 40        watch::Sender<Option<IncomingCall>>,
 41        watch::Receiver<Option<IncomingCall>>,
 42    ),
 43    client: Arc<Client>,
 44    user_store: ModelHandle<UserStore>,
 45    sharing_status_indicator: Option<(usize, ViewHandle<SharingStatusIndicator>)>,
 46    _subscriptions: Vec<client::Subscription>,
 47}
 48
 49impl Entity for ActiveCall {
 50    type Event = room::Event;
 51}
 52
 53impl ActiveCall {
 54    fn new(
 55        client: Arc<Client>,
 56        user_store: ModelHandle<UserStore>,
 57        cx: &mut ModelContext<Self>,
 58    ) -> Self {
 59        Self {
 60            room: None,
 61            location: None,
 62            pending_invites: Default::default(),
 63            incoming_call: watch::channel(),
 64            _subscriptions: vec![
 65                client.add_request_handler(cx.handle(), Self::handle_incoming_call),
 66                client.add_message_handler(cx.handle(), Self::handle_call_canceled),
 67            ],
 68            client,
 69            user_store,
 70            sharing_status_indicator: None,
 71        }
 72    }
 73
 74    async fn handle_incoming_call(
 75        this: ModelHandle<Self>,
 76        envelope: TypedEnvelope<proto::IncomingCall>,
 77        _: Arc<Client>,
 78        mut cx: AsyncAppContext,
 79    ) -> Result<proto::Ack> {
 80        let user_store = this.read_with(&cx, |this, _| this.user_store.clone());
 81        let call = IncomingCall {
 82            room_id: envelope.payload.room_id,
 83            participants: user_store
 84                .update(&mut cx, |user_store, cx| {
 85                    user_store.get_users(envelope.payload.participant_user_ids, cx)
 86                })
 87                .await?,
 88            calling_user: user_store
 89                .update(&mut cx, |user_store, cx| {
 90                    user_store.get_user(envelope.payload.calling_user_id, cx)
 91                })
 92                .await?,
 93            initial_project: envelope.payload.initial_project,
 94        };
 95        this.update(&mut cx, |this, _| {
 96            *this.incoming_call.0.borrow_mut() = Some(call);
 97        });
 98
 99        Ok(proto::Ack {})
100    }
101
102    async fn handle_call_canceled(
103        this: ModelHandle<Self>,
104        envelope: TypedEnvelope<proto::CallCanceled>,
105        _: Arc<Client>,
106        mut cx: AsyncAppContext,
107    ) -> Result<()> {
108        this.update(&mut cx, |this, _| {
109            let mut incoming_call = this.incoming_call.0.borrow_mut();
110            if incoming_call
111                .as_ref()
112                .map_or(false, |call| call.room_id == envelope.payload.room_id)
113            {
114                incoming_call.take();
115            }
116        });
117        Ok(())
118    }
119
120    pub fn global(cx: &AppContext) -> ModelHandle<Self> {
121        cx.global::<ModelHandle<Self>>().clone()
122    }
123
124    pub fn invite(
125        &mut self,
126        called_user_id: u64,
127        initial_project: Option<ModelHandle<Project>>,
128        cx: &mut ModelContext<Self>,
129    ) -> Task<Result<()>> {
130        let client = self.client.clone();
131        let user_store = self.user_store.clone();
132        if !self.pending_invites.insert(called_user_id) {
133            return Task::ready(Err(anyhow!("user was already invited")));
134        }
135
136        cx.notify();
137        cx.spawn(|this, mut cx| async move {
138            let invite = async {
139                if let Some(room) = this.read_with(&cx, |this, _| this.room().cloned()) {
140                    let initial_project_id = if let Some(initial_project) = initial_project {
141                        Some(
142                            room.update(&mut cx, |room, cx| {
143                                room.share_project(initial_project, cx)
144                            })
145                            .await?,
146                        )
147                    } else {
148                        None
149                    };
150
151                    room.update(&mut cx, |room, cx| {
152                        room.call(called_user_id, initial_project_id, cx)
153                    })
154                    .await?;
155                } else {
156                    let room = cx
157                        .update(|cx| {
158                            Room::create(called_user_id, initial_project, client, user_store, cx)
159                        })
160                        .await?;
161
162                    this.update(&mut cx, |this, cx| this.set_room(Some(room), cx))
163                        .await?;
164                };
165
166                Ok(())
167            };
168
169            let result = invite.await;
170            this.update(&mut cx, |this, cx| {
171                this.pending_invites.remove(&called_user_id);
172                cx.notify();
173            });
174            result
175        })
176    }
177
178    pub fn cancel_invite(
179        &mut self,
180        called_user_id: u64,
181        cx: &mut ModelContext<Self>,
182    ) -> Task<Result<()>> {
183        let room_id = if let Some(room) = self.room() {
184            room.read(cx).id()
185        } else {
186            return Task::ready(Err(anyhow!("no active call")));
187        };
188
189        let client = self.client.clone();
190        cx.foreground().spawn(async move {
191            client
192                .request(proto::CancelCall {
193                    room_id,
194                    called_user_id,
195                })
196                .await?;
197            anyhow::Ok(())
198        })
199    }
200
201    pub fn incoming(&self) -> watch::Receiver<Option<IncomingCall>> {
202        self.incoming_call.1.clone()
203    }
204
205    pub fn accept_incoming(&mut self, cx: &mut ModelContext<Self>) -> Task<Result<()>> {
206        if self.room.is_some() {
207            return Task::ready(Err(anyhow!("cannot join while on another call")));
208        }
209
210        let call = if let Some(call) = self.incoming_call.1.borrow().clone() {
211            call
212        } else {
213            return Task::ready(Err(anyhow!("no incoming call")));
214        };
215
216        let join = Room::join(&call, self.client.clone(), self.user_store.clone(), cx);
217        cx.spawn(|this, mut cx| async move {
218            let room = join.await?;
219            this.update(&mut cx, |this, cx| this.set_room(Some(room.clone()), cx))
220                .await?;
221            Ok(())
222        })
223    }
224
225    pub fn decline_incoming(&mut self) -> Result<()> {
226        let call = self
227            .incoming_call
228            .0
229            .borrow_mut()
230            .take()
231            .ok_or_else(|| anyhow!("no incoming call"))?;
232        self.client.send(proto::DeclineCall {
233            room_id: call.room_id,
234        })?;
235        Ok(())
236    }
237
238    pub fn hang_up(&mut self, cx: &mut ModelContext<Self>) -> Result<()> {
239        if let Some((room, _)) = self.room.take() {
240            room.update(cx, |room, cx| room.leave(cx))?;
241            cx.notify();
242        }
243        Ok(())
244    }
245
246    pub fn share_project(
247        &mut self,
248        project: ModelHandle<Project>,
249        cx: &mut ModelContext<Self>,
250    ) -> Task<Result<u64>> {
251        if let Some((room, _)) = self.room.as_ref() {
252            room.update(cx, |room, cx| room.share_project(project, cx))
253        } else {
254            Task::ready(Err(anyhow!("no active call")))
255        }
256    }
257
258    pub fn set_location(
259        &mut self,
260        project: Option<&ModelHandle<Project>>,
261        cx: &mut ModelContext<Self>,
262    ) -> Task<Result<()>> {
263        self.location = project.map(|project| project.downgrade());
264        if let Some((room, _)) = self.room.as_ref() {
265            room.update(cx, |room, cx| room.set_location(project, cx))
266        } else {
267            Task::ready(Ok(()))
268        }
269    }
270
271    fn set_room(
272        &mut self,
273        room: Option<ModelHandle<Room>>,
274        cx: &mut ModelContext<Self>,
275    ) -> Task<Result<()>> {
276        if room.as_ref() != self.room.as_ref().map(|room| &room.0) {
277            cx.notify();
278            if let Some(room) = room {
279                if room.read(cx).status().is_offline() {
280                    self.room = None;
281                    Task::ready(Ok(()))
282                } else {
283                    let subscriptions = vec![
284                        cx.observe(&room, |this, room, cx| {
285                            if room.read(cx).status().is_offline() {
286                                this.set_room(None, cx).detach_and_log_err(cx);
287                            }
288
289                            this.set_sharing_status(room.read(cx).is_screen_sharing(), cx);
290
291                            cx.notify();
292                        }),
293                        cx.subscribe(&room, |_, _, event, cx| cx.emit(event.clone())),
294                    ];
295                    self.room = Some((room.clone(), subscriptions));
296                    let location = self.location.and_then(|location| location.upgrade(cx));
297                    room.update(cx, |room, cx| room.set_location(location.as_ref(), cx))
298                }
299            } else {
300                self.room = None;
301                Task::ready(Ok(()))
302            }
303        } else {
304            Task::ready(Ok(()))
305        }
306    }
307
308    pub fn room(&self) -> Option<&ModelHandle<Room>> {
309        self.room.as_ref().map(|(room, _)| room)
310    }
311
312    pub fn pending_invites(&self) -> &HashSet<u64> {
313        &self.pending_invites
314    }
315
316    pub fn set_sharing_status(&mut self, is_screen_sharing: bool, cx: &mut MutableAppContext) {
317        if is_screen_sharing {
318            if self.sharing_status_indicator.is_none() {
319                self.sharing_status_indicator =
320                    Some(cx.add_status_bar_item(|_| SharingStatusIndicator));
321            }
322        } else if let Some((window_id, _)) = self.sharing_status_indicator.take() {
323            cx.remove_status_bar_item(window_id);
324        }
325    }
326}
327
328pub fn toggle_screen_sharing(_: &ToggleScreenSharing, cx: &mut MutableAppContext) {
329    if let Some(room) = ActiveCall::global(cx).read(cx).room().cloned() {
330        let toggle_screen_sharing = room.update(cx, |room, cx| {
331            if room.is_screen_sharing() {
332                Task::ready(room.unshare_screen(cx))
333            } else {
334                room.share_screen(cx)
335            }
336        });
337        toggle_screen_sharing.detach_and_log_err(cx);
338    }
339}