call.rs

  1pub mod participant;
  2pub mod room;
  3
  4use anyhow::{anyhow, Result};
  5use client::{proto, Client, TypedEnvelope, User, UserStore};
  6use gpui::{
  7    AppContext, AsyncAppContext, Entity, ModelContext, ModelHandle, MutableAppContext,
  8    Subscription, Task, WeakModelHandle,
  9};
 10pub use participant::ParticipantLocation;
 11use postage::watch;
 12use project::Project;
 13pub use room::Room;
 14use std::sync::Arc;
 15
 16pub fn init(client: Arc<Client>, user_store: ModelHandle<UserStore>, cx: &mut MutableAppContext) {
 17    let active_call = cx.add_model(|cx| ActiveCall::new(client, user_store, cx));
 18    cx.set_global(active_call);
 19}
 20
/// A call that has been offered to the current user and not yet answered.
///
/// Built by `ActiveCall::handle_incoming_call` from a `proto::IncomingCall` message.
#[derive(Clone)]
pub struct IncomingCall {
    /// Id of the room the call belongs to, copied from the incoming message.
    pub room_id: u64,
    /// The user who placed the call, resolved through the user store.
    pub caller: Arc<User>,
    /// Users resolved from the participant ids listed in the incoming message.
    pub participants: Vec<Arc<User>>,
    /// Optional project carried by the incoming message, passed through unchanged.
    pub initial_project: Option<proto::ParticipantProject>,
}
 28
/// Model tracking the room the user is currently in (if any) and any pending incoming call.
pub struct ActiveCall {
    /// Project most recently passed to `set_location`, held weakly. It is forwarded to a
    /// room when one is installed by `set_room`.
    location: Option<WeakModelHandle<Project>>,
    /// The current room together with the observe/subscribe subscriptions created for it
    /// in `set_room`.
    room: Option<(ModelHandle<Room>, Vec<Subscription>)>,
    /// Watch channel holding the pending incoming call. The sender (`.0`) is written by the
    /// RPC handlers and `decline_incoming`; the receiver (`.1`) is cloned out by `incoming`.
    incoming_call: (
        watch::Sender<Option<IncomingCall>>,
        watch::Receiver<Option<IncomingCall>>,
    ),
    /// RPC client used for call-related requests and messages.
    client: Arc<Client>,
    /// User store used to resolve user ids into `User` values.
    user_store: ModelHandle<UserStore>,
    /// Handler registrations returned by the client in `new`. They are never read; presumably
    /// they are held so the handlers stay registered while the model lives (TODO confirm).
    _subscriptions: Vec<client::Subscription>,
}
 40
