call.rs

  1mod participant;
  2pub mod room;
  3
  4use anyhow::{anyhow, Result};
  5use client::{proto, Client, TypedEnvelope, User, UserStore};
  6use gpui::{
  7    AppContext, AsyncAppContext, Entity, ModelContext, ModelHandle, MutableAppContext,
  8    Subscription, Task,
  9};
 10pub use participant::ParticipantLocation;
 11use postage::watch;
 12use project::Project;
 13pub use room::Room;
 14use std::sync::Arc;
 15
 16pub fn init(client: Arc<Client>, user_store: ModelHandle<UserStore>, cx: &mut MutableAppContext) {
 17    let active_call = cx.add_model(|cx| ActiveCall::new(client, user_store, cx));
 18    cx.set_global(active_call);
 19}
 20
/// Details of a call invitation, as assembled by
/// `ActiveCall::handle_incoming_call` from a `proto::IncomingCall` message.
#[derive(Clone)]
pub struct IncomingCall {
    /// Identifier of the room the invitation refers to.
    pub room_id: u64,
    /// The user resolved from the message's `caller_user_id`.
    pub caller: Arc<User>,
    /// The users resolved from the message's `participant_user_ids`.
    pub participants: Vec<Arc<User>>,
    /// The message's `initial_project`, passed through unchanged.
    pub initial_project: Option<proto::ParticipantProject>,
}
 28
/// Model that tracks the room the local user is currently in (if any) and the
/// latest incoming call invitation.
pub struct ActiveCall {
    /// The current room together with the subscriptions `set_room` created for
    /// it; both are replaced or cleared as a unit.
    room: Option<(ModelHandle<Room>, Vec<Subscription>)>,
    /// Both ends of the watch channel carrying the pending invitation. The
    /// sender is written by the message handlers and `decline_incoming`; clones
    /// of the receiver are handed out by `incoming`.
    incoming_call: (
        watch::Sender<Option<IncomingCall>>,
        watch::Receiver<Option<IncomingCall>>,
    ),
    /// Client used to send call-related requests and messages.
    client: Arc<Client>,
    /// Store used to resolve user ids into `User` values.
    user_store: ModelHandle<UserStore>,
    /// Handler registrations obtained from `client` in `new`; never read, only
    /// held (presumably dropping them unregisters the handlers — TODO confirm).
    _subscriptions: Vec<client::Subscription>,
}
 39
