1pub mod participant;
2pub mod room;
3
4use anyhow::{Context as _, Result, anyhow};
5use audio::Audio;
6use client::{ChannelId, Client, TypedEnvelope, User, UserStore, ZED_ALWAYS_ACTIVE, proto};
7use collections::HashSet;
8use futures::{Future, FutureExt, channel::oneshot, future::Shared};
9use gpui::{
10 AnyView, App, AppContext as _, AsyncApp, Context, Entity, EventEmitter, Subscription, Task,
11 WeakEntity, Window,
12};
13use postage::watch;
14use project::Project;
15use room::Event;
16use settings::Settings;
17use std::sync::Arc;
18use workspace::{
19 ActiveCallEvent, AnyActiveCall, GlobalAnyActiveCall, Pane, RemoteCollaborator, SharedScreen,
20 Workspace,
21};
22
23pub use livekit_client::{RemoteVideoTrack, RemoteVideoTrackView, RemoteVideoTrackViewEvent};
24pub use room::Room;
25
26use crate::call_settings::CallSettings;
27
28pub fn init(client: Arc<Client>, user_store: Entity<UserStore>, cx: &mut App) {
29 let active_call = cx.new(|cx| ActiveCall::new(client, user_store, cx));
30 cx.set_global(GlobalAnyActiveCall(Arc::new(ActiveCallEntity(active_call))))
31}
32
33#[derive(Clone)]
34struct ActiveCallEntity(Entity<ActiveCall>);
35
36impl AnyActiveCall for ActiveCallEntity {
37 fn entity(&self) -> gpui::AnyEntity {
38 self.0.clone().into_any()
39 }
40
41 fn is_in_room(&self, cx: &App) -> bool {
42 self.0.read(cx).room().is_some()
43 }
44
45 fn room_id(&self, cx: &App) -> Option<u64> {
46 Some(self.0.read(cx).room()?.read(cx).id())
47 }
48
49 fn channel_id(&self, cx: &App) -> Option<ChannelId> {
50 self.0.read(cx).room()?.read(cx).channel_id()
51 }
52
53 fn hang_up(&self, cx: &mut App) -> Task<Result<()>> {
54 self.0.update(cx, |this, cx| this.hang_up(cx))
55 }
56
57 fn unshare_project(&self, project: Entity<Project>, cx: &mut App) -> Result<()> {
58 self.0
59 .update(cx, |this, cx| this.unshare_project(project, cx))
60 }
61
62 fn remote_participant_for_peer_id(
63 &self,
64 peer_id: proto::PeerId,
65 cx: &App,
66 ) -> Option<workspace::RemoteCollaborator> {
67 let room = self.0.read(cx).room()?.read(cx);
68 let participant = room.remote_participant_for_peer_id(peer_id)?;
69 Some(RemoteCollaborator {
70 user: participant.user.clone(),
71 peer_id: participant.peer_id,
72 location: participant.location,
73 participant_index: participant.participant_index,
74 })
75 }
76
77 fn is_sharing_project(&self, cx: &App) -> bool {
78 self.0
79 .read(cx)
80 .room()
81 .map_or(false, |room| room.read(cx).is_sharing_project())
82 }
83
84 fn has_remote_participants(&self, cx: &App) -> bool {
85 self.0.read(cx).room().map_or(false, |room| {
86 !room.read(cx).remote_participants().is_empty()
87 })
88 }
89
90 fn local_participant_is_guest(&self, cx: &App) -> bool {
91 self.0
92 .read(cx)
93 .room()
94 .map_or(false, |room| room.read(cx).local_participant_is_guest())
95 }
96
97 fn client(&self, cx: &App) -> Arc<Client> {
98 self.0.read(cx).client()
99 }
100
101 fn share_on_join(&self, cx: &App) -> bool {
102 CallSettings::get_global(cx).share_on_join
103 }
104
105 fn join_channel(&self, channel_id: ChannelId, cx: &mut App) -> Task<Result<bool>> {
106 let task = self
107 .0
108 .update(cx, |this, cx| this.join_channel(channel_id, cx));
109 cx.spawn(async move |_cx| {
110 let result = task.await?;
111 Ok(result.is_some())
112 })
113 }
114
115 fn room_update_completed(&self, cx: &mut App) -> Task<()> {
116 let Some(room) = self.0.read(cx).room().cloned() else {
117 return Task::ready(());
118 };
119 let future = room.update(cx, |room, _cx| room.room_update_completed());
120 cx.spawn(async move |_cx| {
121 future.await;
122 })
123 }
124
125 fn most_active_project(&self, cx: &App) -> Option<(u64, u64)> {
126 let room = self.0.read(cx).room()?;
127 room.read(cx).most_active_project(cx)
128 }
129
130 fn share_project(&self, project: Entity<Project>, cx: &mut App) -> Task<Result<u64>> {
131 self.0
132 .update(cx, |this, cx| this.share_project(project, cx))
133 }
134
135 fn join_project(
136 &self,
137 project_id: u64,
138 language_registry: Arc<language::LanguageRegistry>,
139 fs: Arc<dyn fs::Fs>,
140 cx: &mut App,
141 ) -> Task<Result<Entity<Project>>> {
142 let Some(room) = self.0.read(cx).room().cloned() else {
143 return Task::ready(Err(anyhow::anyhow!("not in a call")));
144 };
145 room.update(cx, |room, cx| {
146 room.join_project(project_id, language_registry, fs, cx)
147 })
148 }
149
150 fn peer_id_for_user_in_room(&self, user_id: u64, cx: &App) -> Option<proto::PeerId> {
151 let room = self.0.read(cx).room()?.read(cx);
152 room.remote_participants()
153 .values()
154 .find(|p| p.user.id == user_id)
155 .map(|p| p.peer_id)
156 }
157
158 fn subscribe(
159 &self,
160 window: &mut Window,
161 cx: &mut Context<Workspace>,
162 handler: Box<
163 dyn Fn(&mut Workspace, &ActiveCallEvent, &mut Window, &mut Context<Workspace>),
164 >,
165 ) -> Subscription {
166 cx.subscribe_in(
167 &self.0,
168 window,
169 move |workspace, _, event: &room::Event, window, cx| {
170 let mapped = match event {
171 room::Event::ParticipantLocationChanged { participant_id } => {
172 Some(ActiveCallEvent::ParticipantLocationChanged {
173 participant_id: *participant_id,
174 })
175 }
176 room::Event::RemoteVideoTracksChanged { participant_id } => {
177 Some(ActiveCallEvent::RemoteVideoTracksChanged {
178 participant_id: *participant_id,
179 })
180 }
181 _ => None,
182 };
183 if let Some(event) = mapped {
184 handler(workspace, &event, window, cx);
185 }
186 },
187 )
188 }
189
190 fn create_shared_screen(
191 &self,
192 peer_id: client::proto::PeerId,
193 pane: &Entity<Pane>,
194 window: &mut Window,
195 cx: &mut App,
196 ) -> Option<Entity<workspace::SharedScreen>> {
197 let room = self.0.read(cx).room()?.clone();
198 let participant = room.read(cx).remote_participant_for_peer_id(peer_id)?;
199 let track = participant.video_tracks.values().next()?.clone();
200 let user = participant.user.clone();
201
202 for item in pane.read(cx).items_of_type::<SharedScreen>() {
203 if item.read(cx).peer_id == peer_id {
204 return Some(item);
205 }
206 }
207
208 Some(cx.new(|cx: &mut Context<SharedScreen>| {
209 let my_sid = track.sid();
210 cx.subscribe(
211 &room,
212 move |_: &mut SharedScreen,
213 _: Entity<Room>,
214 ev: &room::Event,
215 cx: &mut Context<SharedScreen>| {
216 if let room::Event::RemoteVideoTrackUnsubscribed { sid } = ev
217 && *sid == my_sid
218 {
219 cx.emit(workspace::shared_screen::Event::Close);
220 }
221 },
222 )
223 .detach();
224
225 cx.observe_release(
226 &room,
227 |_: &mut SharedScreen, _: &mut Room, cx: &mut Context<SharedScreen>| {
228 cx.emit(workspace::shared_screen::Event::Close);
229 },
230 )
231 .detach();
232
233 let view = cx.new(|cx| RemoteVideoTrackView::new(track.clone(), window, cx));
234 cx.subscribe(
235 &view,
236 |_: &mut SharedScreen,
237 _: Entity<RemoteVideoTrackView>,
238 ev: &RemoteVideoTrackViewEvent,
239 cx: &mut Context<SharedScreen>| match ev {
240 RemoteVideoTrackViewEvent::Close => {
241 cx.emit(workspace::shared_screen::Event::Close);
242 }
243 },
244 )
245 .detach();
246
247 pub(super) fn clone_remote_video_track_view(
248 view: &AnyView,
249 window: &mut Window,
250 cx: &mut App,
251 ) -> AnyView {
252 let view = view
253 .clone()
254 .downcast::<RemoteVideoTrackView>()
255 .expect("SharedScreen view must be a RemoteVideoTrackView");
256 let cloned = view.update(cx, |view, cx| view.clone(window, cx));
257 AnyView::from(cloned)
258 }
259
260 SharedScreen::new(
261 peer_id,
262 user,
263 AnyView::from(view),
264 clone_remote_video_track_view,
265 cx,
266 )
267 }))
268 }
269}
270
271pub struct OneAtATime {
272 cancel: Option<oneshot::Sender<()>>,
273}
274
275impl OneAtATime {
276 /// spawn a task in the given context.
277 /// if another task is spawned before that resolves, or if the OneAtATime itself is dropped, the first task will be cancelled and return Ok(None)
278 /// otherwise you'll see the result of the task.
279 fn spawn<F, Fut, R>(&mut self, cx: &mut App, f: F) -> Task<Result<Option<R>>>
280 where
281 F: 'static + FnOnce(AsyncApp) -> Fut,
282 Fut: Future<Output = Result<R>>,
283 R: 'static,
284 {
285 let (tx, rx) = oneshot::channel();
286 self.cancel.replace(tx);
287 cx.spawn(async move |cx| {
288 futures::select_biased! {
289 _ = rx.fuse() => Ok(None),
290 result = f(cx.clone()).fuse() => result.map(Some),
291 }
292 })
293 }
294
295 fn running(&self) -> bool {
296 self.cancel
297 .as_ref()
298 .is_some_and(|cancel| !cancel.is_canceled())
299 }
300}
301
302#[derive(Clone)]
303pub struct IncomingCall {
304 pub room_id: u64,
305 pub calling_user: Arc<User>,
306 pub participants: Vec<Arc<User>>,
307 pub initial_project: Option<proto::ParticipantProject>,
308}
309
310/// Singleton global maintaining the user's participation in a room across workspaces.
311pub struct ActiveCall {
312 room: Option<(Entity<Room>, Vec<Subscription>)>,
313 pending_room_creation: Option<Shared<Task<Result<Entity<Room>, Arc<anyhow::Error>>>>>,
314 location: Option<WeakEntity<Project>>,
315 _join_debouncer: OneAtATime,
316 pending_invites: HashSet<u64>,
317 incoming_call: (
318 watch::Sender<Option<IncomingCall>>,
319 watch::Receiver<Option<IncomingCall>>,
320 ),
321 client: Arc<Client>,
322 user_store: Entity<UserStore>,
323 _subscriptions: Vec<client::Subscription>,
324}
325
326impl EventEmitter<Event> for ActiveCall {}
327
328impl ActiveCall {
329 fn new(client: Arc<Client>, user_store: Entity<UserStore>, cx: &mut Context<Self>) -> Self {
330 Self {
331 room: None,
332 pending_room_creation: None,
333 location: None,
334 pending_invites: Default::default(),
335 incoming_call: watch::channel(),
336 _join_debouncer: OneAtATime { cancel: None },
337 _subscriptions: vec![
338 client.add_request_handler(cx.weak_entity(), Self::handle_incoming_call),
339 client.add_message_handler(cx.weak_entity(), Self::handle_call_canceled),
340 ],
341 client,
342 user_store,
343 }
344 }
345
346 pub fn channel_id(&self, cx: &App) -> Option<ChannelId> {
347 self.room()?.read(cx).channel_id()
348 }
349
350 async fn handle_incoming_call(
351 this: Entity<Self>,
352 envelope: TypedEnvelope<proto::IncomingCall>,
353 mut cx: AsyncApp,
354 ) -> Result<proto::Ack> {
355 let user_store = this.read_with(&cx, |this, _| this.user_store.clone());
356 let call = IncomingCall {
357 room_id: envelope.payload.room_id,
358 participants: user_store
359 .update(&mut cx, |user_store, cx| {
360 user_store.get_users(envelope.payload.participant_user_ids, cx)
361 })
362 .await?,
363 calling_user: user_store
364 .update(&mut cx, |user_store, cx| {
365 user_store.get_user(envelope.payload.calling_user_id, cx)
366 })
367 .await?,
368 initial_project: envelope.payload.initial_project,
369 };
370 this.update(&mut cx, |this, _| {
371 *this.incoming_call.0.borrow_mut() = Some(call);
372 });
373
374 Ok(proto::Ack {})
375 }
376
377 async fn handle_call_canceled(
378 this: Entity<Self>,
379 envelope: TypedEnvelope<proto::CallCanceled>,
380 mut cx: AsyncApp,
381 ) -> Result<()> {
382 this.update(&mut cx, |this, _| {
383 let mut incoming_call = this.incoming_call.0.borrow_mut();
384 if incoming_call
385 .as_ref()
386 .is_some_and(|call| call.room_id == envelope.payload.room_id)
387 {
388 incoming_call.take();
389 }
390 });
391 Ok(())
392 }
393
394 pub fn global(cx: &App) -> Entity<Self> {
395 Self::try_global(cx).unwrap()
396 }
397
398 pub fn try_global(cx: &App) -> Option<Entity<Self>> {
399 let any = cx.try_global::<GlobalAnyActiveCall>()?;
400 any.0.entity().downcast::<Self>().ok()
401 }
402
403 pub fn invite(
404 &mut self,
405 called_user_id: u64,
406 initial_project: Option<Entity<Project>>,
407 cx: &mut Context<Self>,
408 ) -> Task<Result<()>> {
409 if !self.pending_invites.insert(called_user_id) {
410 return Task::ready(Err(anyhow!("user was already invited")));
411 }
412 cx.notify();
413
414 if self._join_debouncer.running() {
415 return Task::ready(Ok(()));
416 }
417
418 let room = if let Some(room) = self.room().cloned() {
419 Some(Task::ready(Ok(room)).shared())
420 } else {
421 self.pending_room_creation.clone()
422 };
423
424 let invite = if let Some(room) = room {
425 cx.spawn(async move |_, cx| {
426 let room = room.await.map_err(|err| anyhow!("{err:?}"))?;
427
428 let initial_project_id = if let Some(initial_project) = initial_project {
429 Some(
430 room.update(cx, |room, cx| room.share_project(initial_project, cx))
431 .await?,
432 )
433 } else {
434 None
435 };
436
437 room.update(cx, move |room, cx| {
438 room.call(called_user_id, initial_project_id, cx)
439 })
440 .await?;
441
442 anyhow::Ok(())
443 })
444 } else {
445 let client = self.client.clone();
446 let user_store = self.user_store.clone();
447 let room = cx
448 .spawn(async move |this, cx| {
449 let create_room = async {
450 let room = cx
451 .update(|cx| {
452 Room::create(
453 called_user_id,
454 initial_project,
455 client,
456 user_store,
457 cx,
458 )
459 })
460 .await?;
461
462 this.update(cx, |this, cx| this.set_room(Some(room.clone()), cx))?
463 .await?;
464
465 anyhow::Ok(room)
466 };
467
468 let room = create_room.await;
469 this.update(cx, |this, _| this.pending_room_creation = None)?;
470 room.map_err(Arc::new)
471 })
472 .shared();
473 self.pending_room_creation = Some(room.clone());
474 cx.background_spawn(async move {
475 room.await.map_err(|err| anyhow!("{err:?}"))?;
476 anyhow::Ok(())
477 })
478 };
479
480 cx.spawn(async move |this, cx| {
481 let result = invite.await;
482 if result.is_ok() {
483 this.update(cx, |this, cx| {
484 this.report_call_event("Participant Invited", cx)
485 })?;
486 } else {
487 //TODO: report collaboration error
488 log::error!("invite failed: {:?}", result);
489 }
490
491 this.update(cx, |this, cx| {
492 this.pending_invites.remove(&called_user_id);
493 cx.notify();
494 })?;
495 result
496 })
497 }
498
499 pub fn cancel_invite(
500 &mut self,
501 called_user_id: u64,
502 cx: &mut Context<Self>,
503 ) -> Task<Result<()>> {
504 let room_id = if let Some(room) = self.room() {
505 room.read(cx).id()
506 } else {
507 return Task::ready(Err(anyhow!("no active call")));
508 };
509
510 let client = self.client.clone();
511 cx.background_spawn(async move {
512 client
513 .request(proto::CancelCall {
514 room_id,
515 called_user_id,
516 })
517 .await?;
518 anyhow::Ok(())
519 })
520 }
521
522 pub fn incoming(&self) -> watch::Receiver<Option<IncomingCall>> {
523 self.incoming_call.1.clone()
524 }
525
526 pub fn accept_incoming(&mut self, cx: &mut Context<Self>) -> Task<Result<()>> {
527 if self.room.is_some() {
528 return Task::ready(Err(anyhow!("cannot join while on another call")));
529 }
530
531 let call = if let Some(call) = self.incoming_call.0.borrow_mut().take() {
532 call
533 } else {
534 return Task::ready(Err(anyhow!("no incoming call")));
535 };
536
537 if self.pending_room_creation.is_some() {
538 return Task::ready(Ok(()));
539 }
540
541 let room_id = call.room_id;
542 let client = self.client.clone();
543 let user_store = self.user_store.clone();
544 let join = self
545 ._join_debouncer
546 .spawn(cx, move |cx| Room::join(room_id, client, user_store, cx));
547
548 cx.spawn(async move |this, cx| {
549 let room = join.await?;
550 this.update(cx, |this, cx| this.set_room(room.clone(), cx))?
551 .await?;
552 this.update(cx, |this, cx| {
553 this.report_call_event("Incoming Call Accepted", cx)
554 })?;
555 Ok(())
556 })
557 }
558
559 pub fn decline_incoming(&mut self, _: &mut Context<Self>) -> Result<()> {
560 let call = self
561 .incoming_call
562 .0
563 .borrow_mut()
564 .take()
565 .context("no incoming call")?;
566 telemetry::event!("Incoming Call Declined", room_id = call.room_id);
567 self.client.send(proto::DeclineCall {
568 room_id: call.room_id,
569 })?;
570 Ok(())
571 }
572
573 pub fn join_channel(
574 &mut self,
575 channel_id: ChannelId,
576 cx: &mut Context<Self>,
577 ) -> Task<Result<Option<Entity<Room>>>> {
578 if let Some(room) = self.room().cloned() {
579 if room.read(cx).channel_id() == Some(channel_id) {
580 return Task::ready(Ok(Some(room)));
581 } else {
582 room.update(cx, |room, cx| room.clear_state(cx));
583 }
584 }
585
586 if self.pending_room_creation.is_some() {
587 return Task::ready(Ok(None));
588 }
589
590 let client = self.client.clone();
591 let user_store = self.user_store.clone();
592 let join = self._join_debouncer.spawn(cx, move |cx| async move {
593 Room::join_channel(channel_id, client, user_store, cx).await
594 });
595
596 cx.spawn(async move |this, cx| {
597 let room = join.await?;
598 this.update(cx, |this, cx| this.set_room(room.clone(), cx))?
599 .await?;
600 this.update(cx, |this, cx| this.report_call_event("Channel Joined", cx))?;
601 Ok(room)
602 })
603 }
604
605 pub fn hang_up(&mut self, cx: &mut Context<Self>) -> Task<Result<()>> {
606 cx.notify();
607 self.report_call_event("Call Ended", cx);
608
609 Audio::end_call(cx);
610
611 let channel_id = self.channel_id(cx);
612 if let Some((room, _)) = self.room.take() {
613 cx.emit(Event::RoomLeft { channel_id });
614 room.update(cx, |room, cx| room.leave(cx))
615 } else {
616 Task::ready(Ok(()))
617 }
618 }
619
620 pub fn share_project(
621 &mut self,
622 project: Entity<Project>,
623 cx: &mut Context<Self>,
624 ) -> Task<Result<u64>> {
625 if let Some((room, _)) = self.room.as_ref() {
626 self.report_call_event("Project Shared", cx);
627 room.update(cx, |room, cx| room.share_project(project, cx))
628 } else {
629 Task::ready(Err(anyhow!("no active call")))
630 }
631 }
632
633 pub fn unshare_project(
634 &mut self,
635 project: Entity<Project>,
636 cx: &mut Context<Self>,
637 ) -> Result<()> {
638 let (room, _) = self.room.as_ref().context("no active call")?;
639 self.report_call_event("Project Unshared", cx);
640 room.update(cx, |room, cx| room.unshare_project(project, cx))
641 }
642
643 pub fn location(&self) -> Option<&WeakEntity<Project>> {
644 self.location.as_ref()
645 }
646
647 pub fn set_location(
648 &mut self,
649 project: Option<&Entity<Project>>,
650 cx: &mut Context<Self>,
651 ) -> Task<Result<()>> {
652 if project.is_some() || !*ZED_ALWAYS_ACTIVE {
653 self.location = project.map(|project| project.downgrade());
654 if let Some((room, _)) = self.room.as_ref() {
655 return room.update(cx, |room, cx| room.set_location(project, cx));
656 }
657 }
658 Task::ready(Ok(()))
659 }
660
661 fn set_room(&mut self, room: Option<Entity<Room>>, cx: &mut Context<Self>) -> Task<Result<()>> {
662 if room.as_ref() == self.room.as_ref().map(|room| &room.0) {
663 Task::ready(Ok(()))
664 } else {
665 cx.notify();
666 if let Some(room) = room {
667 if room.read(cx).status().is_offline() {
668 self.room = None;
669 Task::ready(Ok(()))
670 } else {
671 let subscriptions = vec![
672 cx.observe(&room, |this, room, cx| {
673 if room.read(cx).status().is_offline() {
674 this.set_room(None, cx).detach_and_log_err(cx);
675 }
676
677 cx.notify();
678 }),
679 cx.subscribe(&room, |_, _, event, cx| cx.emit(event.clone())),
680 ];
681 self.room = Some((room.clone(), subscriptions));
682 let location = self
683 .location
684 .as_ref()
685 .and_then(|location| location.upgrade());
686 let channel_id = room.read(cx).channel_id();
687 cx.emit(Event::RoomJoined { channel_id });
688 room.update(cx, |room, cx| room.set_location(location.as_ref(), cx))
689 }
690 } else {
691 self.room = None;
692 Task::ready(Ok(()))
693 }
694 }
695 }
696
697 pub fn room(&self) -> Option<&Entity<Room>> {
698 self.room.as_ref().map(|(room, _)| room)
699 }
700
701 pub fn client(&self) -> Arc<Client> {
702 self.client.clone()
703 }
704
705 pub fn pending_invites(&self) -> &HashSet<u64> {
706 &self.pending_invites
707 }
708
709 pub fn report_call_event(&self, operation: &'static str, cx: &mut App) {
710 if let Some(room) = self.room() {
711 let room = room.read(cx);
712 telemetry::event!(
713 operation,
714 room_id = room.id(),
715 channel_id = room.channel_id()
716 )
717 }
718 }
719}
720
#[cfg(test)]
mod test {
    use gpui::TestAppContext;

    use crate::OneAtATime;

    /// Checks the three outcomes of `OneAtATime::spawn`: a lone task
    /// completes, a superseded task resolves to `None`, and dropping the
    /// `OneAtATime` cancels the task in flight.
    #[gpui::test]
    async fn test_one_at_a_time(cx: &mut TestAppContext) {
        let mut debouncer = OneAtATime { cancel: None };

        // A lone task runs to completion.
        let single = cx.update(|cx| debouncer.spawn(cx, |_| async { Ok(1) }));
        assert_eq!(single.await.unwrap(), Some(1));

        // Spawning twice in a row cancels the first task before it runs.
        let (superseded, latest) = cx.update(|cx| {
            (
                debouncer.spawn(cx, |_| async {
                    panic!("");
                }),
                debouncer.spawn(cx, |_| async { Ok(3) }),
            )
        });

        assert_eq!(superseded.await.unwrap(), None::<u32>);
        assert_eq!(latest.await.unwrap(), Some(3));

        // Dropping the debouncer cancels the task still in flight.
        let orphaned = cx.update(|cx| debouncer.spawn(cx, |_| async { Ok(4) }));
        drop(debouncer);

        assert_eq!(orphaned.await.unwrap(), None);
    }
}