1pub mod call_settings;
2pub mod participant;
3pub mod room;
4mod shared_screen;
5
6use anyhow::{anyhow, bail, Result};
7use async_trait::async_trait;
8use audio::Audio;
9use call_settings::CallSettings;
10use client::{
11 proto::{self, PeerId},
12 Client, TelemetrySettings, TypedEnvelope, User, UserStore, ZED_ALWAYS_ACTIVE,
13};
14use collections::HashSet;
15use futures::{channel::oneshot, future::Shared, Future, FutureExt};
16use gpui::{
17 AppContext, AsyncAppContext, AsyncWindowContext, Context, EventEmitter, Model, ModelContext,
18 Subscription, Task, View, ViewContext, VisualContext, WeakModel, WeakView,
19};
20pub use participant::ParticipantLocation;
21use participant::RemoteParticipant;
22use postage::watch;
23use project::Project;
24use room::Event;
25pub use room::Room;
26use settings::Settings;
27use shared_screen::SharedScreen;
28use std::sync::Arc;
29use util::ResultExt;
30use workspace::{item::ItemHandle, CallHandler, Pane, Workspace};
31
32pub fn init(client: Arc<Client>, user_store: Model<UserStore>, cx: &mut AppContext) {
33 CallSettings::register(cx);
34
35 let active_call = cx.build_model(|cx| ActiveCall::new(client, user_store, cx));
36 cx.set_global(active_call);
37}
38
/// Runs at most one spawned task at a time: spawning a new task cancels the
/// previous one.
pub struct OneAtATime {
    // Sender half of the cancellation channel for the most recently spawned
    // task. Dropping it (by replacing it or dropping the whole struct) wakes
    // the receiver inside that task, which then resolves to `Ok(None)`.
    cancel: Option<oneshot::Sender<()>>,
}
42
43impl OneAtATime {
44 /// spawn a task in the given context.
45 /// if another task is spawned before that resolves, or if the OneAtATime itself is dropped, the first task will be cancelled and return Ok(None)
46 /// otherwise you'll see the result of the task.
47 fn spawn<F, Fut, R>(&mut self, cx: &mut AppContext, f: F) -> Task<Result<Option<R>>>
48 where
49 F: 'static + FnOnce(AsyncAppContext) -> Fut,
50 Fut: Future<Output = Result<R>>,
51 R: 'static,
52 {
53 let (tx, rx) = oneshot::channel();
54 self.cancel.replace(tx);
55 cx.spawn(|cx| async move {
56 futures::select_biased! {
57 _ = rx.fuse() => Ok(None),
58 result = f(cx).fuse() => result.map(Some),
59 }
60 })
61 }
62
63 fn running(&self) -> bool {
64 self.cancel
65 .as_ref()
66 .is_some_and(|cancel| !cancel.is_canceled())
67 }
68}
69
/// A call that another user has placed to the current user and that has not
/// yet been accepted, declined, or canceled.
#[derive(Clone)]
pub struct IncomingCall {
    /// Id of the room the user is being invited to.
    pub room_id: u64,
    /// The user who placed the call.
    pub calling_user: Arc<User>,
    /// Users resolved from the participant ids carried by the incoming-call message.
    pub participants: Vec<Arc<User>>,
    /// Project attached to the invitation by the caller, if any.
    pub initial_project: Option<proto::ParticipantProject>,
}
77
/// Singleton global maintaining the user's participation in a room across workspaces.
pub struct ActiveCall {
    // The room currently joined, together with the subscriptions that observe it.
    room: Option<(Model<Room>, Vec<Subscription>)>,
    // Shared task for a room that `invite` is in the middle of creating.
    pending_room_creation: Option<Shared<Task<Result<Model<Room>, Arc<anyhow::Error>>>>>,
    // Project most recently passed to `set_location`, held weakly.
    location: Option<WeakModel<Project>>,
    // Ensures only one join (incoming call or channel) is in flight at a time.
    _join_debouncer: OneAtATime,
    // Ids of users whose invitation is still being sent.
    pending_invites: HashSet<u64>,
    // Watch channel (sender, receiver) holding the current incoming call, if any.
    incoming_call: (
        watch::Sender<Option<IncomingCall>>,
        watch::Receiver<Option<IncomingCall>>,
    ),
    client: Arc<Client>,
    user_store: Model<UserStore>,
    // Handler registrations for incoming-call and call-canceled messages.
    _subscriptions: Vec<client::Subscription>,
}
93
// `ActiveCall` re-emits the events of the room it is currently subscribed to.
impl EventEmitter<Event> for ActiveCall {}
95
96impl ActiveCall {
97 fn new(client: Arc<Client>, user_store: Model<UserStore>, cx: &mut ModelContext<Self>) -> Self {
98 Self {
99 room: None,
100 pending_room_creation: None,
101 location: None,
102 pending_invites: Default::default(),
103 incoming_call: watch::channel(),
104 _join_debouncer: OneAtATime { cancel: None },
105 _subscriptions: vec![
106 client.add_request_handler(cx.weak_model(), Self::handle_incoming_call),
107 client.add_message_handler(cx.weak_model(), Self::handle_call_canceled),
108 ],
109 client,
110 user_store,
111 }
112 }
113
114 pub fn channel_id(&self, cx: &AppContext) -> Option<u64> {
115 self.room()?.read(cx).channel_id()
116 }
117
118 async fn handle_incoming_call(
119 this: Model<Self>,
120 envelope: TypedEnvelope<proto::IncomingCall>,
121 _: Arc<Client>,
122 mut cx: AsyncAppContext,
123 ) -> Result<proto::Ack> {
124 let user_store = this.update(&mut cx, |this, _| this.user_store.clone())?;
125 let call = IncomingCall {
126 room_id: envelope.payload.room_id,
127 participants: user_store
128 .update(&mut cx, |user_store, cx| {
129 user_store.get_users(envelope.payload.participant_user_ids, cx)
130 })?
131 .await?,
132 calling_user: user_store
133 .update(&mut cx, |user_store, cx| {
134 user_store.get_user(envelope.payload.calling_user_id, cx)
135 })?
136 .await?,
137 initial_project: envelope.payload.initial_project,
138 };
139 this.update(&mut cx, |this, _| {
140 *this.incoming_call.0.borrow_mut() = Some(call);
141 })?;
142
143 Ok(proto::Ack {})
144 }
145
146 async fn handle_call_canceled(
147 this: Model<Self>,
148 envelope: TypedEnvelope<proto::CallCanceled>,
149 _: Arc<Client>,
150 mut cx: AsyncAppContext,
151 ) -> Result<()> {
152 this.update(&mut cx, |this, _| {
153 let mut incoming_call = this.incoming_call.0.borrow_mut();
154 if incoming_call
155 .as_ref()
156 .map_or(false, |call| call.room_id == envelope.payload.room_id)
157 {
158 incoming_call.take();
159 }
160 })?;
161 Ok(())
162 }
163
164 pub fn global(cx: &AppContext) -> Model<Self> {
165 cx.global::<Model<Self>>().clone()
166 }
167
168 pub fn invite(
169 &mut self,
170 called_user_id: u64,
171 initial_project: Option<Model<Project>>,
172 cx: &mut ModelContext<Self>,
173 ) -> Task<Result<()>> {
174 if !self.pending_invites.insert(called_user_id) {
175 return Task::ready(Err(anyhow!("user was already invited")));
176 }
177 cx.notify();
178
179 if self._join_debouncer.running() {
180 return Task::ready(Ok(()));
181 }
182
183 let room = if let Some(room) = self.room().cloned() {
184 Some(Task::ready(Ok(room)).shared())
185 } else {
186 self.pending_room_creation.clone()
187 };
188
189 let invite = if let Some(room) = room {
190 cx.spawn(move |_, mut cx| async move {
191 let room = room.await.map_err(|err| anyhow!("{:?}", err))?;
192
193 let initial_project_id = if let Some(initial_project) = initial_project {
194 Some(
195 room.update(&mut cx, |room, cx| room.share_project(initial_project, cx))?
196 .await?,
197 )
198 } else {
199 None
200 };
201
202 room.update(&mut cx, move |room, cx| {
203 room.call(called_user_id, initial_project_id, cx)
204 })?
205 .await?;
206
207 anyhow::Ok(())
208 })
209 } else {
210 let client = self.client.clone();
211 let user_store = self.user_store.clone();
212 let room = cx
213 .spawn(move |this, mut cx| async move {
214 let create_room = async {
215 let room = cx
216 .update(|cx| {
217 Room::create(
218 called_user_id,
219 initial_project,
220 client,
221 user_store,
222 cx,
223 )
224 })?
225 .await?;
226
227 this.update(&mut cx, |this, cx| this.set_room(Some(room.clone()), cx))?
228 .await?;
229
230 anyhow::Ok(room)
231 };
232
233 let room = create_room.await;
234 this.update(&mut cx, |this, _| this.pending_room_creation = None)?;
235 room.map_err(Arc::new)
236 })
237 .shared();
238 self.pending_room_creation = Some(room.clone());
239 cx.background_executor().spawn(async move {
240 room.await.map_err(|err| anyhow!("{:?}", err))?;
241 anyhow::Ok(())
242 })
243 };
244
245 cx.spawn(move |this, mut cx| async move {
246 let result = invite.await;
247 if result.is_ok() {
248 this.update(&mut cx, |this, cx| this.report_call_event("invite", cx))?;
249 } else {
250 // TODO: Resport collaboration error
251 }
252
253 this.update(&mut cx, |this, cx| {
254 this.pending_invites.remove(&called_user_id);
255 cx.notify();
256 })?;
257 result
258 })
259 }
260
261 pub fn cancel_invite(
262 &mut self,
263 called_user_id: u64,
264 cx: &mut ModelContext<Self>,
265 ) -> Task<Result<()>> {
266 let room_id = if let Some(room) = self.room() {
267 room.read(cx).id()
268 } else {
269 return Task::ready(Err(anyhow!("no active call")));
270 };
271
272 let client = self.client.clone();
273 cx.background_executor().spawn(async move {
274 client
275 .request(proto::CancelCall {
276 room_id,
277 called_user_id,
278 })
279 .await?;
280 anyhow::Ok(())
281 })
282 }
283
284 pub fn incoming(&self) -> watch::Receiver<Option<IncomingCall>> {
285 self.incoming_call.1.clone()
286 }
287
288 pub fn accept_incoming(&mut self, cx: &mut ModelContext<Self>) -> Task<Result<()>> {
289 if self.room.is_some() {
290 return Task::ready(Err(anyhow!("cannot join while on another call")));
291 }
292
293 let call = if let Some(call) = self.incoming_call.1.borrow().clone() {
294 call
295 } else {
296 return Task::ready(Err(anyhow!("no incoming call")));
297 };
298
299 if self.pending_room_creation.is_some() {
300 return Task::ready(Ok(()));
301 }
302
303 let room_id = call.room_id.clone();
304 let client = self.client.clone();
305 let user_store = self.user_store.clone();
306 let join = self
307 ._join_debouncer
308 .spawn(cx, move |cx| Room::join(room_id, client, user_store, cx));
309
310 cx.spawn(|this, mut cx| async move {
311 let room = join.await?;
312 this.update(&mut cx, |this, cx| this.set_room(room.clone(), cx))?
313 .await?;
314 this.update(&mut cx, |this, cx| {
315 this.report_call_event("accept incoming", cx)
316 })?;
317 Ok(())
318 })
319 }
320
321 pub fn decline_incoming(&mut self, cx: &mut ModelContext<Self>) -> Result<()> {
322 let call = self
323 .incoming_call
324 .0
325 .borrow_mut()
326 .take()
327 .ok_or_else(|| anyhow!("no incoming call"))?;
328 report_call_event_for_room("decline incoming", call.room_id, None, &self.client, cx);
329 self.client.send(proto::DeclineCall {
330 room_id: call.room_id,
331 })?;
332 Ok(())
333 }
334
335 pub fn join_channel(
336 &mut self,
337 channel_id: u64,
338 cx: &mut ModelContext<Self>,
339 ) -> Task<Result<Option<Model<Room>>>> {
340 if let Some(room) = self.room().cloned() {
341 if room.read(cx).channel_id() == Some(channel_id) {
342 return Task::ready(Ok(Some(room)));
343 } else {
344 room.update(cx, |room, cx| room.clear_state(cx));
345 }
346 }
347
348 if self.pending_room_creation.is_some() {
349 return Task::ready(Ok(None));
350 }
351
352 let client = self.client.clone();
353 let user_store = self.user_store.clone();
354 let join = self._join_debouncer.spawn(cx, move |cx| async move {
355 Room::join_channel(channel_id, client, user_store, cx).await
356 });
357
358 cx.spawn(|this, mut cx| async move {
359 let room = join.await?;
360 this.update(&mut cx, |this, cx| this.set_room(room.clone(), cx))?
361 .await?;
362 this.update(&mut cx, |this, cx| {
363 this.report_call_event("join channel", cx)
364 })?;
365 Ok(room)
366 })
367 }
368
369 pub fn hang_up(&mut self, cx: &mut ModelContext<Self>) -> Task<Result<()>> {
370 cx.notify();
371 self.report_call_event("hang up", cx);
372
373 Audio::end_call(cx);
374 if let Some((room, _)) = self.room.take() {
375 room.update(cx, |room, cx| room.leave(cx))
376 } else {
377 Task::ready(Ok(()))
378 }
379 }
380
381 pub fn share_project(
382 &mut self,
383 project: Model<Project>,
384 cx: &mut ModelContext<Self>,
385 ) -> Task<Result<u64>> {
386 if let Some((room, _)) = self.room.as_ref() {
387 self.report_call_event("share project", cx);
388 room.update(cx, |room, cx| room.share_project(project, cx))
389 } else {
390 Task::ready(Err(anyhow!("no active call")))
391 }
392 }
393
394 pub fn unshare_project(
395 &mut self,
396 project: Model<Project>,
397 cx: &mut ModelContext<Self>,
398 ) -> Result<()> {
399 if let Some((room, _)) = self.room.as_ref() {
400 self.report_call_event("unshare project", cx);
401 room.update(cx, |room, cx| room.unshare_project(project, cx))
402 } else {
403 Err(anyhow!("no active call"))
404 }
405 }
406
407 pub fn location(&self) -> Option<&WeakModel<Project>> {
408 self.location.as_ref()
409 }
410
411 pub fn set_location(
412 &mut self,
413 project: Option<&Model<Project>>,
414 cx: &mut ModelContext<Self>,
415 ) -> Task<Result<()>> {
416 if project.is_some() || !*ZED_ALWAYS_ACTIVE {
417 self.location = project.map(|project| project.downgrade());
418 if let Some((room, _)) = self.room.as_ref() {
419 return room.update(cx, |room, cx| room.set_location(project, cx));
420 }
421 }
422 Task::ready(Ok(()))
423 }
424
425 fn set_room(
426 &mut self,
427 room: Option<Model<Room>>,
428 cx: &mut ModelContext<Self>,
429 ) -> Task<Result<()>> {
430 if room.as_ref() != self.room.as_ref().map(|room| &room.0) {
431 cx.notify();
432 if let Some(room) = room {
433 if room.read(cx).status().is_offline() {
434 self.room = None;
435 Task::ready(Ok(()))
436 } else {
437 let subscriptions = vec![
438 cx.observe(&room, |this, room, cx| {
439 if room.read(cx).status().is_offline() {
440 this.set_room(None, cx).detach_and_log_err(cx);
441 }
442
443 cx.notify();
444 }),
445 cx.subscribe(&room, |_, _, event, cx| cx.emit(event.clone())),
446 ];
447 self.room = Some((room.clone(), subscriptions));
448 let location = self
449 .location
450 .as_ref()
451 .and_then(|location| location.upgrade());
452 room.update(cx, |room, cx| room.set_location(location.as_ref(), cx))
453 }
454 } else {
455 self.room = None;
456 Task::ready(Ok(()))
457 }
458 } else {
459 Task::ready(Ok(()))
460 }
461 }
462
463 pub fn room(&self) -> Option<&Model<Room>> {
464 self.room.as_ref().map(|(room, _)| room)
465 }
466
467 pub fn client(&self) -> Arc<Client> {
468 self.client.clone()
469 }
470
    /// Returns the ids of users whose invitation is currently in progress.
    pub fn pending_invites(&self) -> &HashSet<u64> {
        &self.pending_invites
    }
474
475 pub fn report_call_event(&self, operation: &'static str, cx: &mut AppContext) {
476 if let Some(room) = self.room() {
477 let room = room.read(cx);
478 report_call_event_for_room(operation, room.id(), room.channel_id(), &self.client, cx);
479 }
480 }
481}
482
483pub fn report_call_event_for_room(
484 operation: &'static str,
485 room_id: u64,
486 channel_id: Option<u64>,
487 client: &Arc<Client>,
488 cx: &mut AppContext,
489) {
490 let telemetry = client.telemetry();
491 let telemetry_settings = *TelemetrySettings::get_global(cx);
492
493 telemetry.report_call_event(telemetry_settings, operation, Some(room_id), channel_id)
494}
495
496pub fn report_call_event_for_channel(
497 operation: &'static str,
498 channel_id: u64,
499 client: &Arc<Client>,
500 cx: &AppContext,
501) {
502 let room = ActiveCall::global(cx).read(cx).room();
503
504 let telemetry = client.telemetry();
505
506 let telemetry_settings = *TelemetrySettings::get_global(cx);
507
508 telemetry.report_call_event(
509 telemetry_settings,
510 operation,
511 room.map(|r| r.read(cx).id()),
512 Some(channel_id),
513 )
514}
515
/// Per-workspace [`CallHandler`] implementation backed by the global [`ActiveCall`].
pub struct Call {
    // The global active call and the subscription forwarding its events to the
    // workspace; `None` when no `ActiveCall` global existed at construction time.
    active_call: Option<(Model<ActiveCall>, Vec<Subscription>)>,
    // The workspace this handler belongs to, held weakly.
    parent_workspace: WeakView<Workspace>,
}
520
521impl Call {
522 pub fn new(
523 parent_workspace: WeakView<Workspace>,
524 cx: &mut ViewContext<'_, Workspace>,
525 ) -> Box<dyn CallHandler> {
526 let mut active_call = None;
527 if cx.has_global::<Model<ActiveCall>>() {
528 let call = cx.global::<Model<ActiveCall>>().clone();
529 let subscriptions = vec![cx.subscribe(&call, Self::on_active_call_event)];
530 active_call = Some((call, subscriptions));
531 }
532 Box::new(Self {
533 active_call,
534 parent_workspace,
535 })
536 }
537 fn on_active_call_event(
538 workspace: &mut Workspace,
539 _: Model<ActiveCall>,
540 event: &room::Event,
541 cx: &mut ViewContext<Workspace>,
542 ) {
543 match event {
544 room::Event::ParticipantLocationChanged { participant_id }
545 | room::Event::RemoteVideoTracksChanged { participant_id } => {
546 workspace.leader_updated(*participant_id, cx);
547 }
548 _ => {}
549 }
550 }
551}
552
553#[async_trait(?Send)]
554impl CallHandler for Call {
555 fn peer_state(
556 &mut self,
557 leader_id: PeerId,
558 cx: &mut ViewContext<Workspace>,
559 ) -> Option<(bool, bool)> {
560 let (call, _) = self.active_call.as_ref()?;
561 let room = call.read(cx).room()?.read(cx);
562 let participant = room.remote_participant_for_peer_id(leader_id)?;
563
564 let leader_in_this_app;
565 let leader_in_this_project;
566 match participant.location {
567 ParticipantLocation::SharedProject { project_id } => {
568 leader_in_this_app = true;
569 leader_in_this_project = Some(project_id)
570 == self
571 .parent_workspace
572 .update(cx, |this, cx| this.project().read(cx).remote_id())
573 .log_err()
574 .flatten();
575 }
576 ParticipantLocation::UnsharedProject => {
577 leader_in_this_app = true;
578 leader_in_this_project = false;
579 }
580 ParticipantLocation::External => {
581 leader_in_this_app = false;
582 leader_in_this_project = false;
583 }
584 };
585
586 Some((leader_in_this_project, leader_in_this_app))
587 }
588
589 fn shared_screen_for_peer(
590 &self,
591 peer_id: PeerId,
592 pane: &View<Pane>,
593 cx: &mut ViewContext<Workspace>,
594 ) -> Option<Box<dyn ItemHandle>> {
595 let (call, _) = self.active_call.as_ref()?;
596 dbg!("A");
597 let room = call.read(cx).room()?.read(cx);
598 dbg!("B");
599 let participant = room.remote_participant_for_peer_id(peer_id)?;
600 dbg!("C");
601 let track = participant.video_tracks.values().next()?.clone();
602 dbg!("D");
603 let user = participant.user.clone();
604 for item in pane.read(cx).items_of_type::<SharedScreen>() {
605 if item.read(cx).peer_id == peer_id {
606 return Some(Box::new(item));
607 }
608 }
609
610 Some(Box::new(cx.build_view(|cx| {
611 SharedScreen::new(&track, peer_id, user.clone(), cx)
612 })))
613 }
614 fn room_id(&self, cx: &AppContext) -> Option<u64> {
615 Some(self.active_call.as_ref()?.0.read(cx).room()?.read(cx).id())
616 }
617 fn hang_up(&self, mut cx: AsyncWindowContext) -> Result<Task<Result<()>>> {
618 let Some((call, _)) = self.active_call.as_ref() else {
619 bail!("Cannot exit a call; not in a call");
620 };
621
622 call.update(&mut cx, |this, cx| this.hang_up(cx))
623 }
624 fn active_project(&self, cx: &AppContext) -> Option<WeakModel<Project>> {
625 ActiveCall::global(cx).read(cx).location().cloned()
626 }
627 fn invite(
628 &mut self,
629 called_user_id: u64,
630 initial_project: Option<Model<Project>>,
631 cx: &mut AppContext,
632 ) -> Task<Result<()>> {
633 ActiveCall::global(cx).update(cx, |this, cx| {
634 this.invite(called_user_id, initial_project, cx)
635 })
636 }
637 fn remote_participants(&self, cx: &AppContext) -> Option<Vec<(Arc<User>, PeerId)>> {
638 self.active_call
639 .as_ref()
640 .map(|call| {
641 call.0.read(cx).room().map(|room| {
642 room.read(cx)
643 .remote_participants()
644 .iter()
645 .map(|participant| {
646 (participant.1.user.clone(), participant.1.peer_id.clone())
647 })
648 .collect()
649 })
650 })
651 .flatten()
652 }
653 fn is_muted(&self, cx: &AppContext) -> Option<bool> {
654 self.active_call
655 .as_ref()
656 .map(|call| {
657 call.0
658 .read(cx)
659 .room()
660 .map(|room| room.read(cx).is_muted(cx))
661 })
662 .flatten()
663 }
664 fn toggle_mute(&self, cx: &mut AppContext) {
665 self.active_call.as_ref().map(|call| {
666 call.0.update(cx, |this, cx| {
667 this.room().map(|room| {
668 room.update(cx, |this, cx| {
669 this.toggle_mute(cx);
670 })
671 })
672 })
673 });
674 }
675 fn toggle_screen_share(&self, cx: &mut AppContext) {
676 self.active_call.as_ref().map(|call| {
677 call.0.update(cx, |this, cx| {
678 this.room().map(|room| {
679 room.update(cx, |this, cx| {
680 if this.is_screen_sharing() {
681 dbg!("Unsharing");
682 this.unshare_screen(cx);
683 } else {
684 dbg!("Sharing");
685 let t = this.share_screen(cx);
686 cx.spawn(move |_, cx| async move {
687 t.await.log_err();
688 })
689 .detach();
690 }
691 })
692 })
693 })
694 });
695 }
696}
697
#[cfg(test)]
mod test {
    use gpui::TestAppContext;