/// `ActiveCall` emits the same event type as `Room`; `set_room` subscribes to the installed
/// room and re-emits each of its events.
impl Entity for ActiveCall {
    type Event = room::Event;
}
 44
 45impl ActiveCall {
 46    fn new(
 47        client: Arc<Client>,
 48        user_store: ModelHandle<UserStore>,
 49        cx: &mut ModelContext<Self>,
 50    ) -> Self {
 51        Self {
 52            room: None,
 53            location: None,
 54            incoming_call: watch::channel(),
 55            _subscriptions: vec![
 56                client.add_request_handler(cx.handle(), Self::handle_incoming_call),
 57                client.add_message_handler(cx.handle(), Self::handle_call_canceled),
 58            ],
 59            client,
 60            user_store,
 61        }
 62    }
 63
 64    async fn handle_incoming_call(
 65        this: ModelHandle<Self>,
 66        envelope: TypedEnvelope<proto::IncomingCall>,
 67        _: Arc<Client>,
 68        mut cx: AsyncAppContext,
 69    ) -> Result<proto::Ack> {
 70        let user_store = this.read_with(&cx, |this, _| this.user_store.clone());
 71        let call = IncomingCall {
 72            room_id: envelope.payload.room_id,
 73            participants: user_store
 74                .update(&mut cx, |user_store, cx| {
 75                    user_store.get_users(envelope.payload.participant_user_ids, cx)
 76                })
 77                .await?,
 78            caller: user_store
 79                .update(&mut cx, |user_store, cx| {
 80                    user_store.get_user(envelope.payload.caller_user_id, cx)
 81                })
 82                .await?,
 83            initial_project: envelope.payload.initial_project,
 84        };
 85        this.update(&mut cx, |this, _| {
 86            *this.incoming_call.0.borrow_mut() = Some(call);
 87        });
 88
 89        Ok(proto::Ack {})
 90    }
 91
 92    async fn handle_call_canceled(
 93        this: ModelHandle<Self>,
 94        _: TypedEnvelope<proto::CallCanceled>,
 95        _: Arc<Client>,
 96        mut cx: AsyncAppContext,
 97    ) -> Result<()> {
 98        this.update(&mut cx, |this, _| {
 99            *this.incoming_call.0.borrow_mut() = None;
100        });
101        Ok(())
102    }
103
104    pub fn global(cx: &AppContext) -> ModelHandle<Self> {
105        cx.global::<ModelHandle<Self>>().clone()
106    }
107
108    pub fn invite(
109        &mut self,
110        recipient_user_id: u64,
111        initial_project: Option<ModelHandle<Project>>,
112        cx: &mut ModelContext<Self>,
113    ) -> Task<Result<()>> {
114        let client = self.client.clone();
115        let user_store = self.user_store.clone();
116        cx.spawn(|this, mut cx| async move {
117            if let Some(room) = this.read_with(&cx, |this, _| this.room().cloned()) {
118                let initial_project_id = if let Some(initial_project) = initial_project {
119                    Some(
120                        room.update(&mut cx, |room, cx| room.share_project(initial_project, cx))
121                            .await?,
122                    )
123                } else {
124                    None
125                };
126
127                room.update(&mut cx, |room, cx| {
128                    room.call(recipient_user_id, initial_project_id, cx)
129                })
130                .await?;
131            } else {
132                let room = cx
133                    .update(|cx| {
134                        Room::create(recipient_user_id, initial_project, client, user_store, cx)
135                    })
136                    .await?;
137
138                this.update(&mut cx, |this, cx| this.set_room(Some(room.clone()), cx))
139                    .await?;
140            };
141
142            Ok(())
143        })
144    }
145
146    pub fn cancel_invite(
147        &mut self,
148        recipient_user_id: u64,
149        cx: &mut ModelContext<Self>,
150    ) -> Task<Result<()>> {
151        let room_id = if let Some(room) = self.room() {
152            room.read(cx).id()
153        } else {
154            return Task::ready(Err(anyhow!("no active call")));
155        };
156
157        let client = self.client.clone();
158        cx.foreground().spawn(async move {
159            client
160                .request(proto::CancelCall {
161                    room_id,
162                    recipient_user_id,
163                })
164                .await?;
165            anyhow::Ok(())
166        })
167    }
168
169    pub fn incoming(&self) -> watch::Receiver<Option<IncomingCall>> {
170        self.incoming_call.1.clone()
171    }
172
173    pub fn accept_incoming(&mut self, cx: &mut ModelContext<Self>) -> Task<Result<()>> {
174        if self.room.is_some() {
175            return Task::ready(Err(anyhow!("cannot join while on another call")));
176        }
177
178        let call = if let Some(call) = self.incoming_call.1.borrow().clone() {
179            call
180        } else {
181            return Task::ready(Err(anyhow!("no incoming call")));
182        };
183
184        let join = Room::join(&call, self.client.clone(), self.user_store.clone(), cx);
185        cx.spawn(|this, mut cx| async move {
186            let room = join.await?;
187            this.update(&mut cx, |this, cx| this.set_room(Some(room.clone()), cx))
188                .await?;
189            Ok(())
190        })
191    }
192
193    pub fn decline_incoming(&mut self) -> Result<()> {
194        let call = self
195            .incoming_call
196            .0
197            .borrow_mut()
198            .take()
199            .ok_or_else(|| anyhow!("no incoming call"))?;
200        self.client.send(proto::DeclineCall {
201            room_id: call.room_id,
202        })?;
203        Ok(())
204    }
205
206    pub fn hang_up(&mut self, cx: &mut ModelContext<Self>) -> Result<()> {
207        if let Some((room, _)) = self.room.take() {
208            room.update(cx, |room, cx| room.leave(cx))?;
209            cx.notify();
210        }
211        Ok(())
212    }
213
214    pub fn share_project(
215        &mut self,
216        project: ModelHandle<Project>,
217        cx: &mut ModelContext<Self>,
218    ) -> Task<Result<u64>> {
219        if let Some((room, _)) = self.room.as_ref() {
220            room.update(cx, |room, cx| room.share_project(project, cx))
221        } else {
222            Task::ready(Err(anyhow!("no active call")))
223        }
224    }
225
226    pub fn set_location(
227        &mut self,
228        project: Option<&ModelHandle<Project>>,
229        cx: &mut ModelContext<Self>,
230    ) -> Task<Result<()>> {
231        self.location = project.map(|project| project.downgrade());
232        if let Some((room, _)) = self.room.as_ref() {
233            room.update(cx, |room, cx| room.set_location(project, cx))
234        } else {
235            Task::ready(Ok(()))
236        }
237    }
238
239    fn set_room(
240        &mut self,
241        room: Option<ModelHandle<Room>>,
242        cx: &mut ModelContext<Self>,
243    ) -> Task<Result<()>> {
244        if room.as_ref() != self.room.as_ref().map(|room| &room.0) {
245            cx.notify();
246            if let Some(room) = room {
247                if room.read(cx).status().is_offline() {
248                    self.room = None;
249                    Task::ready(Ok(()))
250                } else {
251                    let subscriptions = vec![
252                        cx.observe(&room, |this, room, cx| {
253                            if room.read(cx).status().is_offline() {
254                                this.set_room(None, cx).detach_and_log_err(cx);
255                            }
256
257                            cx.notify();
258                        }),
259                        cx.subscribe(&room, |_, _, event, cx| cx.emit(event.clone())),
260                    ];
261                    self.room = Some((room.clone(), subscriptions));
262                    let location = self.location.and_then(|location| location.upgrade(cx));
263                    room.update(cx, |room, cx| room.set_location(location.as_ref(), cx))
264                }
265            } else {
266                self.room = None;
267                Task::ready(Ok(()))
268            }
269        } else {
270            Task::ready(Ok(()))
271        }
272    }
273
274    pub fn room(&self) -> Option<&ModelHandle<Room>> {
275        self.room.as_ref().map(|(room, _)| room)
276    }
277}