1pub mod call_settings;
2pub mod participant;
3pub mod room;
4
5use anyhow::{anyhow, Result};
6use audio::Audio;
7use call_settings::CallSettings;
8use client::{proto, Client, TelemetrySettings, TypedEnvelope, User, UserStore, ZED_ALWAYS_ACTIVE};
9use collections::HashSet;
10use futures::{channel::oneshot, future::Shared, Future, FutureExt};
11use gpui::{
12 AppContext, AsyncAppContext, Entity, ModelContext, ModelHandle, Subscription, Task,
13 WeakModelHandle,
14};
15use postage::watch;
16use project::Project;
17use std::sync::Arc;
18
19pub use participant::ParticipantLocation;
20pub use room::Room;
21
22pub fn init(client: Arc<Client>, user_store: ModelHandle<UserStore>, cx: &mut AppContext) {
23 settings::register::<CallSettings>(cx);
24
25 let active_call = cx.add_model(|cx| ActiveCall::new(client, user_store, cx));
26 cx.set_global(active_call);
27}
28
29#[derive(Clone)]
30pub struct IncomingCall {
31 pub room_id: u64,
32 pub calling_user: Arc<User>,
33 pub participants: Vec<Arc<User>>,
34 pub initial_project: Option<proto::ParticipantProject>,
35}
36
37pub struct OneAtATime {
38 cancel: Option<oneshot::Sender<()>>,
39}
40
41impl OneAtATime {
42 /// spawn a task in the given context.
43 /// if another task is spawned before that resolves, or if the OneAtATime itself is dropped, the first task will be cancelled and return Ok(None)
44 /// otherwise you'll see the result of the task.
45 fn spawn<F, Fut, R>(&mut self, cx: &mut AppContext, f: F) -> Task<Result<Option<R>>>
46 where
47 F: 'static + FnOnce(AsyncAppContext) -> Fut,
48 Fut: Future<Output = Result<R>>,
49 R: 'static,
50 {
51 let (tx, rx) = oneshot::channel();
52 self.cancel.replace(tx);
53 cx.spawn(|cx| async move {
54 futures::select_biased! {
55 _ = rx.fuse() => Ok(None),
56 result = f(cx).fuse() => result.map(Some),
57 }
58 })
59 }
60
61 fn running(&self) -> bool {
62 self.cancel
63 .as_ref()
64 .is_some_and(|cancel| !cancel.is_canceled())
65 }
66}
67
68/// Singleton global maintaining the user's participation in a room across workspaces.
69pub struct ActiveCall {
70 room: Option<(ModelHandle<Room>, Vec<Subscription>)>,
71 pending_room_creation: Option<Shared<Task<Result<ModelHandle<Room>, Arc<anyhow::Error>>>>>,
72 _join_debouncer: OneAtATime,
73 location: Option<WeakModelHandle<Project>>,
74 pending_invites: HashSet<u64>,
75 incoming_call: (
76 watch::Sender<Option<IncomingCall>>,
77 watch::Receiver<Option<IncomingCall>>,
78 ),
79 client: Arc<Client>,
80 user_store: ModelHandle<UserStore>,
81 _subscriptions: Vec<client::Subscription>,
82}
83
84impl Entity for ActiveCall {
85 type Event = room::Event;
86}
87
88impl ActiveCall {
89 fn new(
90 client: Arc<Client>,
91 user_store: ModelHandle<UserStore>,
92 cx: &mut ModelContext<Self>,
93 ) -> Self {
94 Self {
95 room: None,
96 pending_room_creation: None,
97 location: None,
98 pending_invites: Default::default(),
99 incoming_call: watch::channel(),
100
101 _join_debouncer: OneAtATime { cancel: None },
102 _subscriptions: vec![
103 client.add_request_handler(cx.handle(), Self::handle_incoming_call),
104 client.add_message_handler(cx.handle(), Self::handle_call_canceled),
105 ],
106 client,
107 user_store,
108 }
109 }
110
111 pub fn channel_id(&self, cx: &AppContext) -> Option<u64> {
112 self.room()?.read(cx).channel_id()
113 }
114
115 async fn handle_incoming_call(
116 this: ModelHandle<Self>,
117 envelope: TypedEnvelope<proto::IncomingCall>,
118 _: Arc<Client>,
119 mut cx: AsyncAppContext,
120 ) -> Result<proto::Ack> {
121 let user_store = this.read_with(&cx, |this, _| this.user_store.clone());
122 let call = IncomingCall {
123 room_id: envelope.payload.room_id,
124 participants: user_store
125 .update(&mut cx, |user_store, cx| {
126 user_store.get_users(envelope.payload.participant_user_ids, cx)
127 })
128 .await?,
129 calling_user: user_store
130 .update(&mut cx, |user_store, cx| {
131 user_store.get_user(envelope.payload.calling_user_id, cx)
132 })
133 .await?,
134 initial_project: envelope.payload.initial_project,
135 };
136 this.update(&mut cx, |this, _| {
137 *this.incoming_call.0.borrow_mut() = Some(call);
138 });
139
140 Ok(proto::Ack {})
141 }
142
143 async fn handle_call_canceled(
144 this: ModelHandle<Self>,
145 envelope: TypedEnvelope<proto::CallCanceled>,
146 _: Arc<Client>,
147 mut cx: AsyncAppContext,
148 ) -> Result<()> {
149 this.update(&mut cx, |this, _| {
150 let mut incoming_call = this.incoming_call.0.borrow_mut();
151 if incoming_call
152 .as_ref()
153 .map_or(false, |call| call.room_id == envelope.payload.room_id)
154 {
155 incoming_call.take();
156 }
157 });
158 Ok(())
159 }
160
161 pub fn global(cx: &AppContext) -> ModelHandle<Self> {
162 cx.global::<ModelHandle<Self>>().clone()
163 }
164
165 pub fn invite(
166 &mut self,
167 called_user_id: u64,
168 initial_project: Option<ModelHandle<Project>>,
169 cx: &mut ModelContext<Self>,
170 ) -> Task<Result<()>> {
171 if !self.pending_invites.insert(called_user_id) {
172 return Task::ready(Err(anyhow!("user was already invited")));
173 }
174 cx.notify();
175
176 if self._join_debouncer.running() {
177 return Task::ready(Ok(()));
178 }
179
180 let room = if let Some(room) = self.room().cloned() {
181 Some(Task::ready(Ok(room)).shared())
182 } else {
183 self.pending_room_creation.clone()
184 };
185
186 let invite = if let Some(room) = room {
187 cx.spawn_weak(|_, mut cx| async move {
188 let room = room.await.map_err(|err| anyhow!("{:?}", err))?;
189
190 let initial_project_id = if let Some(initial_project) = initial_project {
191 Some(
192 room.update(&mut cx, |room, cx| room.share_project(initial_project, cx))
193 .await?,
194 )
195 } else {
196 None
197 };
198
199 room.update(&mut cx, |room, cx| {
200 room.call(called_user_id, initial_project_id, cx)
201 })
202 .await?;
203
204 anyhow::Ok(())
205 })
206 } else {
207 let client = self.client.clone();
208 let user_store = self.user_store.clone();
209 let room = cx
210 .spawn(|this, mut cx| async move {
211 let create_room = async {
212 let room = cx
213 .update(|cx| {
214 Room::create(
215 called_user_id,
216 initial_project,
217 client,
218 user_store,
219 cx,
220 )
221 })
222 .await?;
223
224 this.update(&mut cx, |this, cx| this.set_room(Some(room.clone()), cx))
225 .await?;
226
227 anyhow::Ok(room)
228 };
229
230 let room = create_room.await;
231 this.update(&mut cx, |this, _| this.pending_room_creation = None);
232 room.map_err(Arc::new)
233 })
234 .shared();
235 self.pending_room_creation = Some(room.clone());
236 cx.foreground().spawn(async move {
237 room.await.map_err(|err| anyhow!("{:?}", err))?;
238 anyhow::Ok(())
239 })
240 };
241
242 cx.spawn(|this, mut cx| async move {
243 let result = invite.await;
244 if result.is_ok() {
245 this.update(&mut cx, |this, cx| this.report_call_event("invite", cx));
246 } else {
247 // TODO: Resport collaboration error
248 }
249
250 this.update(&mut cx, |this, cx| {
251 this.pending_invites.remove(&called_user_id);
252 cx.notify();
253 });
254 result
255 })
256 }
257
258 pub fn cancel_invite(
259 &mut self,
260 called_user_id: u64,
261 cx: &mut ModelContext<Self>,
262 ) -> Task<Result<()>> {
263 let room_id = if let Some(room) = self.room() {
264 room.read(cx).id()
265 } else {
266 return Task::ready(Err(anyhow!("no active call")));
267 };
268
269 let client = self.client.clone();
270 cx.foreground().spawn(async move {
271 client
272 .request(proto::CancelCall {
273 room_id,
274 called_user_id,
275 })
276 .await?;
277 anyhow::Ok(())
278 })
279 }
280
281 pub fn incoming(&self) -> watch::Receiver<Option<IncomingCall>> {
282 self.incoming_call.1.clone()
283 }
284
285 pub fn accept_incoming(&mut self, cx: &mut ModelContext<Self>) -> Task<Result<()>> {
286 if self.room.is_some() {
287 return Task::ready(Err(anyhow!("cannot join while on another call")));
288 }
289
290 let call = if let Some(call) = self.incoming_call.1.borrow().clone() {
291 call
292 } else {
293 return Task::ready(Err(anyhow!("no incoming call")));
294 };
295
296 if self.pending_room_creation.is_some() {
297 return Task::ready(Ok(()));
298 }
299
300 let room_id = call.room_id.clone();
301 let client = self.client.clone();
302 let user_store = self.user_store.clone();
303 let join = self
304 ._join_debouncer
305 .spawn(cx, move |cx| Room::join(room_id, client, user_store, cx));
306
307 cx.spawn(|this, mut cx| async move {
308 let room = join.await?;
309 this.update(&mut cx, |this, cx| this.set_room(room.clone(), cx))
310 .await?;
311 this.update(&mut cx, |this, cx| {
312 this.report_call_event("accept incoming", cx)
313 });
314 Ok(())
315 })
316 }
317
318 pub fn decline_incoming(&mut self, cx: &mut ModelContext<Self>) -> Result<()> {
319 let call = self
320 .incoming_call
321 .0
322 .borrow_mut()
323 .take()
324 .ok_or_else(|| anyhow!("no incoming call"))?;
325 report_call_event_for_room("decline incoming", call.room_id, None, &self.client, cx);
326 self.client.send(proto::DeclineCall {
327 room_id: call.room_id,
328 })?;
329 Ok(())
330 }
331
332 pub fn join_channel(
333 &mut self,
334 channel_id: u64,
335 cx: &mut ModelContext<Self>,
336 ) -> Task<Result<Option<ModelHandle<Room>>>> {
337 if let Some(room) = self.room().cloned() {
338 if room.read(cx).channel_id() == Some(channel_id) {
339 return Task::ready(Ok(Some(room)));
340 } else {
341 room.update(cx, |room, cx| room.clear_state(cx));
342 }
343 }
344
345 if self.pending_room_creation.is_some() {
346 return Task::ready(Ok(None));
347 }
348
349 let client = self.client.clone();
350 let user_store = self.user_store.clone();
351 let join = self._join_debouncer.spawn(cx, move |cx| async move {
352 Room::join_channel(channel_id, client, user_store, cx).await
353 });
354
355 cx.spawn(move |this, mut cx| async move {
356 let room = join.await?;
357 this.update(&mut cx, |this, cx| this.set_room(room.clone(), cx))
358 .await?;
359 this.update(&mut cx, |this, cx| {
360 this.report_call_event("join channel", cx)
361 });
362 Ok(room)
363 })
364 }
365
366 pub fn hang_up(&mut self, cx: &mut ModelContext<Self>) -> Task<Result<()>> {
367 cx.notify();
368 self.report_call_event("hang up", cx);
369 Audio::end_call(cx);
370 if let Some((room, _)) = self.room.take() {
371 room.update(cx, |room, cx| room.leave(cx))
372 } else {
373 Task::ready(Ok(()))
374 }
375 }
376
377 pub fn share_project(
378 &mut self,
379 project: ModelHandle<Project>,
380 cx: &mut ModelContext<Self>,
381 ) -> Task<Result<u64>> {
382 if let Some((room, _)) = self.room.as_ref() {
383 self.report_call_event("share project", cx);
384 room.update(cx, |room, cx| room.share_project(project, cx))
385 } else {
386 Task::ready(Err(anyhow!("no active call")))
387 }
388 }
389
390 pub fn unshare_project(
391 &mut self,
392 project: ModelHandle<Project>,
393 cx: &mut ModelContext<Self>,
394 ) -> Result<()> {
395 if let Some((room, _)) = self.room.as_ref() {
396 self.report_call_event("unshare project", cx);
397 room.update(cx, |room, cx| room.unshare_project(project, cx))
398 } else {
399 Err(anyhow!("no active call"))
400 }
401 }
402
403 pub fn location(&self) -> Option<&WeakModelHandle<Project>> {
404 self.location.as_ref()
405 }
406
407 pub fn set_location(
408 &mut self,
409 project: Option<&ModelHandle<Project>>,
410 cx: &mut ModelContext<Self>,
411 ) -> Task<Result<()>> {
412 if project.is_some() || !*ZED_ALWAYS_ACTIVE {
413 self.location = project.map(|project| project.downgrade());
414 if let Some((room, _)) = self.room.as_ref() {
415 return room.update(cx, |room, cx| room.set_location(project, cx));
416 }
417 }
418 Task::ready(Ok(()))
419 }
420
421 fn set_room(
422 &mut self,
423 room: Option<ModelHandle<Room>>,
424 cx: &mut ModelContext<Self>,
425 ) -> Task<Result<()>> {
426 if room.as_ref() != self.room.as_ref().map(|room| &room.0) {
427 cx.notify();
428 if let Some(room) = room {
429 if room.read(cx).status().is_offline() {
430 self.room = None;
431 Task::ready(Ok(()))
432 } else {
433 let subscriptions = vec![
434 cx.observe(&room, |this, room, cx| {
435 if room.read(cx).status().is_offline() {
436 this.set_room(None, cx).detach_and_log_err(cx);
437 }
438
439 cx.notify();
440 }),
441 cx.subscribe(&room, |_, _, event, cx| cx.emit(event.clone())),
442 ];
443 self.room = Some((room.clone(), subscriptions));
444 let location = self.location.and_then(|location| location.upgrade(cx));
445 room.update(cx, |room, cx| room.set_location(location.as_ref(), cx))
446 }
447 } else {
448 self.room = None;
449 Task::ready(Ok(()))
450 }
451 } else {
452 Task::ready(Ok(()))
453 }
454 }
455
456 pub fn room(&self) -> Option<&ModelHandle<Room>> {
457 self.room.as_ref().map(|(room, _)| room)
458 }
459
460 pub fn client(&self) -> Arc<Client> {
461 self.client.clone()
462 }
463
464 pub fn pending_invites(&self) -> &HashSet<u64> {
465 &self.pending_invites
466 }
467
468 pub fn report_call_event(&self, operation: &'static str, cx: &AppContext) {
469 if let Some(room) = self.room() {
470 let room = room.read(cx);
471 report_call_event_for_room(operation, room.id(), room.channel_id(), &self.client, cx);
472 }
473 }
474}
475
476pub fn report_call_event_for_room(
477 operation: &'static str,
478 room_id: u64,
479 channel_id: Option<u64>,
480 client: &Arc<Client>,
481 cx: &AppContext,
482) {
483 let telemetry = client.telemetry();
484 let telemetry_settings = *settings::get::<TelemetrySettings>(cx);
485
486 telemetry.report_call_event(telemetry_settings, operation, Some(room_id), channel_id)
487}
488
489pub fn report_call_event_for_channel(
490 operation: &'static str,
491 channel_id: u64,
492 client: &Arc<Client>,
493 cx: &AppContext,
494) {
495 let room = ActiveCall::global(cx).read(cx).room();
496
497 let telemetry = client.telemetry();
498 let telemetry_settings = *settings::get::<TelemetrySettings>(cx);
499
500 telemetry.report_call_event(
501 telemetry_settings,
502 operation,
503 room.map(|r| r.read(cx).id()),
504 Some(channel_id),
505 )
506}
507
508#[cfg(test)]
509mod test {
510 use gpui::TestAppContext;
511
512 use crate::OneAtATime;
513
514 #[gpui::test]
515 async fn test_one_at_a_time(cx: &mut TestAppContext) {
516 let mut one_at_a_time = OneAtATime { cancel: None };
517
518 assert_eq!(
519 cx.update(|cx| one_at_a_time.spawn(cx, |_| async { Ok(1) }))
520 .await
521 .unwrap(),
522 Some(1)
523 );
524
525 let (a, b) = cx.update(|cx| {
526 (
527 one_at_a_time.spawn(cx, |_| async {
528 assert!(false);
529 Ok(2)
530 }),
531 one_at_a_time.spawn(cx, |_| async { Ok(3) }),
532 )
533 });
534
535 assert_eq!(a.await.unwrap(), None);
536 assert_eq!(b.await.unwrap(), Some(3));
537
538 let promise = cx.update(|cx| one_at_a_time.spawn(cx, |_| async { Ok(4) }));
539 drop(one_at_a_time);
540
541 assert_eq!(promise.await.unwrap(), None);
542 }
543}