    use crate::OneAtATime;

    /// Exercises the three outcomes of `OneAtATime::spawn`: completion,
    /// cancellation by a newer spawn, and cancellation by dropping the owner.
    #[gpui::test]
    async fn test_one_at_a_time(cx: &mut TestAppContext) {
        let mut one_at_a_time = OneAtATime { cancel: None };

        // A lone task runs to completion and yields its value.
        assert_eq!(
            cx.update(|cx| one_at_a_time.spawn(cx, |_| async { Ok(1) }))
                .await
                .unwrap(),
            Some(1)
        );

        // Spawning a second task before the first is polled cancels the first,
        // so the body of the first must never run.
        let (a, b) = cx.update(|cx| {
            (
                one_at_a_time.spawn(cx, |_| async {
                    assert!(false);
                    Ok(2)
                }),
                one_at_a_time.spawn(cx, |_| async { Ok(3) }),
            )
        });

        assert_eq!(a.await.unwrap(), None);
        assert_eq!(b.await.unwrap(), Some(3));

        // Dropping the `OneAtATime` cancels the task that is still pending.
        let promise = cx.update(|cx| one_at_a_time.spawn(cx, |_| async { Ok(4) }));
        drop(one_at_a_time);

        assert_eq!(promise.await.unwrap(), None);
    }
}