call.rs

  1pub mod participant;
  2pub mod room;
  3
  4use std::sync::Arc;
  5
  6use anyhow::{anyhow, Result};
  7use client::{proto, Client, TypedEnvelope, User, UserStore};
  8use collections::HashSet;
  9use futures::{future::Shared, FutureExt};
 10use postage::watch;
 11
 12use gpui::{
 13    AppContext, AsyncAppContext, Entity, ModelContext, ModelHandle, Subscription, Task,
 14    WeakModelHandle,
 15};
 16use project::Project;
 17
 18pub use participant::ParticipantLocation;
 19pub use room::Room;
 20
 21pub fn init(client: Arc<Client>, user_store: ModelHandle<UserStore>, cx: &mut AppContext) {
 22    let active_call = cx.add_model(|cx| ActiveCall::new(client, user_store, cx));
 23    cx.set_global(active_call);
 24}
 25
/// A call the current user has received but not yet accepted or declined.
///
/// Built by `ActiveCall::handle_incoming_call` from a `proto::IncomingCall`
/// message, with the user ids in that message resolved through the `UserStore`.
#[derive(Clone)]
pub struct IncomingCall {
    /// Id of the room the call belongs to, copied from the message payload.
    pub room_id: u64,
    /// The user resolved from the payload's `calling_user_id`.
    pub calling_user: Arc<User>,
    /// The users resolved from the payload's `participant_user_ids`.
    pub participants: Vec<Arc<User>>,
    /// Optional project carried by the call message, forwarded unchanged.
    pub initial_project: Option<proto::ParticipantProject>,
}
 33
/// Singleton global maintaining the user's participation in a room across workspaces.
pub struct ActiveCall {
    /// The room currently joined, together with the observation/subscription
    /// handles that `set_room` creates for it.
    room: Option<(ModelHandle<Room>, Vec<Subscription>)>,
    /// Shared task for a room creation started by `invite` that has not finished
    /// yet; reset to `None` once that task completes.
    pending_room_creation: Option<Shared<Task<Result<ModelHandle<Room>, Arc<anyhow::Error>>>>>,
    /// Weak handle to the project last passed to `set_location`; `set_room`
    /// forwards it to a newly set room.
    location: Option<WeakModelHandle<Project>>,
    /// Ids of users for whom an `invite` task is currently in flight.
    pending_invites: HashSet<u64>,
    /// Sender and receiver halves of the watch channel holding the current
    /// incoming call, if any.
    incoming_call: (
        watch::Sender<Option<IncomingCall>>,
        watch::Receiver<Option<IncomingCall>>,
    ),
    /// Client used to send call-related requests and messages.
    client: Arc<Client>,
    /// Store used to resolve user ids into `User` values.
    user_store: ModelHandle<UserStore>,
    /// Handles returned when registering the incoming-call and call-canceled
    /// handlers on the client in `new`.
    // NOTE(review): presumably dropping these unregisters the handlers — confirm.
    _subscriptions: Vec<client::Subscription>,
}
 48
