1pub mod call_settings;
2pub mod participant;
3pub mod room;
4
5use anyhow::{anyhow, bail, Result};
6use async_trait::async_trait;
7use audio::Audio;
8use call_settings::CallSettings;
9use client::{
10 proto::{self, PeerId},
11 Client, TelemetrySettings, TypedEnvelope, User, UserStore, ZED_ALWAYS_ACTIVE,
12};
13use collections::HashSet;
14use futures::{channel::oneshot, future::Shared, Future, FutureExt};
15use gpui::{
16 AppContext, AsyncAppContext, AsyncWindowContext, Context, EventEmitter, Model, ModelContext,
17 Subscription, Task, View, ViewContext, WeakModel, WeakView,
18};
19pub use participant::ParticipantLocation;
20use participant::RemoteParticipant;
21use postage::watch;
22use project::Project;
23use room::Event;
24pub use room::Room;
25use settings::Settings;
26use std::sync::Arc;
27use util::ResultExt;
28use workspace::{item::ItemHandle, CallHandler, Pane, Workspace};
29
30pub fn init(client: Arc<Client>, user_store: Model<UserStore>, cx: &mut AppContext) {
31 CallSettings::register(cx);
32
33 let active_call = cx.build_model(|cx| ActiveCall::new(client, user_store, cx));
34 cx.set_global(active_call);
35}
36
37pub struct OneAtATime {
38 cancel: Option<oneshot::Sender<()>>,
39}
40
41impl OneAtATime {
42 /// spawn a task in the given context.
43 /// if another task is spawned before that resolves, or if the OneAtATime itself is dropped, the first task will be cancelled and return Ok(None)
44 /// otherwise you'll see the result of the task.
45 fn spawn<F, Fut, R>(&mut self, cx: &mut AppContext, f: F) -> Task<Result<Option<R>>>
46 where
47 F: 'static + FnOnce(AsyncAppContext) -> Fut,
48 Fut: Future<Output = Result<R>>,
49 R: 'static,
50 {
51 let (tx, rx) = oneshot::channel();
52 self.cancel.replace(tx);
53 cx.spawn(|cx| async move {
54 futures::select_biased! {
55 _ = rx.fuse() => Ok(None),
56 result = f(cx).fuse() => result.map(Some),
57 }
58 })
59 }
60
61 fn running(&self) -> bool {
62 self.cancel
63 .as_ref()
64 .is_some_and(|cancel| !cancel.is_canceled())
65 }
66}
67
68#[derive(Clone)]
69pub struct IncomingCall {
70 pub room_id: u64,
71 pub calling_user: Arc<User>,
72 pub participants: Vec<Arc<User>>,
73 pub initial_project: Option<proto::ParticipantProject>,
74}
75
76/// Singleton global maintaining the user's participation in a room across workspaces.
77pub struct ActiveCall {
78 room: Option<(Model<Room>, Vec<Subscription>)>,
79 pending_room_creation: Option<Shared<Task<Result<Model<Room>, Arc<anyhow::Error>>>>>,
80 location: Option<WeakModel<Project>>,
81 _join_debouncer: OneAtATime,
82 pending_invites: HashSet<u64>,
83 incoming_call: (
84 watch::Sender<Option<IncomingCall>>,
85 watch::Receiver<Option<IncomingCall>>,
86 ),
87 client: Arc<Client>,
88 user_store: Model<UserStore>,
89 _subscriptions: Vec<client::Subscription>,
90}
91
92impl EventEmitter<Event> for ActiveCall {}
93
94impl ActiveCall {
95 fn new(client: Arc<Client>, user_store: Model<UserStore>, cx: &mut ModelContext<Self>) -> Self {
96 Self {
97 room: None,
98 pending_room_creation: None,
99 location: None,
100 pending_invites: Default::default(),
101 incoming_call: watch::channel(),
102 _join_debouncer: OneAtATime { cancel: None },
103 _subscriptions: vec![
104 client.add_request_handler(cx.weak_model(), Self::handle_incoming_call),
105 client.add_message_handler(cx.weak_model(), Self::handle_call_canceled),
106 ],
107 client,
108 user_store,
109 }
110 }
111
112 pub fn channel_id(&self, cx: &AppContext) -> Option<u64> {
113 self.room()?.read(cx).channel_id()
114 }
115
116 async fn handle_incoming_call(
117 this: Model<Self>,
118 envelope: TypedEnvelope<proto::IncomingCall>,
119 _: Arc<Client>,
120 mut cx: AsyncAppContext,
121 ) -> Result<proto::Ack> {
122 let user_store = this.update(&mut cx, |this, _| this.user_store.clone())?;
123 let call = IncomingCall {
124 room_id: envelope.payload.room_id,
125 participants: user_store
126 .update(&mut cx, |user_store, cx| {
127 user_store.get_users(envelope.payload.participant_user_ids, cx)
128 })?
129 .await?,
130 calling_user: user_store
131 .update(&mut cx, |user_store, cx| {
132 user_store.get_user(envelope.payload.calling_user_id, cx)
133 })?
134 .await?,
135 initial_project: envelope.payload.initial_project,
136 };
137 this.update(&mut cx, |this, _| {
138 *this.incoming_call.0.borrow_mut() = Some(call);
139 })?;
140
141 Ok(proto::Ack {})
142 }
143
144 async fn handle_call_canceled(
145 this: Model<Self>,
146 envelope: TypedEnvelope<proto::CallCanceled>,
147 _: Arc<Client>,
148 mut cx: AsyncAppContext,
149 ) -> Result<()> {
150 this.update(&mut cx, |this, _| {
151 let mut incoming_call = this.incoming_call.0.borrow_mut();
152 if incoming_call
153 .as_ref()
154 .map_or(false, |call| call.room_id == envelope.payload.room_id)
155 {
156 incoming_call.take();
157 }
158 })?;
159 Ok(())
160 }
161
162 pub fn global(cx: &AppContext) -> Model<Self> {
163 cx.global::<Model<Self>>().clone()
164 }
165
166 pub fn invite(
167 &mut self,
168 called_user_id: u64,
169 initial_project: Option<Model<Project>>,
170 cx: &mut ModelContext<Self>,
171 ) -> Task<Result<()>> {
172 if !self.pending_invites.insert(called_user_id) {
173 return Task::ready(Err(anyhow!("user was already invited")));
174 }
175 cx.notify();
176
177 if self._join_debouncer.running() {
178 return Task::ready(Ok(()));
179 }
180
181 let room = if let Some(room) = self.room().cloned() {
182 Some(Task::ready(Ok(room)).shared())
183 } else {
184 self.pending_room_creation.clone()
185 };
186
187 let invite = if let Some(room) = room {
188 cx.spawn(move |_, mut cx| async move {
189 let room = room.await.map_err(|err| anyhow!("{:?}", err))?;
190
191 let initial_project_id = if let Some(initial_project) = initial_project {
192 Some(
193 room.update(&mut cx, |room, cx| room.share_project(initial_project, cx))?
194 .await?,
195 )
196 } else {
197 None
198 };
199
200 room.update(&mut cx, move |room, cx| {
201 room.call(called_user_id, initial_project_id, cx)
202 })?
203 .await?;
204
205 anyhow::Ok(())
206 })
207 } else {
208 let client = self.client.clone();
209 let user_store = self.user_store.clone();
210 let room = cx
211 .spawn(move |this, mut cx| async move {
212 let create_room = async {
213 let room = cx
214 .update(|cx| {
215 Room::create(
216 called_user_id,
217 initial_project,
218 client,
219 user_store,
220 cx,
221 )
222 })?
223 .await?;
224
225 this.update(&mut cx, |this, cx| this.set_room(Some(room.clone()), cx))?
226 .await?;
227
228 anyhow::Ok(room)
229 };
230
231 let room = create_room.await;
232 this.update(&mut cx, |this, _| this.pending_room_creation = None)?;
233 room.map_err(Arc::new)
234 })
235 .shared();
236 self.pending_room_creation = Some(room.clone());
237 cx.background_executor().spawn(async move {
238 room.await.map_err(|err| anyhow!("{:?}", err))?;
239 anyhow::Ok(())
240 })
241 };
242
243 cx.spawn(move |this, mut cx| async move {
244 let result = invite.await;
245 if result.is_ok() {
246 this.update(&mut cx, |this, cx| this.report_call_event("invite", cx))?;
247 } else {
248 // TODO: Resport collaboration error
249 }
250
251 this.update(&mut cx, |this, cx| {
252 this.pending_invites.remove(&called_user_id);
253 cx.notify();
254 })?;
255 result
256 })
257 }
258
259 pub fn cancel_invite(
260 &mut self,
261 called_user_id: u64,
262 cx: &mut ModelContext<Self>,
263 ) -> Task<Result<()>> {
264 let room_id = if let Some(room) = self.room() {
265 room.read(cx).id()
266 } else {
267 return Task::ready(Err(anyhow!("no active call")));
268 };
269
270 let client = self.client.clone();
271 cx.background_executor().spawn(async move {
272 client
273 .request(proto::CancelCall {
274 room_id,
275 called_user_id,
276 })
277 .await?;
278 anyhow::Ok(())
279 })
280 }
281
282 pub fn incoming(&self) -> watch::Receiver<Option<IncomingCall>> {
283 self.incoming_call.1.clone()
284 }
285
286 pub fn accept_incoming(&mut self, cx: &mut ModelContext<Self>) -> Task<Result<()>> {
287 if self.room.is_some() {
288 return Task::ready(Err(anyhow!("cannot join while on another call")));
289 }
290
291 let call = if let Some(call) = self.incoming_call.1.borrow().clone() {
292 call
293 } else {
294 return Task::ready(Err(anyhow!("no incoming call")));
295 };
296
297 if self.pending_room_creation.is_some() {
298 return Task::ready(Ok(()));
299 }
300
301 let room_id = call.room_id.clone();
302 let client = self.client.clone();
303 let user_store = self.user_store.clone();
304 let join = self
305 ._join_debouncer
306 .spawn(cx, move |cx| Room::join(room_id, client, user_store, cx));
307
308 cx.spawn(|this, mut cx| async move {
309 let room = join.await?;
310 this.update(&mut cx, |this, cx| this.set_room(room.clone(), cx))?
311 .await?;
312 this.update(&mut cx, |this, cx| {
313 this.report_call_event("accept incoming", cx)
314 })?;
315 Ok(())
316 })
317 }
318
319 pub fn decline_incoming(&mut self, cx: &mut ModelContext<Self>) -> Result<()> {
320 let call = self
321 .incoming_call
322 .0
323 .borrow_mut()
324 .take()
325 .ok_or_else(|| anyhow!("no incoming call"))?;
326 report_call_event_for_room("decline incoming", call.room_id, None, &self.client, cx);
327 self.client.send(proto::DeclineCall {
328 room_id: call.room_id,
329 })?;
330 Ok(())
331 }
332
333 pub fn join_channel(
334 &mut self,
335 channel_id: u64,
336 cx: &mut ModelContext<Self>,
337 ) -> Task<Result<Option<Model<Room>>>> {
338 if let Some(room) = self.room().cloned() {
339 if room.read(cx).channel_id() == Some(channel_id) {
340 return Task::ready(Ok(Some(room)));
341 } else {
342 room.update(cx, |room, cx| room.clear_state(cx));
343 }
344 }
345
346 if self.pending_room_creation.is_some() {
347 return Task::ready(Ok(None));
348 }
349
350 let client = self.client.clone();
351 let user_store = self.user_store.clone();
352 let join = self._join_debouncer.spawn(cx, move |cx| async move {
353 Room::join_channel(channel_id, client, user_store, cx).await
354 });
355
356 cx.spawn(|this, mut cx| async move {
357 let room = join.await?;
358 this.update(&mut cx, |this, cx| this.set_room(room.clone(), cx))?
359 .await?;
360 this.update(&mut cx, |this, cx| {
361 this.report_call_event("join channel", cx)
362 })?;
363 Ok(room)
364 })
365 }
366
367 pub fn hang_up(&mut self, cx: &mut ModelContext<Self>) -> Task<Result<()>> {
368 cx.notify();
369 self.report_call_event("hang up", cx);
370
371 Audio::end_call(cx);
372 if let Some((room, _)) = self.room.take() {
373 room.update(cx, |room, cx| room.leave(cx))
374 } else {
375 Task::ready(Ok(()))
376 }
377 }
378
379 pub fn share_project(
380 &mut self,
381 project: Model<Project>,
382 cx: &mut ModelContext<Self>,
383 ) -> Task<Result<u64>> {
384 if let Some((room, _)) = self.room.as_ref() {
385 self.report_call_event("share project", cx);
386 room.update(cx, |room, cx| room.share_project(project, cx))
387 } else {
388 Task::ready(Err(anyhow!("no active call")))
389 }
390 }
391
392 pub fn unshare_project(
393 &mut self,
394 project: Model<Project>,
395 cx: &mut ModelContext<Self>,
396 ) -> Result<()> {
397 if let Some((room, _)) = self.room.as_ref() {
398 self.report_call_event("unshare project", cx);
399 room.update(cx, |room, cx| room.unshare_project(project, cx))
400 } else {
401 Err(anyhow!("no active call"))
402 }
403 }
404
405 pub fn location(&self) -> Option<&WeakModel<Project>> {
406 self.location.as_ref()
407 }
408
409 pub fn set_location(
410 &mut self,
411 project: Option<&Model<Project>>,
412 cx: &mut ModelContext<Self>,
413 ) -> Task<Result<()>> {
414 if project.is_some() || !*ZED_ALWAYS_ACTIVE {
415 self.location = project.map(|project| project.downgrade());
416 if let Some((room, _)) = self.room.as_ref() {
417 return room.update(cx, |room, cx| room.set_location(project, cx));
418 }
419 }
420 Task::ready(Ok(()))
421 }
422
423 fn set_room(
424 &mut self,
425 room: Option<Model<Room>>,
426 cx: &mut ModelContext<Self>,
427 ) -> Task<Result<()>> {
428 if room.as_ref() != self.room.as_ref().map(|room| &room.0) {
429 cx.notify();
430 if let Some(room) = room {
431 if room.read(cx).status().is_offline() {
432 self.room = None;
433 Task::ready(Ok(()))
434 } else {
435 let subscriptions = vec![
436 cx.observe(&room, |this, room, cx| {
437 if room.read(cx).status().is_offline() {
438 this.set_room(None, cx).detach_and_log_err(cx);
439 }
440
441 cx.notify();
442 }),
443 cx.subscribe(&room, |_, _, event, cx| cx.emit(event.clone())),
444 ];
445 self.room = Some((room.clone(), subscriptions));
446 let location = self
447 .location
448 .as_ref()
449 .and_then(|location| location.upgrade());
450 room.update(cx, |room, cx| room.set_location(location.as_ref(), cx))
451 }
452 } else {
453 self.room = None;
454 Task::ready(Ok(()))
455 }
456 } else {
457 Task::ready(Ok(()))
458 }
459 }
460
461 pub fn room(&self) -> Option<&Model<Room>> {
462 self.room.as_ref().map(|(room, _)| room)
463 }
464
465 pub fn client(&self) -> Arc<Client> {
466 self.client.clone()
467 }
468
469 pub fn pending_invites(&self) -> &HashSet<u64> {
470 &self.pending_invites
471 }
472
473 pub fn report_call_event(&self, operation: &'static str, cx: &mut AppContext) {
474 if let Some(room) = self.room() {
475 let room = room.read(cx);
476 report_call_event_for_room(operation, room.id(), room.channel_id(), &self.client, cx);
477 }
478 }
479}
480
481pub fn report_call_event_for_room(
482 operation: &'static str,
483 room_id: u64,
484 channel_id: Option<u64>,
485 client: &Arc<Client>,
486 cx: &mut AppContext,
487) {
488 let telemetry = client.telemetry();
489 let telemetry_settings = *TelemetrySettings::get_global(cx);
490
491 telemetry.report_call_event(telemetry_settings, operation, Some(room_id), channel_id)
492}
493
494pub fn report_call_event_for_channel(
495 operation: &'static str,
496 channel_id: u64,
497 client: &Arc<Client>,
498 cx: &AppContext,
499) {
500 let room = ActiveCall::global(cx).read(cx).room();
501
502 let telemetry = client.telemetry();
503
504 let telemetry_settings = *TelemetrySettings::get_global(cx);
505
506 telemetry.report_call_event(
507 telemetry_settings,
508 operation,
509 room.map(|r| r.read(cx).id()),
510 Some(channel_id),
511 )
512}
513
514pub struct Call {
515 active_call: Option<(Model<ActiveCall>, Vec<Subscription>)>,
516 parent_workspace: WeakView<Workspace>,
517}
518
519impl Call {
520 pub fn new(
521 parent_workspace: WeakView<Workspace>,
522 cx: &mut ViewContext<'_, Workspace>,
523 ) -> Box<dyn CallHandler> {
524 let mut active_call = None;
525 if cx.has_global::<Model<ActiveCall>>() {
526 let call = cx.global::<Model<ActiveCall>>().clone();
527 let subscriptions = vec![cx.subscribe(&call, Self::on_active_call_event)];
528 active_call = Some((call, subscriptions));
529 }
530 Box::new(Self {
531 active_call,
532 parent_workspace,
533 })
534 }
535 fn on_active_call_event(
536 workspace: &mut Workspace,
537 _: Model<ActiveCall>,
538 event: &room::Event,
539 cx: &mut ViewContext<Workspace>,
540 ) {
541 match event {
542 room::Event::ParticipantLocationChanged { participant_id }
543 | room::Event::RemoteVideoTracksChanged { participant_id } => {
544 workspace.leader_updated(*participant_id, cx);
545 }
546 _ => {}
547 }
548 }
549}
550
551#[async_trait(?Send)]
552impl CallHandler for Call {
553 fn peer_state(
554 &mut self,
555 leader_id: PeerId,
556 cx: &mut ViewContext<Workspace>,
557 ) -> Option<(bool, bool)> {
558 let (call, _) = self.active_call.as_ref()?;
559 let room = call.read(cx).room()?.read(cx);
560 let participant = room.remote_participant_for_peer_id(leader_id)?;
561
562 let leader_in_this_app;
563 let leader_in_this_project;
564 match participant.location {
565 ParticipantLocation::SharedProject { project_id } => {
566 leader_in_this_app = true;
567 leader_in_this_project = Some(project_id)
568 == self
569 .parent_workspace
570 .update(cx, |this, cx| this.project().read(cx).remote_id())
571 .log_err()
572 .flatten();
573 }
574 ParticipantLocation::UnsharedProject => {
575 leader_in_this_app = true;
576 leader_in_this_project = false;
577 }
578 ParticipantLocation::External => {
579 leader_in_this_app = false;
580 leader_in_this_project = false;
581 }
582 };
583
584 Some((leader_in_this_project, leader_in_this_app))
585 }
586
587 fn shared_screen_for_peer(
588 &self,
589 peer_id: PeerId,
590 _pane: &View<Pane>,
591 cx: &mut ViewContext<Workspace>,
592 ) -> Option<Box<dyn ItemHandle>> {
593 let (call, _) = self.active_call.as_ref()?;
594 let room = call.read(cx).room()?.read(cx);
595 let participant = room.remote_participant_for_peer_id(peer_id)?;
596 let _track = participant.video_tracks.values().next()?.clone();
597 let _user = participant.user.clone();
598 todo!();
599 // for item in pane.read(cx).items_of_type::<SharedScreen>() {
600 // if item.read(cx).peer_id == peer_id {
601 // return Box::new(Some(item));
602 // }
603 // }
604
605 // Some(Box::new(cx.build_view(|cx| {
606 // SharedScreen::new(&track, peer_id, user.clone(), cx)
607 // })))
608 }
609 fn room_id(&self, cx: &AppContext) -> Option<u64> {
610 Some(self.active_call.as_ref()?.0.read(cx).room()?.read(cx).id())
611 }
612 fn hang_up(&self, mut cx: AsyncWindowContext) -> Result<Task<Result<()>>> {
613 let Some((call, _)) = self.active_call.as_ref() else {
614 bail!("Cannot exit a call; not in a call");
615 };
616
617 call.update(&mut cx, |this, cx| this.hang_up(cx))
618 }
619 fn active_project(&self, cx: &AppContext) -> Option<WeakModel<Project>> {
620 ActiveCall::global(cx).read(cx).location().cloned()
621 }
622 fn invite(
623 &mut self,
624 called_user_id: u64,
625 initial_project: Option<Model<Project>>,
626 cx: &mut AppContext,
627 ) -> Task<Result<()>> {
628 ActiveCall::global(cx).update(cx, |this, cx| {
629 this.invite(called_user_id, initial_project, cx)
630 })
631 }
632 fn remote_participants(&self, cx: &AppContext) -> Option<Vec<Arc<User>>> {
633 self.active_call
634 .as_ref()
635 .map(|call| {
636 call.0.read(cx).room().map(|room| {
637 room.read(cx)
638 .remote_participants()
639 .iter()
640 .map(|participant| participant.1.user.clone())
641 .collect()
642 })
643 })
644 .flatten()
645 }
646 fn is_muted(&self, cx: &AppContext) -> Option<bool> {
647 self.active_call
648 .as_ref()
649 .map(|call| {
650 call.0
651 .read(cx)
652 .room()
653 .map(|room| room.read(cx).is_muted(cx))
654 })
655 .flatten()
656 }
657 fn toggle_mute(&self, cx: &mut AppContext) {
658 self.active_call.as_ref().map(|call| {
659 call.0.update(cx, |this, cx| {
660 this.room().map(|room| {
661 room.update(cx, |this, cx| {
662 this.toggle_mute(cx);
663 })
664 })
665 })
666 });
667 }
668}
669
670#[cfg(test)]
671mod test {
672 use gpui::TestAppContext;
673
674 use crate::OneAtATime;
675
676 #[gpui::test]
677 async fn test_one_at_a_time(cx: &mut TestAppContext) {
678 let mut one_at_a_time = OneAtATime { cancel: None };
679
680 assert_eq!(
681 cx.update(|cx| one_at_a_time.spawn(cx, |_| async { Ok(1) }))
682 .await
683 .unwrap(),
684 Some(1)
685 );
686
687 let (a, b) = cx.update(|cx| {
688 (
689 one_at_a_time.spawn(cx, |_| async {
690 assert!(false);
691 Ok(2)
692 }),
693 one_at_a_time.spawn(cx, |_| async { Ok(3) }),
694 )
695 });
696
697 assert_eq!(a.await.unwrap(), None);
698 assert_eq!(b.await.unwrap(), Some(3));
699
700 let promise = cx.update(|cx| one_at_a_time.spawn(cx, |_| async { Ok(4) }));
701 drop(one_at_a_time);
702
703 assert_eq!(promise.await.unwrap(), None);
704 }
705}