call.rs

  1pub mod participant;
  2pub mod room;
  3
  4use anyhow::{anyhow, Result};
  5use client::{proto, Client, TypedEnvelope, User, UserStore};
  6use collections::HashSet;
  7use gpui::{
  8    AppContext, AsyncAppContext, Entity, ModelContext, ModelHandle, MutableAppContext,
  9    Subscription, Task, WeakModelHandle,
 10};
 11pub use participant::ParticipantLocation;
 12use postage::watch;
 13use project::Project;
 14pub use room::Room;
 15use std::sync::Arc;
 16
 17pub fn init(client: Arc<Client>, user_store: ModelHandle<UserStore>, cx: &mut MutableAppContext) {
 18    let active_call = cx.add_model(|cx| ActiveCall::new(client, user_store, cx));
 19    cx.set_global(active_call);
 20}
 21
 22#[derive(Clone)]
 23pub struct IncomingCall {
 24    pub room_id: u64,
 25    pub calling_user: Arc<User>,
 26    pub participants: Vec<Arc<User>>,
 27    pub initial_project: Option<proto::ParticipantProject>,
 28}
 29
 30pub struct ActiveCall {
 31    room: Option<(ModelHandle<Room>, Vec<Subscription>)>,
 32    location: Option<WeakModelHandle<Project>>,
 33    pending_invites: HashSet<u64>,
 34    incoming_call: (
 35        watch::Sender<Option<IncomingCall>>,
 36        watch::Receiver<Option<IncomingCall>>,
 37    ),
 38    client: Arc<Client>,
 39    user_store: ModelHandle<UserStore>,
 40    _subscriptions: Vec<client::Subscription>,
 41}
 42
 43impl Entity for ActiveCall {
 44    type Event = room::Event;
 45}
 46
 47impl ActiveCall {
 48    fn new(
 49        client: Arc<Client>,
 50        user_store: ModelHandle<UserStore>,
 51        cx: &mut ModelContext<Self>,
 52    ) -> Self {
 53        Self {
 54            room: None,
 55            location: None,
 56            pending_invites: Default::default(),
 57            incoming_call: watch::channel(),
 58            _subscriptions: vec![
 59                client.add_request_handler(cx.handle(), Self::handle_incoming_call),
 60                client.add_message_handler(cx.handle(), Self::handle_call_canceled),
 61            ],
 62            client,
 63            user_store,
 64        }
 65    }
 66
 67    async fn handle_incoming_call(
 68        this: ModelHandle<Self>,
 69        envelope: TypedEnvelope<proto::IncomingCall>,
 70        _: Arc<Client>,
 71        mut cx: AsyncAppContext,
 72    ) -> Result<proto::Ack> {
 73        let user_store = this.read_with(&cx, |this, _| this.user_store.clone());
 74        let call = IncomingCall {
 75            room_id: envelope.payload.room_id,
 76            participants: user_store
 77                .update(&mut cx, |user_store, cx| {
 78                    user_store.get_users(envelope.payload.participant_user_ids, cx)
 79                })
 80                .await?,
 81            calling_user: user_store
 82                .update(&mut cx, |user_store, cx| {
 83                    user_store.get_user(envelope.payload.calling_user_id, cx)
 84                })
 85                .await?,
 86            initial_project: envelope.payload.initial_project,
 87        };
 88        this.update(&mut cx, |this, _| {
 89            *this.incoming_call.0.borrow_mut() = Some(call);
 90        });
 91
 92        Ok(proto::Ack {})
 93    }
 94
 95    async fn handle_call_canceled(
 96        this: ModelHandle<Self>,
 97        _: TypedEnvelope<proto::CallCanceled>,
 98        _: Arc<Client>,
 99        mut cx: AsyncAppContext,
100    ) -> Result<()> {
101        this.update(&mut cx, |this, _| {
102            *this.incoming_call.0.borrow_mut() = None;
103        });
104        Ok(())
105    }
106
107    pub fn global(cx: &AppContext) -> ModelHandle<Self> {
108        cx.global::<ModelHandle<Self>>().clone()
109    }
110
111    pub fn invite(
112        &mut self,
113        called_user_id: u64,
114        initial_project: Option<ModelHandle<Project>>,
115        cx: &mut ModelContext<Self>,
116    ) -> Task<Result<()>> {
117        let client = self.client.clone();
118        let user_store = self.user_store.clone();
119        if !self.pending_invites.insert(called_user_id) {
120            return Task::ready(Err(anyhow!("user was already invited")));
121        }
122
123        cx.notify();
124        cx.spawn(|this, mut cx| async move {
125            let invite = async {
126                if let Some(room) = this.read_with(&cx, |this, _| this.room().cloned()) {
127                    let initial_project_id = if let Some(initial_project) = initial_project {
128                        Some(
129                            room.update(&mut cx, |room, cx| {
130                                room.share_project(initial_project, cx)
131                            })
132                            .await?,
133                        )
134                    } else {
135                        None
136                    };
137
138                    room.update(&mut cx, |room, cx| {
139                        room.call(called_user_id, initial_project_id, cx)
140                    })
141                    .await?;
142                } else {
143                    let room = cx
144                        .update(|cx| {
145                            Room::create(called_user_id, initial_project, client, user_store, cx)
146                        })
147                        .await?;
148
149                    this.update(&mut cx, |this, cx| this.set_room(Some(room), cx))
150                        .await?;
151                };
152
153                Ok(())
154            };
155
156            let result = invite.await;
157            this.update(&mut cx, |this, cx| {
158                this.pending_invites.remove(&called_user_id);
159                cx.notify();
160            });
161            result
162        })
163    }
164
165    pub fn cancel_invite(
166        &mut self,
167        called_user_id: u64,
168        cx: &mut ModelContext<Self>,
169    ) -> Task<Result<()>> {
170        let room_id = if let Some(room) = self.room() {
171            room.read(cx).id()
172        } else {
173            return Task::ready(Err(anyhow!("no active call")));
174        };
175
176        let client = self.client.clone();
177        cx.foreground().spawn(async move {
178            client
179                .request(proto::CancelCall {
180                    room_id,
181                    called_user_id,
182                })
183                .await?;
184            anyhow::Ok(())
185        })
186    }
187
188    pub fn incoming(&self) -> watch::Receiver<Option<IncomingCall>> {
189        self.incoming_call.1.clone()
190    }
191
192    pub fn accept_incoming(&mut self, cx: &mut ModelContext<Self>) -> Task<Result<()>> {
193        if self.room.is_some() {
194            return Task::ready(Err(anyhow!("cannot join while on another call")));
195        }
196
197        let call = if let Some(call) = self.incoming_call.1.borrow().clone() {
198            call
199        } else {
200            return Task::ready(Err(anyhow!("no incoming call")));
201        };
202
203        let join = Room::join(&call, self.client.clone(), self.user_store.clone(), cx);
204        cx.spawn(|this, mut cx| async move {
205            let room = join.await?;
206            this.update(&mut cx, |this, cx| this.set_room(Some(room.clone()), cx))
207                .await?;
208            Ok(())
209        })
210    }
211
212    pub fn decline_incoming(&mut self) -> Result<()> {
213        let call = self
214            .incoming_call
215            .0
216            .borrow_mut()
217            .take()
218            .ok_or_else(|| anyhow!("no incoming call"))?;
219        self.client.send(proto::DeclineCall {
220            room_id: call.room_id,
221        })?;
222        Ok(())
223    }
224
225    pub fn hang_up(&mut self, cx: &mut ModelContext<Self>) -> Result<()> {
226        if let Some((room, _)) = self.room.take() {
227            room.update(cx, |room, cx| room.leave(cx))?;
228            cx.notify();
229        }
230        Ok(())
231    }
232
233    pub fn share_project(
234        &mut self,
235        project: ModelHandle<Project>,
236        cx: &mut ModelContext<Self>,
237    ) -> Task<Result<u64>> {
238        if let Some((room, _)) = self.room.as_ref() {
239            room.update(cx, |room, cx| room.share_project(project, cx))
240        } else {
241            Task::ready(Err(anyhow!("no active call")))
242        }
243    }
244
245    pub fn set_location(
246        &mut self,
247        project: Option<&ModelHandle<Project>>,
248        cx: &mut ModelContext<Self>,
249    ) -> Task<Result<()>> {
250        self.location = project.map(|project| project.downgrade());
251        if let Some((room, _)) = self.room.as_ref() {
252            room.update(cx, |room, cx| room.set_location(project, cx))
253        } else {
254            Task::ready(Ok(()))
255        }
256    }
257
258    fn set_room(
259        &mut self,
260        room: Option<ModelHandle<Room>>,
261        cx: &mut ModelContext<Self>,
262    ) -> Task<Result<()>> {
263        if room.as_ref() != self.room.as_ref().map(|room| &room.0) {
264            cx.notify();
265            if let Some(room) = room {
266                if room.read(cx).status().is_offline() {
267                    self.room = None;
268                    Task::ready(Ok(()))
269                } else {
270                    let subscriptions = vec![
271                        cx.observe(&room, |this, room, cx| {
272                            if room.read(cx).status().is_offline() {
273                                this.set_room(None, cx).detach_and_log_err(cx);
274                            }
275
276                            cx.notify();
277                        }),
278                        cx.subscribe(&room, |_, _, event, cx| cx.emit(event.clone())),
279                    ];
280                    self.room = Some((room.clone(), subscriptions));
281                    let location = self.location.and_then(|location| location.upgrade(cx));
282                    room.update(cx, |room, cx| room.set_location(location.as_ref(), cx))
283                }
284            } else {
285                self.room = None;
286                Task::ready(Ok(()))
287            }
288        } else {
289            Task::ready(Ok(()))
290        }
291    }
292
293    pub fn room(&self) -> Option<&ModelHandle<Room>> {
294        self.room.as_ref().map(|(room, _)| room)
295    }
296
297    pub fn pending_invites(&self) -> &HashSet<u64> {
298        &self.pending_invites
299    }
300}