/// `ActiveCall` emits the same event type as `Room`; `set_room` subscribes to the
/// active room and re-emits each of its events.
impl Entity for ActiveCall {
    type Event = room::Event;
}
 52
 53impl ActiveCall {
 54    fn new(
 55        client: Arc<Client>,
 56        user_store: ModelHandle<UserStore>,
 57        cx: &mut ModelContext<Self>,
 58    ) -> Self {
 59        Self {
 60            room: None,
 61            pending_room_creation: None,
 62            location: None,
 63            pending_invites: Default::default(),
 64            incoming_call: watch::channel(),
 65            _subscriptions: vec![
 66                client.add_request_handler(cx.handle(), Self::handle_incoming_call),
 67                client.add_message_handler(cx.handle(), Self::handle_call_canceled),
 68            ],
 69            client,
 70            user_store,
 71        }
 72    }
 73
 74    async fn handle_incoming_call(
 75        this: ModelHandle<Self>,
 76        envelope: TypedEnvelope<proto::IncomingCall>,
 77        _: Arc<Client>,
 78        mut cx: AsyncAppContext,
 79    ) -> Result<proto::Ack> {
 80        let user_store = this.read_with(&cx, |this, _| this.user_store.clone());
 81        let call = IncomingCall {
 82            room_id: envelope.payload.room_id,
 83            participants: user_store
 84                .update(&mut cx, |user_store, cx| {
 85                    user_store.get_users(envelope.payload.participant_user_ids, cx)
 86                })
 87                .await?,
 88            calling_user: user_store
 89                .update(&mut cx, |user_store, cx| {
 90                    user_store.get_user(envelope.payload.calling_user_id, cx)
 91                })
 92                .await?,
 93            initial_project: envelope.payload.initial_project,
 94        };
 95        this.update(&mut cx, |this, _| {
 96            *this.incoming_call.0.borrow_mut() = Some(call);
 97        });
 98
 99        Ok(proto::Ack {})
100    }
101
102    async fn handle_call_canceled(
103        this: ModelHandle<Self>,
104        envelope: TypedEnvelope<proto::CallCanceled>,
105        _: Arc<Client>,
106        mut cx: AsyncAppContext,
107    ) -> Result<()> {
108        this.update(&mut cx, |this, _| {
109            let mut incoming_call = this.incoming_call.0.borrow_mut();
110            if incoming_call
111                .as_ref()
112                .map_or(false, |call| call.room_id == envelope.payload.room_id)
113            {
114                incoming_call.take();
115            }
116        });
117        Ok(())
118    }
119
120    pub fn global(cx: &AppContext) -> ModelHandle<Self> {
121        cx.global::<ModelHandle<Self>>().clone()
122    }
123
    /// Invites `called_user_id` to the current call, creating a room first if
    /// there is neither an active room nor a room creation already in flight.
    ///
    /// When `initial_project` is given it is shared with the room (or passed to
    /// `Room::create`) as part of the invite.
    ///
    /// The user id is kept in `pending_invites` for the lifetime of the returned
    /// task and removed when it finishes, whether it succeeded or failed.
    ///
    /// # Errors
    ///
    /// Resolves immediately to an error if an invite for this user is already
    /// pending; otherwise resolves to whatever error room creation, project
    /// sharing or the call itself produced.
    pub fn invite(
        &mut self,
        called_user_id: u64,
        initial_project: Option<ModelHandle<Project>>,
        cx: &mut ModelContext<Self>,
    ) -> Task<Result<()>> {
        // `insert` returns false when the id was already present.
        if !self.pending_invites.insert(called_user_id) {
            return Task::ready(Err(anyhow!("user was already invited")));
        }
        cx.notify();

        // Use the active room if there is one, otherwise latch onto a room
        // creation that an earlier invite already started (if any).
        let room = if let Some(room) = self.room().cloned() {
            Some(Task::ready(Ok(room)).shared())
        } else {
            self.pending_room_creation.clone()
        };

        let invite = if let Some(room) = room {
            // A room exists or is being created: wait for it, optionally share
            // the project, then call the user in that room.
            cx.spawn_weak(|_, mut cx| async move {
                let room = room.await.map_err(|err| anyhow!("{:?}", err))?;

                let initial_project_id = if let Some(initial_project) = initial_project {
                    Some(
                        room.update(&mut cx, |room, cx| room.share_project(initial_project, cx))
                            .await?,
                    )
                } else {
                    None
                };

                room.update(&mut cx, |room, cx| {
                    room.call(called_user_id, initial_project_id, cx)
                })
                .await?;

                anyhow::Ok(())
            })
        } else {
            // No room at all: create one. `Room::create` receives the called
            // user and the project directly.
            // NOTE(review): presumably `Room::create` places the call itself — confirm.
            let client = self.client.clone();
            let user_store = self.user_store.clone();
            let room = cx
                .spawn(|this, mut cx| async move {
                    let create_room = async {
                        let room = cx
                            .update(|cx| {
                                Room::create(
                                    called_user_id,
                                    initial_project,
                                    client,
                                    user_store,
                                    cx,
                                )
                            })
                            .await?;

                        this.update(&mut cx, |this, cx| this.set_room(Some(room.clone()), cx))
                            .await?;

                        anyhow::Ok(room)
                    };

                    // Clear the pending marker regardless of the outcome, and
                    // wrap the error in `Arc` so the shared task's output is
                    // cloneable.
                    let room = create_room.await;
                    this.update(&mut cx, |this, _| this.pending_room_creation = None);
                    room.map_err(Arc::new)
                })
                .shared();
            // Publish the in-flight creation so concurrent invites reuse it.
            self.pending_room_creation = Some(room.clone());
            cx.foreground().spawn(async move {
                room.await.map_err(|err| anyhow!("{:?}", err))?;
                anyhow::Ok(())
            })
        };

        // Whatever happened, the invite is no longer pending once it finishes.
        cx.spawn(|this, mut cx| async move {
            let result = invite.await;
            this.update(&mut cx, |this, cx| {
                this.pending_invites.remove(&called_user_id);
                cx.notify();
            });
            result
        })
    }
206
207    pub fn cancel_invite(
208        &mut self,
209        called_user_id: u64,
210        cx: &mut ModelContext<Self>,
211    ) -> Task<Result<()>> {
212        let room_id = if let Some(room) = self.room() {
213            room.read(cx).id()
214        } else {
215            return Task::ready(Err(anyhow!("no active call")));
216        };
217
218        let client = self.client.clone();
219        cx.foreground().spawn(async move {
220            client
221                .request(proto::CancelCall {
222                    room_id,
223                    called_user_id,
224                })
225                .await?;
226            anyhow::Ok(())
227        })
228    }
229
230    pub fn incoming(&self) -> watch::Receiver<Option<IncomingCall>> {
231        self.incoming_call.1.clone()
232    }
233
234    pub fn accept_incoming(&mut self, cx: &mut ModelContext<Self>) -> Task<Result<()>> {
235        if self.room.is_some() {
236            return Task::ready(Err(anyhow!("cannot join while on another call")));
237        }
238
239        let call = if let Some(call) = self.incoming_call.1.borrow().clone() {
240            call
241        } else {
242            return Task::ready(Err(anyhow!("no incoming call")));
243        };
244
245        let join = Room::join(&call, self.client.clone(), self.user_store.clone(), cx);
246        cx.spawn(|this, mut cx| async move {
247            let room = join.await?;
248            this.update(&mut cx, |this, cx| this.set_room(Some(room.clone()), cx))
249                .await?;
250            Ok(())
251        })
252    }
253
254    pub fn decline_incoming(&mut self) -> Result<()> {
255        let call = self
256            .incoming_call
257            .0
258            .borrow_mut()
259            .take()
260            .ok_or_else(|| anyhow!("no incoming call"))?;
261        self.client.send(proto::DeclineCall {
262            room_id: call.room_id,
263        })?;
264        Ok(())
265    }
266
267    pub fn hang_up(&mut self, cx: &mut ModelContext<Self>) -> Task<Result<()>> {
268        cx.notify();
269        if let Some((room, _)) = self.room.take() {
270            room.update(cx, |room, cx| room.leave(cx))
271        } else {
272            Task::ready(Ok(()))
273        }
274    }
275
276    pub fn share_project(
277        &mut self,
278        project: ModelHandle<Project>,
279        cx: &mut ModelContext<Self>,
280    ) -> Task<Result<u64>> {
281        if let Some((room, _)) = self.room.as_ref() {
282            room.update(cx, |room, cx| room.share_project(project, cx))
283        } else {
284            Task::ready(Err(anyhow!("no active call")))
285        }
286    }
287
288    pub fn unshare_project(
289        &mut self,
290        project: ModelHandle<Project>,
291        cx: &mut ModelContext<Self>,
292    ) -> Result<()> {
293        if let Some((room, _)) = self.room.as_ref() {
294            room.update(cx, |room, cx| room.unshare_project(project, cx))
295        } else {
296            Err(anyhow!("no active call"))
297        }
298    }
299
300    pub fn set_location(
301        &mut self,
302        project: Option<&ModelHandle<Project>>,
303        cx: &mut ModelContext<Self>,
304    ) -> Task<Result<()>> {
305        self.location = project.map(|project| project.downgrade());
306        if let Some((room, _)) = self.room.as_ref() {
307            room.update(cx, |room, cx| room.set_location(project, cx))
308        } else {
309            Task::ready(Ok(()))
310        }
311    }
312
313    fn set_room(
314        &mut self,
315        room: Option<ModelHandle<Room>>,
316        cx: &mut ModelContext<Self>,
317    ) -> Task<Result<()>> {
318        if room.as_ref() != self.room.as_ref().map(|room| &room.0) {
319            cx.notify();
320            if let Some(room) = room {
321                if room.read(cx).status().is_offline() {
322                    self.room = None;
323                    Task::ready(Ok(()))
324                } else {
325                    let subscriptions = vec![
326                        cx.observe(&room, |this, room, cx| {
327                            if room.read(cx).status().is_offline() {
328                                this.set_room(None, cx).detach_and_log_err(cx);
329                            }
330
331                            cx.notify();
332                        }),
333                        cx.subscribe(&room, |_, _, event, cx| cx.emit(event.clone())),
334                    ];
335                    self.room = Some((room.clone(), subscriptions));
336                    let location = self.location.and_then(|location| location.upgrade(cx));
337                    room.update(cx, |room, cx| room.set_location(location.as_ref(), cx))
338                }
339            } else {
340                self.room = None;
341                Task::ready(Ok(()))
342            }
343        } else {
344            Task::ready(Ok(()))
345        }
346    }
347
348    pub fn room(&self) -> Option<&ModelHandle<Room>> {
349        self.room.as_ref().map(|(room, _)| room)
350    }
351
    /// Returns the ids of users whose invite is still in flight; `invite` adds
    /// an id when it starts and removes it when its task finishes.
    pub fn pending_invites(&self) -> &HashSet<u64> {
        &self.pending_invites
    }
355}