call.rs

  1pub mod participant;
  2pub mod room;
  3
  4use anyhow::{anyhow, Result};
  5use client::{proto, Client, TypedEnvelope, User, UserStore};
  6use collections::HashSet;
  7use gpui::{
  8    AppContext, AsyncAppContext, Entity, ModelContext, ModelHandle, MutableAppContext,
  9    Subscription, Task, WeakModelHandle,
 10};
 11pub use participant::ParticipantLocation;
 12use postage::watch;
 13use project::Project;
 14pub use room::Room;
 15use std::sync::Arc;
 16
 17pub fn init(client: Arc<Client>, user_store: ModelHandle<UserStore>, cx: &mut MutableAppContext) {
 18    let active_call = cx.add_model(|cx| ActiveCall::new(client, user_store, cx));
 19    cx.set_global(active_call);
 20}
 21
 22#[derive(Clone)]
 23pub struct IncomingCall {
 24    pub room_id: u64,
 25    pub calling_user: Arc<User>,
 26    pub participants: Vec<Arc<User>>,
 27    pub initial_project: Option<proto::ParticipantProject>,
 28}
 29
 30pub struct ActiveCall {
 31    room: Option<(ModelHandle<Room>, Vec<Subscription>)>,
 32    location: Option<WeakModelHandle<Project>>,
 33    pending_invites: HashSet<u64>,
 34    incoming_call: (
 35        watch::Sender<Option<IncomingCall>>,
 36        watch::Receiver<Option<IncomingCall>>,
 37    ),
 38    client: Arc<Client>,
 39    user_store: ModelHandle<UserStore>,
 40    _subscriptions: Vec<client::Subscription>,
 41}
 42
 43impl Entity for ActiveCall {
 44    type Event = room::Event;
 45}
 46
 47impl ActiveCall {
 48    fn new(
 49        client: Arc<Client>,
 50        user_store: ModelHandle<UserStore>,
 51        cx: &mut ModelContext<Self>,
 52    ) -> Self {
 53        Self {
 54            room: None,
 55            location: None,
 56            pending_invites: Default::default(),
 57            incoming_call: watch::channel(),
 58            _subscriptions: vec![
 59                client.add_request_handler(cx.handle(), Self::handle_incoming_call),
 60                client.add_message_handler(cx.handle(), Self::handle_call_canceled),
 61            ],
 62            client,
 63            user_store,
 64        }
 65    }
 66
 67    async fn handle_incoming_call(
 68        this: ModelHandle<Self>,
 69        envelope: TypedEnvelope<proto::IncomingCall>,
 70        _: Arc<Client>,
 71        mut cx: AsyncAppContext,
 72    ) -> Result<proto::Ack> {
 73        let user_store = this.read_with(&cx, |this, _| this.user_store.clone());
 74        let call = IncomingCall {
 75            room_id: envelope.payload.room_id,
 76            participants: user_store
 77                .update(&mut cx, |user_store, cx| {
 78                    user_store.get_users(envelope.payload.participant_user_ids, cx)
 79                })
 80                .await?,
 81            calling_user: user_store
 82                .update(&mut cx, |user_store, cx| {
 83                    user_store.get_user(envelope.payload.calling_user_id, cx)
 84                })
 85                .await?,
 86            initial_project: envelope.payload.initial_project,
 87        };
 88        this.update(&mut cx, |this, _| {
 89            *this.incoming_call.0.borrow_mut() = Some(call);
 90        });
 91
 92        Ok(proto::Ack {})
 93    }
 94
 95    async fn handle_call_canceled(
 96        this: ModelHandle<Self>,
 97        envelope: TypedEnvelope<proto::CallCanceled>,
 98        _: Arc<Client>,
 99        mut cx: AsyncAppContext,
100    ) -> Result<()> {
101        this.update(&mut cx, |this, _| {
102            let mut incoming_call = this.incoming_call.0.borrow_mut();
103            if incoming_call
104                .as_ref()
105                .map_or(false, |call| call.room_id == envelope.payload.room_id)
106            {
107                incoming_call.take();
108            }
109        });
110        Ok(())
111    }
112
113    pub fn global(cx: &AppContext) -> ModelHandle<Self> {
114        cx.global::<ModelHandle<Self>>().clone()
115    }
116
117    pub fn invite(
118        &mut self,
119        called_user_id: u64,
120        initial_project: Option<ModelHandle<Project>>,
121        cx: &mut ModelContext<Self>,
122    ) -> Task<Result<()>> {
123        let client = self.client.clone();
124        let user_store = self.user_store.clone();
125        if !self.pending_invites.insert(called_user_id) {
126            return Task::ready(Err(anyhow!("user was already invited")));
127        }
128
129        cx.notify();
130        cx.spawn(|this, mut cx| async move {
131            let invite = async {
132                if let Some(room) = this.read_with(&cx, |this, _| this.room().cloned()) {
133                    let initial_project_id = if let Some(initial_project) = initial_project {
134                        Some(
135                            room.update(&mut cx, |room, cx| {
136                                room.share_project(initial_project, cx)
137                            })
138                            .await?,
139                        )
140                    } else {
141                        None
142                    };
143
144                    room.update(&mut cx, |room, cx| {
145                        room.call(called_user_id, initial_project_id, cx)
146                    })
147                    .await?;
148                } else {
149                    let room = cx
150                        .update(|cx| {
151                            Room::create(called_user_id, initial_project, client, user_store, cx)
152                        })
153                        .await?;
154
155                    this.update(&mut cx, |this, cx| this.set_room(Some(room), cx))
156                        .await?;
157                };
158
159                Ok(())
160            };
161
162            let result = invite.await;
163            this.update(&mut cx, |this, cx| {
164                this.pending_invites.remove(&called_user_id);
165                cx.notify();
166            });
167            result
168        })
169    }
170
171    pub fn cancel_invite(
172        &mut self,
173        called_user_id: u64,
174        cx: &mut ModelContext<Self>,
175    ) -> Task<Result<()>> {
176        let room_id = if let Some(room) = self.room() {
177            room.read(cx).id()
178        } else {
179            return Task::ready(Err(anyhow!("no active call")));
180        };
181
182        let client = self.client.clone();
183        cx.foreground().spawn(async move {
184            client
185                .request(proto::CancelCall {
186                    room_id,
187                    called_user_id,
188                })
189                .await?;
190            anyhow::Ok(())
191        })
192    }
193
194    pub fn incoming(&self) -> watch::Receiver<Option<IncomingCall>> {
195        self.incoming_call.1.clone()
196    }
197
198    pub fn accept_incoming(&mut self, cx: &mut ModelContext<Self>) -> Task<Result<()>> {
199        if self.room.is_some() {
200            return Task::ready(Err(anyhow!("cannot join while on another call")));
201        }
202
203        let call = if let Some(call) = self.incoming_call.1.borrow().clone() {
204            call
205        } else {
206            return Task::ready(Err(anyhow!("no incoming call")));
207        };
208
209        let join = Room::join(&call, self.client.clone(), self.user_store.clone(), cx);
210        cx.spawn(|this, mut cx| async move {
211            let room = join.await?;
212            this.update(&mut cx, |this, cx| this.set_room(Some(room.clone()), cx))
213                .await?;
214            Ok(())
215        })
216    }
217
218    pub fn decline_incoming(&mut self) -> Result<()> {
219        let call = self
220            .incoming_call
221            .0
222            .borrow_mut()
223            .take()
224            .ok_or_else(|| anyhow!("no incoming call"))?;
225        self.client.send(proto::DeclineCall {
226            room_id: call.room_id,
227        })?;
228        Ok(())
229    }
230
231    pub fn hang_up(&mut self, cx: &mut ModelContext<Self>) -> Result<()> {
232        if let Some((room, _)) = self.room.take() {
233            room.update(cx, |room, cx| room.leave(cx))?;
234            cx.notify();
235        }
236        Ok(())
237    }
238
239    pub fn share_project(
240        &mut self,
241        project: ModelHandle<Project>,
242        cx: &mut ModelContext<Self>,
243    ) -> Task<Result<u64>> {
244        if let Some((room, _)) = self.room.as_ref() {
245            room.update(cx, |room, cx| room.share_project(project, cx))
246        } else {
247            Task::ready(Err(anyhow!("no active call")))
248        }
249    }
250
251    pub fn set_location(
252        &mut self,
253        project: Option<&ModelHandle<Project>>,
254        cx: &mut ModelContext<Self>,
255    ) -> Task<Result<()>> {
256        self.location = project.map(|project| project.downgrade());
257        if let Some((room, _)) = self.room.as_ref() {
258            room.update(cx, |room, cx| room.set_location(project, cx))
259        } else {
260            Task::ready(Ok(()))
261        }
262    }
263
264    fn set_room(
265        &mut self,
266        room: Option<ModelHandle<Room>>,
267        cx: &mut ModelContext<Self>,
268    ) -> Task<Result<()>> {
269        if room.as_ref() != self.room.as_ref().map(|room| &room.0) {
270            cx.notify();
271            if let Some(room) = room {
272                if room.read(cx).status().is_offline() {
273                    self.room = None;
274                    Task::ready(Ok(()))
275                } else {
276                    let subscriptions = vec![
277                        cx.observe(&room, |this, room, cx| {
278                            if room.read(cx).status().is_offline() {
279                                this.set_room(None, cx).detach_and_log_err(cx);
280                            }
281
282                            cx.notify();
283                        }),
284                        cx.subscribe(&room, |_, _, event, cx| cx.emit(event.clone())),
285                    ];
286                    self.room = Some((room.clone(), subscriptions));
287                    let location = self.location.and_then(|location| location.upgrade(cx));
288                    room.update(cx, |room, cx| room.set_location(location.as_ref(), cx))
289                }
290            } else {
291                self.room = None;
292                Task::ready(Ok(()))
293            }
294        } else {
295            Task::ready(Ok(()))
296        }
297    }
298
299    pub fn room(&self) -> Option<&ModelHandle<Room>> {
300        self.room.as_ref().map(|(room, _)| room)
301    }
302
303    pub fn pending_invites(&self) -> &HashSet<u64> {
304        &self.pending_invites
305    }
306}