1pub mod call_settings;
2pub mod participant;
3pub mod room;
4
5use anyhow::{anyhow, Result};
6use audio::Audio;
7use call_settings::CallSettings;
8use client::{proto, Client, TelemetrySettings, TypedEnvelope, User, UserStore, ZED_ALWAYS_ACTIVE};
9use collections::HashSet;
10use futures::{channel::oneshot, future::Shared, Future, FutureExt};
11use gpui::{
12 AppContext, AsyncAppContext, Context, EventEmitter, Model, ModelContext, Subscription, Task,
13 WeakModel,
14};
15use postage::watch;
16use project::Project;
17use room::Event;
18use settings::Settings;
19use std::sync::Arc;
20
21pub use participant::ParticipantLocation;
22pub use room::Room;
23
24pub fn init(client: Arc<Client>, user_store: Model<UserStore>, cx: &mut AppContext) {
25 CallSettings::register(cx);
26
27 let active_call = cx.build_model(|cx| ActiveCall::new(client, user_store, cx));
28 cx.set_global(active_call);
29}
30
/// Runs at most one spawned task at a time: spawning a new task cancels the previous one.
pub struct OneAtATime {
    /// Sender half of the cancellation channel for the most recently spawned task.
    /// Dropping it (by replacing it, or by dropping the whole struct) cancels that task.
    cancel: Option<oneshot::Sender<()>>,
}
34
35impl OneAtATime {
36 /// spawn a task in the given context.
37 /// if another task is spawned before that resolves, or if the OneAtATime itself is dropped, the first task will be cancelled and return Ok(None)
38 /// otherwise you'll see the result of the task.
39 fn spawn<F, Fut, R>(&mut self, cx: &mut AppContext, f: F) -> Task<Result<Option<R>>>
40 where
41 F: 'static + FnOnce(AsyncAppContext) -> Fut,
42 Fut: Future<Output = Result<R>>,
43 R: 'static,
44 {
45 let (tx, rx) = oneshot::channel();
46 self.cancel.replace(tx);
47 cx.spawn(|cx| async move {
48 futures::select_biased! {
49 _ = rx.fuse() => Ok(None),
50 result = f(cx).fuse() => result.map(Some),
51 }
52 })
53 }
54
55 fn running(&self) -> bool {
56 self.cancel
57 .as_ref()
58 .is_some_and(|cancel| !cancel.is_canceled())
59 }
60}
61
/// A call that another user has placed to the current user and that has not yet been
/// accepted, declined, or cancelled.
#[derive(Clone)]
pub struct IncomingCall {
    /// Id of the room the call invites the user into.
    pub room_id: u64,
    /// The user who placed the call.
    pub calling_user: Arc<User>,
    /// Users resolved from the participant ids carried by the incoming-call message.
    pub participants: Vec<Arc<User>>,
    /// Project attached to the incoming-call message, if any.
    pub initial_project: Option<proto::ParticipantProject>,
}
69
/// Singleton global maintaining the user's participation in a room across workspaces.
pub struct ActiveCall {
    /// The room currently joined, together with the observation/subscription handles that
    /// `set_room` creates for it.
    room: Option<(Model<Room>, Vec<Subscription>)>,
    /// Shared task for a room that `invite` is in the middle of creating; reset to `None`
    /// once creation finishes (successfully or not).
    pending_room_creation: Option<Shared<Task<Result<Model<Room>, Arc<anyhow::Error>>>>>,
    /// Weak handle to the project most recently passed to `set_location`.
    location: Option<WeakModel<Project>>,
    /// Ensures only the most recent join request (`accept_incoming` / `join_channel`) proceeds.
    _join_debouncer: OneAtATime,
    /// Ids of users with an invite currently in flight.
    pending_invites: HashSet<u64>,
    /// Watch channel holding the current incoming call, if any. The sender is written by the
    /// message handlers and `decline_incoming`; the receiver is handed out by `incoming`.
    incoming_call: (
        watch::Sender<Option<IncomingCall>>,
        watch::Receiver<Option<IncomingCall>>,
    ),
    client: Arc<Client>,
    user_store: Model<UserStore>,
    /// Handles for the client message handlers registered in `new`
    /// (presumably dropping them unregisters the handlers — confirm against `client`).
    _subscriptions: Vec<client::Subscription>,
}
85
// `ActiveCall` emits the same `room::Event`s as its current room; `set_room` subscribes to
// the room and re-emits every event it receives.
impl EventEmitter<Event> for ActiveCall {}
87
88impl ActiveCall {
89 fn new(client: Arc<Client>, user_store: Model<UserStore>, cx: &mut ModelContext<Self>) -> Self {
90 Self {
91 room: None,
92 pending_room_creation: None,
93 location: None,
94 pending_invites: Default::default(),
95 incoming_call: watch::channel(),
96 _join_debouncer: OneAtATime { cancel: None },
97 _subscriptions: vec![
98 client.add_request_handler(cx.weak_model(), Self::handle_incoming_call),
99 client.add_message_handler(cx.weak_model(), Self::handle_call_canceled),
100 ],
101 client,
102 user_store,
103 }
104 }
105
106 pub fn channel_id(&self, cx: &AppContext) -> Option<u64> {
107 self.room()?.read(cx).channel_id()
108 }
109
110 async fn handle_incoming_call(
111 this: Model<Self>,
112 envelope: TypedEnvelope<proto::IncomingCall>,
113 _: Arc<Client>,
114 mut cx: AsyncAppContext,
115 ) -> Result<proto::Ack> {
116 let user_store = this.update(&mut cx, |this, _| this.user_store.clone())?;
117 let call = IncomingCall {
118 room_id: envelope.payload.room_id,
119 participants: user_store
120 .update(&mut cx, |user_store, cx| {
121 user_store.get_users(envelope.payload.participant_user_ids, cx)
122 })?
123 .await?,
124 calling_user: user_store
125 .update(&mut cx, |user_store, cx| {
126 user_store.get_user(envelope.payload.calling_user_id, cx)
127 })?
128 .await?,
129 initial_project: envelope.payload.initial_project,
130 };
131 this.update(&mut cx, |this, _| {
132 *this.incoming_call.0.borrow_mut() = Some(call);
133 })?;
134
135 Ok(proto::Ack {})
136 }
137
138 async fn handle_call_canceled(
139 this: Model<Self>,
140 envelope: TypedEnvelope<proto::CallCanceled>,
141 _: Arc<Client>,
142 mut cx: AsyncAppContext,
143 ) -> Result<()> {
144 this.update(&mut cx, |this, _| {
145 let mut incoming_call = this.incoming_call.0.borrow_mut();
146 if incoming_call
147 .as_ref()
148 .map_or(false, |call| call.room_id == envelope.payload.room_id)
149 {
150 incoming_call.take();
151 }
152 })?;
153 Ok(())
154 }
155
156 pub fn global(cx: &AppContext) -> Model<Self> {
157 cx.global::<Model<Self>>().clone()
158 }
159
160 pub fn invite(
161 &mut self,
162 called_user_id: u64,
163 initial_project: Option<Model<Project>>,
164 cx: &mut ModelContext<Self>,
165 ) -> Task<Result<()>> {
166 if !self.pending_invites.insert(called_user_id) {
167 return Task::ready(Err(anyhow!("user was already invited")));
168 }
169 cx.notify();
170
171 if self._join_debouncer.running() {
172 return Task::ready(Ok(()));
173 }
174
175 let room = if let Some(room) = self.room().cloned() {
176 Some(Task::ready(Ok(room)).shared())
177 } else {
178 self.pending_room_creation.clone()
179 };
180
181 let invite = if let Some(room) = room {
182 cx.spawn(move |_, mut cx| async move {
183 let room = room.await.map_err(|err| anyhow!("{:?}", err))?;
184
185 let initial_project_id = if let Some(initial_project) = initial_project {
186 Some(
187 room.update(&mut cx, |room, cx| room.share_project(initial_project, cx))?
188 .await?,
189 )
190 } else {
191 None
192 };
193
194 room.update(&mut cx, move |room, cx| {
195 room.call(called_user_id, initial_project_id, cx)
196 })?
197 .await?;
198
199 anyhow::Ok(())
200 })
201 } else {
202 let client = self.client.clone();
203 let user_store = self.user_store.clone();
204 let room = cx
205 .spawn(move |this, mut cx| async move {
206 let create_room = async {
207 let room = cx
208 .update(|cx| {
209 Room::create(
210 called_user_id,
211 initial_project,
212 client,
213 user_store,
214 cx,
215 )
216 })?
217 .await?;
218
219 this.update(&mut cx, |this, cx| this.set_room(Some(room.clone()), cx))?
220 .await?;
221
222 anyhow::Ok(room)
223 };
224
225 let room = create_room.await;
226 this.update(&mut cx, |this, _| this.pending_room_creation = None)?;
227 room.map_err(Arc::new)
228 })
229 .shared();
230 self.pending_room_creation = Some(room.clone());
231 cx.background_executor().spawn(async move {
232 room.await.map_err(|err| anyhow!("{:?}", err))?;
233 anyhow::Ok(())
234 })
235 };
236
237 cx.spawn(move |this, mut cx| async move {
238 let result = invite.await;
239 if result.is_ok() {
240 this.update(&mut cx, |this, cx| this.report_call_event("invite", cx))?;
241 } else {
242 // TODO: Resport collaboration error
243 }
244
245 this.update(&mut cx, |this, cx| {
246 this.pending_invites.remove(&called_user_id);
247 cx.notify();
248 })?;
249 result
250 })
251 }
252
253 pub fn cancel_invite(
254 &mut self,
255 called_user_id: u64,
256 cx: &mut ModelContext<Self>,
257 ) -> Task<Result<()>> {
258 let room_id = if let Some(room) = self.room() {
259 room.read(cx).id()
260 } else {
261 return Task::ready(Err(anyhow!("no active call")));
262 };
263
264 let client = self.client.clone();
265 cx.background_executor().spawn(async move {
266 client
267 .request(proto::CancelCall {
268 room_id,
269 called_user_id,
270 })
271 .await?;
272 anyhow::Ok(())
273 })
274 }
275
276 pub fn incoming(&self) -> watch::Receiver<Option<IncomingCall>> {
277 self.incoming_call.1.clone()
278 }
279
280 pub fn accept_incoming(&mut self, cx: &mut ModelContext<Self>) -> Task<Result<()>> {
281 if self.room.is_some() {
282 return Task::ready(Err(anyhow!("cannot join while on another call")));
283 }
284
285 let call = if let Some(call) = self.incoming_call.1.borrow().clone() {
286 call
287 } else {
288 return Task::ready(Err(anyhow!("no incoming call")));
289 };
290
291 if self.pending_room_creation.is_some() {
292 return Task::ready(Ok(()));
293 }
294
295 let room_id = call.room_id.clone();
296 let client = self.client.clone();
297 let user_store = self.user_store.clone();
298 let join = self
299 ._join_debouncer
300 .spawn(cx, move |cx| Room::join(room_id, client, user_store, cx));
301
302 cx.spawn(|this, mut cx| async move {
303 let room = join.await?;
304 this.update(&mut cx, |this, cx| this.set_room(room.clone(), cx))?
305 .await?;
306 this.update(&mut cx, |this, cx| {
307 this.report_call_event("accept incoming", cx)
308 })?;
309 Ok(())
310 })
311 }
312
313 pub fn decline_incoming(&mut self, cx: &mut ModelContext<Self>) -> Result<()> {
314 let call = self
315 .incoming_call
316 .0
317 .borrow_mut()
318 .take()
319 .ok_or_else(|| anyhow!("no incoming call"))?;
320 report_call_event_for_room("decline incoming", call.room_id, None, &self.client, cx);
321 self.client.send(proto::DeclineCall {
322 room_id: call.room_id,
323 })?;
324 Ok(())
325 }
326
327 pub fn join_channel(
328 &mut self,
329 channel_id: u64,
330 cx: &mut ModelContext<Self>,
331 ) -> Task<Result<Option<Model<Room>>>> {
332 if let Some(room) = self.room().cloned() {
333 if room.read(cx).channel_id() == Some(channel_id) {
334 return Task::ready(Ok(Some(room)));
335 } else {
336 room.update(cx, |room, cx| room.clear_state(cx));
337 }
338 }
339
340 if self.pending_room_creation.is_some() {
341 return Task::ready(Ok(None));
342 }
343
344 let client = self.client.clone();
345 let user_store = self.user_store.clone();
346 let join = self._join_debouncer.spawn(cx, move |cx| async move {
347 Room::join_channel(channel_id, client, user_store, cx).await
348 });
349
350 cx.spawn(|this, mut cx| async move {
351 let room = join.await?;
352 this.update(&mut cx, |this, cx| this.set_room(room.clone(), cx))?
353 .await?;
354 this.update(&mut cx, |this, cx| {
355 this.report_call_event("join channel", cx)
356 })?;
357 Ok(room)
358 })
359 }
360
361 pub fn hang_up(&mut self, cx: &mut ModelContext<Self>) -> Task<Result<()>> {
362 cx.notify();
363 self.report_call_event("hang up", cx);
364
365 Audio::end_call(cx);
366 if let Some((room, _)) = self.room.take() {
367 room.update(cx, |room, cx| room.leave(cx))
368 } else {
369 Task::ready(Ok(()))
370 }
371 }
372
373 pub fn share_project(
374 &mut self,
375 project: Model<Project>,
376 cx: &mut ModelContext<Self>,
377 ) -> Task<Result<u64>> {
378 if let Some((room, _)) = self.room.as_ref() {
379 self.report_call_event("share project", cx);
380 room.update(cx, |room, cx| room.share_project(project, cx))
381 } else {
382 Task::ready(Err(anyhow!("no active call")))
383 }
384 }
385
386 pub fn unshare_project(
387 &mut self,
388 project: Model<Project>,
389 cx: &mut ModelContext<Self>,
390 ) -> Result<()> {
391 if let Some((room, _)) = self.room.as_ref() {
392 self.report_call_event("unshare project", cx);
393 room.update(cx, |room, cx| room.unshare_project(project, cx))
394 } else {
395 Err(anyhow!("no active call"))
396 }
397 }
398
    /// Returns the weak project handle most recently stored by `set_location`, if any.
    pub fn location(&self) -> Option<&WeakModel<Project>> {
        self.location.as_ref()
    }
402
403 pub fn set_location(
404 &mut self,
405 project: Option<&Model<Project>>,
406 cx: &mut ModelContext<Self>,
407 ) -> Task<Result<()>> {
408 if project.is_some() || !*ZED_ALWAYS_ACTIVE {
409 self.location = project.map(|project| project.downgrade());
410 if let Some((room, _)) = self.room.as_ref() {
411 return room.update(cx, |room, cx| room.set_location(project, cx));
412 }
413 }
414 Task::ready(Ok(()))
415 }
416
417 fn set_room(
418 &mut self,
419 room: Option<Model<Room>>,
420 cx: &mut ModelContext<Self>,
421 ) -> Task<Result<()>> {
422 if room.as_ref() != self.room.as_ref().map(|room| &room.0) {
423 cx.notify();
424 if let Some(room) = room {
425 if room.read(cx).status().is_offline() {
426 self.room = None;
427 Task::ready(Ok(()))
428 } else {
429 let subscriptions = vec![
430 cx.observe(&room, |this, room, cx| {
431 if room.read(cx).status().is_offline() {
432 this.set_room(None, cx).detach_and_log_err(cx);
433 }
434
435 cx.notify();
436 }),
437 cx.subscribe(&room, |_, _, event, cx| cx.emit(event.clone())),
438 ];
439 self.room = Some((room.clone(), subscriptions));
440 let location = self
441 .location
442 .as_ref()
443 .and_then(|location| location.upgrade());
444 room.update(cx, |room, cx| room.set_location(location.as_ref(), cx))
445 }
446 } else {
447 self.room = None;
448 Task::ready(Ok(()))
449 }
450 } else {
451 Task::ready(Ok(()))
452 }
453 }
454
455 pub fn room(&self) -> Option<&Model<Room>> {
456 self.room.as_ref().map(|(room, _)| room)
457 }
458
459 pub fn client(&self) -> Arc<Client> {
460 self.client.clone()
461 }
462
    /// Returns the ids of users with an invite currently in flight.
    pub fn pending_invites(&self) -> &HashSet<u64> {
        &self.pending_invites
    }
466
467 pub fn report_call_event(&self, operation: &'static str, cx: &AppContext) {
468 if let Some(room) = self.room() {
469 let room = room.read(cx);
470 report_call_event_for_room(operation, room.id(), room.channel_id(), &self.client, cx);
471 }
472 }
473}
474
475pub fn report_call_event_for_room(
476 operation: &'static str,
477 room_id: u64,
478 channel_id: Option<u64>,
479 client: &Arc<Client>,
480 cx: &AppContext,
481) {
482 let telemetry = client.telemetry();
483 let telemetry_settings = *TelemetrySettings::get_global(cx);
484
485 telemetry.report_call_event(telemetry_settings, operation, Some(room_id), channel_id)
486}
487
488pub fn report_call_event_for_channel(
489 operation: &'static str,
490 channel_id: u64,
491 client: &Arc<Client>,
492 cx: &AppContext,
493) {
494 let room = ActiveCall::global(cx).read(cx).room();
495
496 let telemetry = client.telemetry();
497
498 let telemetry_settings = *TelemetrySettings::get_global(cx);
499
500 telemetry.report_call_event(
501 telemetry_settings,
502 operation,
503 room.map(|r| r.read(cx).id()),
504 Some(channel_id),
505 )
506}
507
/// Per-workspace call state that backs the workspace's `CallHandler` implementation.
struct Call {
    /// Follow state for each pane, keyed by the pane's view handle.
    follower_states: HashMap<View<Pane>, FollowerState>,
    /// The global `ActiveCall` (when one was registered at construction time) together with
    /// the subscription that routes its events to `on_active_call_event`.
    active_call: Option<(Model<ActiveCall>, Vec<Subscription>)>,
    /// Weak handle to the workspace this state belongs to.
    parent_workspace: WeakView<Workspace>,
}
513
514impl Call {
515 fn new(parent_workspace: WeakView<Workspace>, cx: &mut ViewContext<'_, Workspace>) -> Self {
516 let mut active_call = None;
517 if cx.has_global::<Model<ActiveCall>>() {
518 let call = cx.global::<Model<ActiveCall>>().clone();
519 let subscriptions = vec![cx.subscribe(&call, Self::on_active_call_event)];
520 active_call = Some((call, subscriptions));
521 }
522 Self {
523 follower_states: Default::default(),
524 active_call,
525 parent_workspace,
526 }
527 }
528 fn on_active_call_event(
529 workspace: &mut Workspace,
530 _: Model<ActiveCall>,
531 event: &call2::room::Event,
532 cx: &mut ViewContext<Workspace>,
533 ) {
534 match event {
535 call2::room::Event::ParticipantLocationChanged { participant_id }
536 | call2::room::Event::RemoteVideoTracksChanged { participant_id } => {
537 workspace.leader_updated(*participant_id, cx);
538 }
539 _ => {}
540 }
541 }
542}
543
544#[async_trait(?Send)]
545impl CallHandler for Call {
546 fn leader_updated(&mut self, leader_id: PeerId, cx: &mut ViewContext<Workspace>) -> Option<()> {
547 cx.notify();
548
549 let (call, _) = self.active_call.as_ref()?;
550 let room = call.read(cx).room()?.read(cx);
551 let participant = room.remote_participant_for_peer_id(leader_id)?;
552 let mut items_to_activate = Vec::new();
553
554 let leader_in_this_app;
555 let leader_in_this_project;
556 match participant.location {
557 call2::ParticipantLocation::SharedProject { project_id } => {
558 leader_in_this_app = true;
559 leader_in_this_project = Some(project_id)
560 == self
561 .parent_workspace
562 .update(cx, |this, cx| this.project.read(cx).remote_id())
563 .log_err()
564 .flatten();
565 }
566 call2::ParticipantLocation::UnsharedProject => {
567 leader_in_this_app = true;
568 leader_in_this_project = false;
569 }
570 call2::ParticipantLocation::External => {
571 leader_in_this_app = false;
572 leader_in_this_project = false;
573 }
574 };
575
576 for (pane, state) in &self.follower_states {
577 if state.leader_id != leader_id {
578 continue;
579 }
580 if let (Some(active_view_id), true) = (state.active_view_id, leader_in_this_app) {
581 if let Some(item) = state.items_by_leader_view_id.get(&active_view_id) {
582 if leader_in_this_project || !item.is_project_item(cx) {
583 items_to_activate.push((pane.clone(), item.boxed_clone()));
584 }
585 } else {
586 log::warn!(
587 "unknown view id {:?} for leader {:?}",
588 active_view_id,
589 leader_id
590 );
591 }
592 continue;
593 }
594 // todo!()
595 // if let Some(shared_screen) = self.shared_screen_for_peer(leader_id, pane, cx) {
596 // items_to_activate.push((pane.clone(), Box::new(shared_screen)));
597 // }
598 }
599
600 for (pane, item) in items_to_activate {
601 let pane_was_focused = pane.read(cx).has_focus(cx);
602 if let Some(index) = pane.update(cx, |pane, _| pane.index_for_item(item.as_ref())) {
603 pane.update(cx, |pane, cx| pane.activate_item(index, false, false, cx));
604 } else {
605 pane.update(cx, |pane, mut cx| {
606 pane.add_item(item.boxed_clone(), false, false, None, &mut cx)
607 });
608 }
609
610 if pane_was_focused {
611 pane.update(cx, |pane, cx| pane.focus_active_item(cx));
612 }
613 }
614
615 None
616 }
617
    /// Intended to return the shared-screen item for `peer_id` in `pane`.
    ///
    /// NOTE: unfinished. Returns `None` when there is no active call, no room, no such
    /// participant, or the participant has no video track; in every other case it reaches
    /// `todo!()` and panics. The intended implementation is kept in the comments below.
    fn shared_screen_for_peer(
        &self,
        peer_id: PeerId,
        pane: &View<Pane>,
        cx: &mut ViewContext<Workspace>,
    ) -> Option<Box<dyn ItemHandle>> {
        let (call, _) = self.active_call.as_ref()?;
        let room = call.read(cx).room()?.read(cx);
        let participant = room.remote_participant_for_peer_id(peer_id)?;
        // Only the participant's first video track is considered.
        let track = participant.video_tracks.values().next()?.clone();
        let user = participant.user.clone();
        todo!();
        // Intended: reuse an existing SharedScreen item for this peer if the pane has one...
        // for item in pane.read(cx).items_of_type::<SharedScreen>() {
        //     if item.read(cx).peer_id == peer_id {
        //         return Box::new(Some(item));
        //     }
        // }

        // ...otherwise build a new SharedScreen view for the track.
        // Some(Box::new(cx.build_view(|cx| {
        //     SharedScreen::new(&track, peer_id, user.clone(), cx)
        // })))
    }
640
    /// Returns mutable access to the per-pane follower states.
    fn follower_states_mut(&mut self) -> &mut HashMap<View<Pane>, FollowerState> {
        &mut self.follower_states
    }
    /// Returns the per-pane follower states.
    fn follower_states(&self) -> &HashMap<View<Pane>, FollowerState> {
        &self.follower_states
    }
647 fn room_id(&self, cx: &AppContext) -> Option<u64> {
648 Some(self.active_call.as_ref()?.0.read(cx).room()?.read(cx).id())
649 }
650 fn hang_up(&self, mut cx: AsyncWindowContext) -> Result<Task<Result<()>>> {
651 let Some((call, _)) = self.active_call.as_ref() else {
652 bail!("Cannot exit a call; not in a call");
653 };
654
655 call.update(&mut cx, |this, cx| this.hang_up(cx))
656 }
657 fn active_project(&self, cx: &AppContext) -> Option<WeakModel<Project>> {
658 ActiveCall::global(cx).read(cx).location().cloned()
659 }
660}
661
#[cfg(test)]
mod test {
    use gpui::TestAppContext;

