1pub mod participant;
2pub mod room;
3
4use std::sync::Arc;
5
6use anyhow::{anyhow, Result};
7use client::{proto, Client, TypedEnvelope, User, UserStore};
8use collections::HashSet;
9use futures::{future::Shared, FutureExt};
10use postage::watch;
11
12use gpui::{
13 AppContext, AsyncAppContext, Entity, ModelContext, ModelHandle, Subscription, Task,
14 WeakModelHandle,
15};
16use project::Project;
17
18pub use participant::ParticipantLocation;
19pub use room::Room;
20
21pub fn init(client: Arc<Client>, user_store: ModelHandle<UserStore>, cx: &mut AppContext) {
22 let active_call = cx.add_model(|cx| ActiveCall::new(client, user_store, cx));
23 cx.set_global(active_call);
24}
25
26#[derive(Clone)]
27pub struct IncomingCall {
28 pub room_id: u64,
29 pub calling_user: Arc<User>,
30 pub participants: Vec<Arc<User>>,
31 pub initial_project: Option<proto::ParticipantProject>,
32}
33
34/// Singleton global maintaining the user's participation in a room across workspaces.
35pub struct ActiveCall {
36 room: Option<(ModelHandle<Room>, Vec<Subscription>)>,
37 pending_room_creation: Option<Shared<Task<Result<ModelHandle<Room>, Arc<anyhow::Error>>>>>,
38 location: Option<WeakModelHandle<Project>>,
39 pending_invites: HashSet<u64>,
40 incoming_call: (
41 watch::Sender<Option<IncomingCall>>,
42 watch::Receiver<Option<IncomingCall>>,
43 ),
44 client: Arc<Client>,
45 user_store: ModelHandle<UserStore>,
46 _subscriptions: Vec<client::Subscription>,
47}
48
49impl Entity for ActiveCall {
50 type Event = room::Event;
51}
52
53impl ActiveCall {
54 fn new(
55 client: Arc<Client>,
56 user_store: ModelHandle<UserStore>,
57 cx: &mut ModelContext<Self>,
58 ) -> Self {
59 Self {
60 room: None,
61 pending_room_creation: None,
62 location: None,
63 pending_invites: Default::default(),
64 incoming_call: watch::channel(),
65 _subscriptions: vec![
66 client.add_request_handler(cx.handle(), Self::handle_incoming_call),
67 client.add_message_handler(cx.handle(), Self::handle_call_canceled),
68 ],
69 client,
70 user_store,
71 }
72 }
73
74 async fn handle_incoming_call(
75 this: ModelHandle<Self>,
76 envelope: TypedEnvelope<proto::IncomingCall>,
77 _: Arc<Client>,
78 mut cx: AsyncAppContext,
79 ) -> Result<proto::Ack> {
80 let user_store = this.read_with(&cx, |this, _| this.user_store.clone());
81 let call = IncomingCall {
82 room_id: envelope.payload.room_id,
83 participants: user_store
84 .update(&mut cx, |user_store, cx| {
85 user_store.get_users(envelope.payload.participant_user_ids, cx)
86 })
87 .await?,
88 calling_user: user_store
89 .update(&mut cx, |user_store, cx| {
90 user_store.get_user(envelope.payload.calling_user_id, cx)
91 })
92 .await?,
93 initial_project: envelope.payload.initial_project,
94 };
95 this.update(&mut cx, |this, _| {
96 *this.incoming_call.0.borrow_mut() = Some(call);
97 });
98
99 Ok(proto::Ack {})
100 }
101
102 async fn handle_call_canceled(
103 this: ModelHandle<Self>,
104 envelope: TypedEnvelope<proto::CallCanceled>,
105 _: Arc<Client>,
106 mut cx: AsyncAppContext,
107 ) -> Result<()> {
108 this.update(&mut cx, |this, _| {
109 let mut incoming_call = this.incoming_call.0.borrow_mut();
110 if incoming_call
111 .as_ref()
112 .map_or(false, |call| call.room_id == envelope.payload.room_id)
113 {
114 incoming_call.take();
115 }
116 });
117 Ok(())
118 }
119
120 pub fn global(cx: &AppContext) -> ModelHandle<Self> {
121 cx.global::<ModelHandle<Self>>().clone()
122 }
123
124 pub fn invite(
125 &mut self,
126 called_user_id: u64,
127 initial_project: Option<ModelHandle<Project>>,
128 cx: &mut ModelContext<Self>,
129 ) -> Task<Result<()>> {
130 if !self.pending_invites.insert(called_user_id) {
131 return Task::ready(Err(anyhow!("user was already invited")));
132 }
133 cx.notify();
134
135 let room = if let Some(room) = self.room().cloned() {
136 Some(Task::ready(Ok(room)).shared())
137 } else {
138 self.pending_room_creation.clone()
139 };
140
141 let invite = if let Some(room) = room {
142 cx.spawn_weak(|_, mut cx| async move {
143 let room = room.await.map_err(|err| anyhow!("{:?}", err))?;
144
145 let initial_project_id = if let Some(initial_project) = initial_project {
146 Some(
147 room.update(&mut cx, |room, cx| room.share_project(initial_project, cx))
148 .await?,
149 )
150 } else {
151 None
152 };
153
154 room.update(&mut cx, |room, cx| {
155 room.call(called_user_id, initial_project_id, cx)
156 })
157 .await?;
158
159 anyhow::Ok(())
160 })
161 } else {
162 let client = self.client.clone();
163 let user_store = self.user_store.clone();
164 let room = cx
165 .spawn(|this, mut cx| async move {
166 let create_room = async {
167 let room = cx
168 .update(|cx| {
169 Room::create(
170 called_user_id,
171 initial_project,
172 client,
173 user_store,
174 cx,
175 )
176 })
177 .await?;
178
179 this.update(&mut cx, |this, cx| this.set_room(Some(room.clone()), cx))
180 .await?;
181
182 anyhow::Ok(room)
183 };
184
185 let room = create_room.await;
186 this.update(&mut cx, |this, _| this.pending_room_creation = None);
187 room.map_err(Arc::new)
188 })
189 .shared();
190 self.pending_room_creation = Some(room.clone());
191 cx.foreground().spawn(async move {
192 room.await.map_err(|err| anyhow!("{:?}", err))?;
193 anyhow::Ok(())
194 })
195 };
196
197 cx.spawn(|this, mut cx| async move {
198 let result = invite.await;
199 this.update(&mut cx, |this, cx| {
200 this.pending_invites.remove(&called_user_id);
201 cx.notify();
202 });
203 result
204 })
205 }
206
207 pub fn cancel_invite(
208 &mut self,
209 called_user_id: u64,
210 cx: &mut ModelContext<Self>,
211 ) -> Task<Result<()>> {
212 let room_id = if let Some(room) = self.room() {
213 room.read(cx).id()
214 } else {
215 return Task::ready(Err(anyhow!("no active call")));
216 };
217
218 let client = self.client.clone();
219 cx.foreground().spawn(async move {
220 client
221 .request(proto::CancelCall {
222 room_id,
223 called_user_id,
224 })
225 .await?;
226 anyhow::Ok(())
227 })
228 }
229
230 pub fn incoming(&self) -> watch::Receiver<Option<IncomingCall>> {
231 self.incoming_call.1.clone()
232 }
233
234 pub fn accept_incoming(&mut self, cx: &mut ModelContext<Self>) -> Task<Result<()>> {
235 if self.room.is_some() {
236 return Task::ready(Err(anyhow!("cannot join while on another call")));
237 }
238
239 let call = if let Some(call) = self.incoming_call.1.borrow().clone() {
240 call
241 } else {
242 return Task::ready(Err(anyhow!("no incoming call")));
243 };
244
245 let join = Room::join(&call, self.client.clone(), self.user_store.clone(), cx);
246 cx.spawn(|this, mut cx| async move {
247 let room = join.await?;
248 this.update(&mut cx, |this, cx| this.set_room(Some(room.clone()), cx))
249 .await?;
250 Ok(())
251 })
252 }
253
254 pub fn decline_incoming(&mut self) -> Result<()> {
255 let call = self
256 .incoming_call
257 .0
258 .borrow_mut()
259 .take()
260 .ok_or_else(|| anyhow!("no incoming call"))?;
261 self.client.send(proto::DeclineCall {
262 room_id: call.room_id,
263 })?;
264 Ok(())
265 }
266
267 pub fn hang_up(&mut self, cx: &mut ModelContext<Self>) -> Task<Result<()>> {
268 cx.notify();
269 if let Some((room, _)) = self.room.take() {
270 room.update(cx, |room, cx| room.leave(cx))
271 } else {
272 Task::ready(Ok(()))
273 }
274 }
275
276 pub fn share_project(
277 &mut self,
278 project: ModelHandle<Project>,
279 cx: &mut ModelContext<Self>,
280 ) -> Task<Result<u64>> {
281 if let Some((room, _)) = self.room.as_ref() {
282 room.update(cx, |room, cx| room.share_project(project, cx))
283 } else {
284 Task::ready(Err(anyhow!("no active call")))
285 }
286 }
287
288 pub fn unshare_project(
289 &mut self,
290 project: ModelHandle<Project>,
291 cx: &mut ModelContext<Self>,
292 ) -> Result<()> {
293 if let Some((room, _)) = self.room.as_ref() {
294 room.update(cx, |room, cx| room.unshare_project(project, cx))
295 } else {
296 Err(anyhow!("no active call"))
297 }
298 }
299
300 pub fn set_location(
301 &mut self,
302 project: Option<&ModelHandle<Project>>,
303 cx: &mut ModelContext<Self>,
304 ) -> Task<Result<()>> {
305 self.location = project.map(|project| project.downgrade());
306 if let Some((room, _)) = self.room.as_ref() {
307 room.update(cx, |room, cx| room.set_location(project, cx))
308 } else {
309 Task::ready(Ok(()))
310 }
311 }
312
313 fn set_room(
314 &mut self,
315 room: Option<ModelHandle<Room>>,
316 cx: &mut ModelContext<Self>,
317 ) -> Task<Result<()>> {
318 if room.as_ref() != self.room.as_ref().map(|room| &room.0) {
319 cx.notify();
320 if let Some(room) = room {
321 if room.read(cx).status().is_offline() {
322 self.room = None;
323 Task::ready(Ok(()))
324 } else {
325 let subscriptions = vec![
326 cx.observe(&room, |this, room, cx| {
327 if room.read(cx).status().is_offline() {
328 this.set_room(None, cx).detach_and_log_err(cx);
329 }
330
331 cx.notify();
332 }),
333 cx.subscribe(&room, |_, _, event, cx| cx.emit(event.clone())),
334 ];
335 self.room = Some((room.clone(), subscriptions));
336 let location = self.location.and_then(|location| location.upgrade(cx));
337 room.update(cx, |room, cx| room.set_location(location.as_ref(), cx))
338 }
339 } else {
340 self.room = None;
341 Task::ready(Ok(()))
342 }
343 } else {
344 Task::ready(Ok(()))
345 }
346 }
347
348 pub fn room(&self) -> Option<&ModelHandle<Room>> {
349 self.room.as_ref().map(|(room, _)| room)
350 }
351
352 pub fn pending_invites(&self) -> &HashSet<u64> {
353 &self.pending_invites
354 }
355}