/// `ActiveCall` emits the same event type as [`Room`]: `set_room` subscribes to
/// the room it installs and re-emits each of its events.
impl Entity for ActiveCall {
    type Event = room::Event;
}
 43
 44impl ActiveCall {
 45    fn new(
 46        client: Arc<Client>,
 47        user_store: ModelHandle<UserStore>,
 48        cx: &mut ModelContext<Self>,
 49    ) -> Self {
 50        Self {
 51            room: None,
 52            incoming_call: watch::channel(),
 53            _subscriptions: vec![
 54                client.add_request_handler(cx.handle(), Self::handle_incoming_call),
 55                client.add_message_handler(cx.handle(), Self::handle_call_canceled),
 56            ],
 57            client,
 58            user_store,
 59        }
 60    }
 61
 62    async fn handle_incoming_call(
 63        this: ModelHandle<Self>,
 64        envelope: TypedEnvelope<proto::IncomingCall>,
 65        _: Arc<Client>,
 66        mut cx: AsyncAppContext,
 67    ) -> Result<proto::Ack> {
 68        let user_store = this.read_with(&cx, |this, _| this.user_store.clone());
 69        let call = IncomingCall {
 70            room_id: envelope.payload.room_id,
 71            participants: user_store
 72                .update(&mut cx, |user_store, cx| {
 73                    user_store.get_users(envelope.payload.participant_user_ids, cx)
 74                })
 75                .await?,
 76            caller: user_store
 77                .update(&mut cx, |user_store, cx| {
 78                    user_store.get_user(envelope.payload.caller_user_id, cx)
 79                })
 80                .await?,
 81            initial_project: envelope.payload.initial_project,
 82        };
 83        this.update(&mut cx, |this, _| {
 84            *this.incoming_call.0.borrow_mut() = Some(call);
 85        });
 86
 87        Ok(proto::Ack {})
 88    }
 89
 90    async fn handle_call_canceled(
 91        this: ModelHandle<Self>,
 92        _: TypedEnvelope<proto::CallCanceled>,
 93        _: Arc<Client>,
 94        mut cx: AsyncAppContext,
 95    ) -> Result<()> {
 96        this.update(&mut cx, |this, _| {
 97            *this.incoming_call.0.borrow_mut() = None;
 98        });
 99        Ok(())
100    }
101
102    pub fn global(cx: &AppContext) -> ModelHandle<Self> {
103        cx.global::<ModelHandle<Self>>().clone()
104    }
105
106    pub fn invite(
107        &mut self,
108        recipient_user_id: u64,
109        initial_project: Option<ModelHandle<Project>>,
110        cx: &mut ModelContext<Self>,
111    ) -> Task<Result<()>> {
112        let client = self.client.clone();
113        let user_store = self.user_store.clone();
114        cx.spawn(|this, mut cx| async move {
115            if let Some(room) = this.read_with(&cx, |this, _| this.room().cloned()) {
116                let initial_project_id = if let Some(initial_project) = initial_project {
117                    Some(
118                        room.update(&mut cx, |room, cx| room.share_project(initial_project, cx))
119                            .await?,
120                    )
121                } else {
122                    None
123                };
124
125                room.update(&mut cx, |room, cx| {
126                    room.call(recipient_user_id, initial_project_id, cx)
127                })
128                .await?;
129            } else {
130                let room = cx
131                    .update(|cx| {
132                        Room::create(recipient_user_id, initial_project, client, user_store, cx)
133                    })
134                    .await?;
135                this.update(&mut cx, |this, cx| this.set_room(Some(room), cx));
136            };
137
138            Ok(())
139        })
140    }
141
142    pub fn cancel_invite(
143        &mut self,
144        recipient_user_id: u64,
145        cx: &mut ModelContext<Self>,
146    ) -> Task<Result<()>> {
147        let room_id = if let Some(room) = self.room() {
148            room.read(cx).id()
149        } else {
150            return Task::ready(Err(anyhow!("no active call")));
151        };
152
153        let client = self.client.clone();
154        cx.foreground().spawn(async move {
155            client
156                .request(proto::CancelCall {
157                    room_id,
158                    recipient_user_id,
159                })
160                .await?;
161            anyhow::Ok(())
162        })
163    }
164
165    pub fn incoming(&self) -> watch::Receiver<Option<IncomingCall>> {
166        self.incoming_call.1.clone()
167    }
168
169    pub fn accept_incoming(&mut self, cx: &mut ModelContext<Self>) -> Task<Result<()>> {
170        if self.room.is_some() {
171            return Task::ready(Err(anyhow!("cannot join while on another call")));
172        }
173
174        let call = if let Some(call) = self.incoming_call.1.borrow().clone() {
175            call
176        } else {
177            return Task::ready(Err(anyhow!("no incoming call")));
178        };
179
180        let join = Room::join(&call, self.client.clone(), self.user_store.clone(), cx);
181        cx.spawn(|this, mut cx| async move {
182            let room = join.await?;
183            this.update(&mut cx, |this, cx| this.set_room(Some(room.clone()), cx));
184            Ok(())
185        })
186    }
187
188    pub fn decline_incoming(&mut self) -> Result<()> {
189        let call = self
190            .incoming_call
191            .0
192            .borrow_mut()
193            .take()
194            .ok_or_else(|| anyhow!("no incoming call"))?;
195        self.client.send(proto::DeclineCall {
196            room_id: call.room_id,
197        })?;
198        Ok(())
199    }
200
201    pub fn hang_up(&mut self, cx: &mut ModelContext<Self>) -> Result<()> {
202        if let Some((room, _)) = self.room.take() {
203            room.update(cx, |room, cx| room.leave(cx))?;
204            cx.notify();
205        }
206        Ok(())
207    }
208
209    pub fn share_project(
210        &mut self,
211        project: ModelHandle<Project>,
212        cx: &mut ModelContext<Self>,
213    ) -> Task<Result<u64>> {
214        if let Some((room, _)) = self.room.as_ref() {
215            room.update(cx, |room, cx| room.share_project(project, cx))
216        } else {
217            Task::ready(Err(anyhow!("no active call")))
218        }
219    }
220
221    pub fn set_location(
222        &mut self,
223        project: Option<&ModelHandle<Project>>,
224        cx: &mut ModelContext<Self>,
225    ) -> Task<Result<()>> {
226        if let Some((room, _)) = self.room.as_ref() {
227            room.update(cx, |room, cx| room.set_location(project, cx))
228        } else {
229            Task::ready(Err(anyhow!("no active call")))
230        }
231    }
232
233    fn set_room(&mut self, room: Option<ModelHandle<Room>>, cx: &mut ModelContext<Self>) {
234        if room.as_ref() != self.room.as_ref().map(|room| &room.0) {
235            if let Some(room) = room {
236                if room.read(cx).status().is_offline() {
237                    self.room = None;
238                } else {
239                    let subscriptions = vec![
240                        cx.observe(&room, |this, room, cx| {
241                            if room.read(cx).status().is_offline() {
242                                this.set_room(None, cx);
243                            }
244
245                            cx.notify();
246                        }),
247                        cx.subscribe(&room, |_, _, event, cx| cx.emit(event.clone())),
248                    ];
249                    self.room = Some((room, subscriptions));
250                }
251            } else {
252                self.room = None;
253            }
254            cx.notify();
255        }
256    }
257
258    pub fn room(&self) -> Option<&ModelHandle<Room>> {
259        self.room.as_ref().map(|(room, _)| room)
260    }
261}