    use crate::OneAtATime;

    /// Exercises the three outcomes of `OneAtATime::spawn`: a task left alone completes,
    /// a task superseded by a newer one is cancelled, and dropping the `OneAtATime`
    /// cancels the task it last spawned.
    #[gpui::test]
    async fn test_one_at_a_time(cx: &mut TestAppContext) {
        let mut debouncer = OneAtATime { cancel: None };

        // A task that is never superseded yields its own result.
        let undisturbed = cx.update(|cx| debouncer.spawn(cx, |_| async { Ok(1) }));
        assert_eq!(undisturbed.await.unwrap(), Some(1));

        // Spawning a second task before the first one runs cancels the first; its body
        // must never execute.
        let (superseded, latest) = cx.update(|cx| {
            let superseded = debouncer.spawn(cx, |_| async {
                assert!(false);
                Ok(2)
            });
            let latest = debouncer.spawn(cx, |_| async { Ok(3) });
            (superseded, latest)
        });

        assert_eq!(superseded.await.unwrap(), None);
        assert_eq!(latest.await.unwrap(), Some(3));

        // Dropping the `OneAtATime` cancels the outstanding task.
        let orphaned = cx.update(|cx| debouncer.spawn(cx, |_| async { Ok(4) }));
        drop(debouncer);

        assert_eq!(orphaned.await.unwrap(), None);
    }
}