1pub mod call_settings;
2pub mod participant;
3pub mod room;
4
5use anyhow::{anyhow, bail, Result};
6use async_trait::async_trait;
7use audio::Audio;
8use call_settings::CallSettings;
9use client::{
10 proto::{self, PeerId},
11 Client, TelemetrySettings, TypedEnvelope, User, UserStore, ZED_ALWAYS_ACTIVE,
12};
13use collections::HashSet;
14use futures::{channel::oneshot, future::Shared, Future, FutureExt};
15use gpui::{
16 AppContext, AsyncAppContext, AsyncWindowContext, Context, EventEmitter, Model, ModelContext,
17 Subscription, Task, View, ViewContext, WeakModel, WeakView,
18};
19pub use participant::ParticipantLocation;
20use postage::watch;
21use project::Project;
22use room::Event;
23pub use room::Room;
24use settings::Settings;
25use std::sync::Arc;
26use util::ResultExt;
27use workspace::{item::ItemHandle, CallHandler, Pane, Workspace};
28
29pub fn init(client: Arc<Client>, user_store: Model<UserStore>, cx: &mut AppContext) {
30 CallSettings::register(cx);
31
32 let active_call = cx.build_model(|cx| ActiveCall::new(client, user_store, cx));
33 cx.set_global(active_call);
34}
35
36pub struct OneAtATime {
37 cancel: Option<oneshot::Sender<()>>,
38}
39
40impl OneAtATime {
41 /// spawn a task in the given context.
42 /// if another task is spawned before that resolves, or if the OneAtATime itself is dropped, the first task will be cancelled and return Ok(None)
43 /// otherwise you'll see the result of the task.
44 fn spawn<F, Fut, R>(&mut self, cx: &mut AppContext, f: F) -> Task<Result<Option<R>>>
45 where
46 F: 'static + FnOnce(AsyncAppContext) -> Fut,
47 Fut: Future<Output = Result<R>>,
48 R: 'static,
49 {
50 let (tx, rx) = oneshot::channel();
51 self.cancel.replace(tx);
52 cx.spawn(|cx| async move {
53 futures::select_biased! {
54 _ = rx.fuse() => Ok(None),
55 result = f(cx).fuse() => result.map(Some),
56 }
57 })
58 }
59
60 fn running(&self) -> bool {
61 self.cancel
62 .as_ref()
63 .is_some_and(|cancel| !cancel.is_canceled())
64 }
65}
66
67#[derive(Clone)]
68pub struct IncomingCall {
69 pub room_id: u64,
70 pub calling_user: Arc<User>,
71 pub participants: Vec<Arc<User>>,
72 pub initial_project: Option<proto::ParticipantProject>,
73}
74
75/// Singleton global maintaining the user's participation in a room across workspaces.
76pub struct ActiveCall {
77 room: Option<(Model<Room>, Vec<Subscription>)>,
78 pending_room_creation: Option<Shared<Task<Result<Model<Room>, Arc<anyhow::Error>>>>>,
79 location: Option<WeakModel<Project>>,
80 _join_debouncer: OneAtATime,
81 pending_invites: HashSet<u64>,
82 incoming_call: (
83 watch::Sender<Option<IncomingCall>>,
84 watch::Receiver<Option<IncomingCall>>,
85 ),
86 client: Arc<Client>,
87 user_store: Model<UserStore>,
88 _subscriptions: Vec<client::Subscription>,
89}
90
91impl EventEmitter<Event> for ActiveCall {}
92
93impl ActiveCall {
94 fn new(client: Arc<Client>, user_store: Model<UserStore>, cx: &mut ModelContext<Self>) -> Self {
95 Self {
96 room: None,
97 pending_room_creation: None,
98 location: None,
99 pending_invites: Default::default(),
100 incoming_call: watch::channel(),
101 _join_debouncer: OneAtATime { cancel: None },
102 _subscriptions: vec![
103 client.add_request_handler(cx.weak_model(), Self::handle_incoming_call),
104 client.add_message_handler(cx.weak_model(), Self::handle_call_canceled),
105 ],
106 client,
107 user_store,
108 }
109 }
110
111 pub fn channel_id(&self, cx: &AppContext) -> Option<u64> {
112 self.room()?.read(cx).channel_id()
113 }
114
115 async fn handle_incoming_call(
116 this: Model<Self>,
117 envelope: TypedEnvelope<proto::IncomingCall>,
118 _: Arc<Client>,
119 mut cx: AsyncAppContext,
120 ) -> Result<proto::Ack> {
121 let user_store = this.update(&mut cx, |this, _| this.user_store.clone())?;
122 let call = IncomingCall {
123 room_id: envelope.payload.room_id,
124 participants: user_store
125 .update(&mut cx, |user_store, cx| {
126 user_store.get_users(envelope.payload.participant_user_ids, cx)
127 })?
128 .await?,
129 calling_user: user_store
130 .update(&mut cx, |user_store, cx| {
131 user_store.get_user(envelope.payload.calling_user_id, cx)
132 })?
133 .await?,
134 initial_project: envelope.payload.initial_project,
135 };
136 this.update(&mut cx, |this, _| {
137 *this.incoming_call.0.borrow_mut() = Some(call);
138 })?;
139
140 Ok(proto::Ack {})
141 }
142
143 async fn handle_call_canceled(
144 this: Model<Self>,
145 envelope: TypedEnvelope<proto::CallCanceled>,
146 _: Arc<Client>,
147 mut cx: AsyncAppContext,
148 ) -> Result<()> {
149 this.update(&mut cx, |this, _| {
150 let mut incoming_call = this.incoming_call.0.borrow_mut();
151 if incoming_call
152 .as_ref()
153 .map_or(false, |call| call.room_id == envelope.payload.room_id)
154 {
155 incoming_call.take();
156 }
157 })?;
158 Ok(())
159 }
160
161 pub fn global(cx: &AppContext) -> Model<Self> {
162 cx.global::<Model<Self>>().clone()
163 }
164
165 pub fn invite(
166 &mut self,
167 called_user_id: u64,
168 initial_project: Option<Model<Project>>,
169 cx: &mut ModelContext<Self>,
170 ) -> Task<Result<()>> {
171 if !self.pending_invites.insert(called_user_id) {
172 return Task::ready(Err(anyhow!("user was already invited")));
173 }
174 cx.notify();
175
176 if self._join_debouncer.running() {
177 return Task::ready(Ok(()));
178 }
179
180 let room = if let Some(room) = self.room().cloned() {
181 Some(Task::ready(Ok(room)).shared())
182 } else {
183 self.pending_room_creation.clone()
184 };
185
186 let invite = if let Some(room) = room {
187 cx.spawn(move |_, mut cx| async move {
188 let room = room.await.map_err(|err| anyhow!("{:?}", err))?;
189
190 let initial_project_id = if let Some(initial_project) = initial_project {
191 Some(
192 room.update(&mut cx, |room, cx| room.share_project(initial_project, cx))?
193 .await?,
194 )
195 } else {
196 None
197 };
198
199 room.update(&mut cx, move |room, cx| {
200 room.call(called_user_id, initial_project_id, cx)
201 })?
202 .await?;
203
204 anyhow::Ok(())
205 })
206 } else {
207 let client = self.client.clone();
208 let user_store = self.user_store.clone();
209 let room = cx
210 .spawn(move |this, mut cx| async move {
211 let create_room = async {
212 let room = cx
213 .update(|cx| {
214 Room::create(
215 called_user_id,
216 initial_project,
217 client,
218 user_store,
219 cx,
220 )
221 })?
222 .await?;
223
224 this.update(&mut cx, |this, cx| this.set_room(Some(room.clone()), cx))?
225 .await?;
226
227 anyhow::Ok(room)
228 };
229
230 let room = create_room.await;
231 this.update(&mut cx, |this, _| this.pending_room_creation = None)?;
232 room.map_err(Arc::new)
233 })
234 .shared();
235 self.pending_room_creation = Some(room.clone());
236 cx.background_executor().spawn(async move {
237 room.await.map_err(|err| anyhow!("{:?}", err))?;
238 anyhow::Ok(())
239 })
240 };
241
242 cx.spawn(move |this, mut cx| async move {
243 let result = invite.await;
244 if result.is_ok() {
245 this.update(&mut cx, |this, cx| this.report_call_event("invite", cx))?;
246 } else {
247 // TODO: Resport collaboration error
248 }
249
250 this.update(&mut cx, |this, cx| {
251 this.pending_invites.remove(&called_user_id);
252 cx.notify();
253 })?;
254 result
255 })
256 }
257
258 pub fn cancel_invite(
259 &mut self,
260 called_user_id: u64,
261 cx: &mut ModelContext<Self>,
262 ) -> Task<Result<()>> {
263 let room_id = if let Some(room) = self.room() {
264 room.read(cx).id()
265 } else {
266 return Task::ready(Err(anyhow!("no active call")));
267 };
268
269 let client = self.client.clone();
270 cx.background_executor().spawn(async move {
271 client
272 .request(proto::CancelCall {
273 room_id,
274 called_user_id,
275 })
276 .await?;
277 anyhow::Ok(())
278 })
279 }
280
281 pub fn incoming(&self) -> watch::Receiver<Option<IncomingCall>> {
282 self.incoming_call.1.clone()
283 }
284
285 pub fn accept_incoming(&mut self, cx: &mut ModelContext<Self>) -> Task<Result<()>> {
286 if self.room.is_some() {
287 return Task::ready(Err(anyhow!("cannot join while on another call")));
288 }
289
290 let call = if let Some(call) = self.incoming_call.1.borrow().clone() {
291 call
292 } else {
293 return Task::ready(Err(anyhow!("no incoming call")));
294 };
295
296 if self.pending_room_creation.is_some() {
297 return Task::ready(Ok(()));
298 }
299
300 let room_id = call.room_id.clone();
301 let client = self.client.clone();
302 let user_store = self.user_store.clone();
303 let join = self
304 ._join_debouncer
305 .spawn(cx, move |cx| Room::join(room_id, client, user_store, cx));
306
307 cx.spawn(|this, mut cx| async move {
308 let room = join.await?;
309 this.update(&mut cx, |this, cx| this.set_room(room.clone(), cx))?
310 .await?;
311 this.update(&mut cx, |this, cx| {
312 this.report_call_event("accept incoming", cx)
313 })?;
314 Ok(())
315 })
316 }
317
318 pub fn decline_incoming(&mut self, cx: &mut ModelContext<Self>) -> Result<()> {
319 let call = self
320 .incoming_call
321 .0
322 .borrow_mut()
323 .take()
324 .ok_or_else(|| anyhow!("no incoming call"))?;
325 report_call_event_for_room("decline incoming", call.room_id, None, &self.client, cx);
326 self.client.send(proto::DeclineCall {
327 room_id: call.room_id,
328 })?;
329 Ok(())
330 }
331
332 pub fn join_channel(
333 &mut self,
334 channel_id: u64,
335 cx: &mut ModelContext<Self>,
336 ) -> Task<Result<Option<Model<Room>>>> {
337 if let Some(room) = self.room().cloned() {
338 if room.read(cx).channel_id() == Some(channel_id) {
339 return Task::ready(Ok(Some(room)));
340 } else {
341 room.update(cx, |room, cx| room.clear_state(cx));
342 }
343 }
344
345 if self.pending_room_creation.is_some() {
346 return Task::ready(Ok(None));
347 }
348
349 let client = self.client.clone();
350 let user_store = self.user_store.clone();
351 let join = self._join_debouncer.spawn(cx, move |cx| async move {
352 Room::join_channel(channel_id, client, user_store, cx).await
353 });
354
355 cx.spawn(|this, mut cx| async move {
356 let room = join.await?;
357 this.update(&mut cx, |this, cx| this.set_room(room.clone(), cx))?
358 .await?;
359 this.update(&mut cx, |this, cx| {
360 this.report_call_event("join channel", cx)
361 })?;
362 Ok(room)
363 })
364 }
365
366 pub fn hang_up(&mut self, cx: &mut ModelContext<Self>) -> Task<Result<()>> {
367 cx.notify();
368 self.report_call_event("hang up", cx);
369
370 Audio::end_call(cx);
371 if let Some((room, _)) = self.room.take() {
372 room.update(cx, |room, cx| room.leave(cx))
373 } else {
374 Task::ready(Ok(()))
375 }
376 }
377
378 pub fn share_project(
379 &mut self,
380 project: Model<Project>,
381 cx: &mut ModelContext<Self>,
382 ) -> Task<Result<u64>> {
383 if let Some((room, _)) = self.room.as_ref() {
384 self.report_call_event("share project", cx);
385 room.update(cx, |room, cx| room.share_project(project, cx))
386 } else {
387 Task::ready(Err(anyhow!("no active call")))
388 }
389 }
390
391 pub fn unshare_project(
392 &mut self,
393 project: Model<Project>,
394 cx: &mut ModelContext<Self>,
395 ) -> Result<()> {
396 if let Some((room, _)) = self.room.as_ref() {
397 self.report_call_event("unshare project", cx);
398 room.update(cx, |room, cx| room.unshare_project(project, cx))
399 } else {
400 Err(anyhow!("no active call"))
401 }
402 }
403
404 pub fn location(&self) -> Option<&WeakModel<Project>> {
405 self.location.as_ref()
406 }
407
408 pub fn set_location(
409 &mut self,
410 project: Option<&Model<Project>>,
411 cx: &mut ModelContext<Self>,
412 ) -> Task<Result<()>> {
413 if project.is_some() || !*ZED_ALWAYS_ACTIVE {
414 self.location = project.map(|project| project.downgrade());
415 if let Some((room, _)) = self.room.as_ref() {
416 return room.update(cx, |room, cx| room.set_location(project, cx));
417 }
418 }
419 Task::ready(Ok(()))
420 }
421
422 fn set_room(
423 &mut self,
424 room: Option<Model<Room>>,
425 cx: &mut ModelContext<Self>,
426 ) -> Task<Result<()>> {
427 if room.as_ref() != self.room.as_ref().map(|room| &room.0) {
428 cx.notify();
429 if let Some(room) = room {
430 if room.read(cx).status().is_offline() {
431 self.room = None;
432 Task::ready(Ok(()))
433 } else {
434 let subscriptions = vec![
435 cx.observe(&room, |this, room, cx| {
436 if room.read(cx).status().is_offline() {
437 this.set_room(None, cx).detach_and_log_err(cx);
438 }
439
440 cx.notify();
441 }),
442 cx.subscribe(&room, |_, _, event, cx| cx.emit(event.clone())),
443 ];
444 self.room = Some((room.clone(), subscriptions));
445 let location = self
446 .location
447 .as_ref()
448 .and_then(|location| location.upgrade());
449 room.update(cx, |room, cx| room.set_location(location.as_ref(), cx))
450 }
451 } else {
452 self.room = None;
453 Task::ready(Ok(()))
454 }
455 } else {
456 Task::ready(Ok(()))
457 }
458 }
459
460 pub fn room(&self) -> Option<&Model<Room>> {
461 self.room.as_ref().map(|(room, _)| room)
462 }
463
464 pub fn client(&self) -> Arc<Client> {
465 self.client.clone()
466 }
467
468 pub fn pending_invites(&self) -> &HashSet<u64> {
469 &self.pending_invites
470 }
471
472 pub fn report_call_event(&self, operation: &'static str, cx: &mut AppContext) {
473 if let Some(room) = self.room() {
474 let room = room.read(cx);
475 report_call_event_for_room(operation, room.id(), room.channel_id(), &self.client, cx);
476 }
477 }
478}
479
480pub fn report_call_event_for_room(
481 operation: &'static str,
482 room_id: u64,
483 channel_id: Option<u64>,
484 client: &Arc<Client>,
485 cx: &mut AppContext,
486) {
487 let telemetry = client.telemetry();
488 let telemetry_settings = *TelemetrySettings::get_global(cx);
489
490 telemetry.report_call_event(telemetry_settings, operation, Some(room_id), channel_id)
491}
492
493pub fn report_call_event_for_channel(
494 operation: &'static str,
495 channel_id: u64,
496 client: &Arc<Client>,
497 cx: &AppContext,
498) {
499 let room = ActiveCall::global(cx).read(cx).room();
500
501 let telemetry = client.telemetry();
502
503 let telemetry_settings = *TelemetrySettings::get_global(cx);
504
505 telemetry.report_call_event(
506 telemetry_settings,
507 operation,
508 room.map(|r| r.read(cx).id()),
509 Some(channel_id),
510 )
511}
512
513pub struct Call {
514 active_call: Option<(Model<ActiveCall>, Vec<Subscription>)>,
515 parent_workspace: WeakView<Workspace>,
516}
517
518impl Call {
519 pub fn new(
520 parent_workspace: WeakView<Workspace>,
521 cx: &mut ViewContext<'_, Workspace>,
522 ) -> Box<dyn CallHandler> {
523 let mut active_call = None;
524 if cx.has_global::<Model<ActiveCall>>() {
525 let call = cx.global::<Model<ActiveCall>>().clone();
526 let subscriptions = vec![cx.subscribe(&call, Self::on_active_call_event)];
527 active_call = Some((call, subscriptions));
528 }
529 Box::new(Self {
530 active_call,
531 parent_workspace,
532 })
533 }
534 fn on_active_call_event(
535 workspace: &mut Workspace,
536 _: Model<ActiveCall>,
537 event: &room::Event,
538 cx: &mut ViewContext<Workspace>,
539 ) {
540 match event {
541 room::Event::ParticipantLocationChanged { participant_id }
542 | room::Event::RemoteVideoTracksChanged { participant_id } => {
543 workspace.leader_updated(*participant_id, cx);
544 }
545 _ => {}
546 }
547 }
548}
549
550#[async_trait(?Send)]
551impl CallHandler for Call {
552 fn shared_screen_for_peer(
553 &self,
554 peer_id: PeerId,
555 _pane: &View<Pane>,
556 cx: &mut ViewContext<Workspace>,
557 ) -> Option<Box<dyn ItemHandle>> {
558 let (call, _) = self.active_call.as_ref()?;
559 let room = call.read(cx).room()?.read(cx);
560 let participant = room.remote_participant_for_peer_id(peer_id)?;
561 let _track = participant.video_tracks.values().next()?.clone();
562 let _user = participant.user.clone();
563 todo!();
564 // for item in pane.read(cx).items_of_type::<SharedScreen>() {
565 // if item.read(cx).peer_id == peer_id {
566 // return Box::new(Some(item));
567 // }
568 // }
569
570 // Some(Box::new(cx.build_view(|cx| {
571 // SharedScreen::new(&track, peer_id, user.clone(), cx)
572 // })))
573 }
574
575 fn room_id(&self, cx: &AppContext) -> Option<u64> {
576 Some(self.active_call.as_ref()?.0.read(cx).room()?.read(cx).id())
577 }
578 fn hang_up(&self, mut cx: AsyncWindowContext) -> Result<Task<Result<()>>> {
579 let Some((call, _)) = self.active_call.as_ref() else {
580 bail!("Cannot exit a call; not in a call");
581 };
582
583 call.update(&mut cx, |this, cx| this.hang_up(cx))
584 }
585 fn active_project(&self, cx: &AppContext) -> Option<WeakModel<Project>> {
586 ActiveCall::global(cx).read(cx).location().cloned()
587 }
588 fn peer_state(
589 &mut self,
590 leader_id: PeerId,
591 cx: &mut ViewContext<Workspace>,
592 ) -> Option<(bool, bool)> {
593 let (call, _) = self.active_call.as_ref()?;
594 let room = call.read(cx).room()?.read(cx);
595 let participant = room.remote_participant_for_peer_id(leader_id)?;
596
597 let leader_in_this_app;
598 let leader_in_this_project;
599 match participant.location {
600 ParticipantLocation::SharedProject { project_id } => {
601 leader_in_this_app = true;
602 leader_in_this_project = Some(project_id)
603 == self
604 .parent_workspace
605 .update(cx, |this, cx| this.project().read(cx).remote_id())
606 .log_err()
607 .flatten();
608 }
609 ParticipantLocation::UnsharedProject => {
610 leader_in_this_app = true;
611 leader_in_this_project = false;
612 }
613 ParticipantLocation::External => {
614 leader_in_this_app = false;
615 leader_in_this_project = false;
616 }
617 };
618
619 Some((leader_in_this_project, leader_in_this_app))
620 }
621}
622
623#[cfg(test)]
624mod test {
625 use gpui::TestAppContext;
626
627 use crate::OneAtATime;
628
629 #[gpui::test]
630 async fn test_one_at_a_time(cx: &mut TestAppContext) {
631 let mut one_at_a_time = OneAtATime { cancel: None };
632
633 assert_eq!(
634 cx.update(|cx| one_at_a_time.spawn(cx, |_| async { Ok(1) }))
635 .await
636 .unwrap(),
637 Some(1)
638 );
639
640 let (a, b) = cx.update(|cx| {
641 (
642 one_at_a_time.spawn(cx, |_| async {
643 assert!(false);
644 Ok(2)
645 }),
646 one_at_a_time.spawn(cx, |_| async { Ok(3) }),
647 )
648 });
649
650 assert_eq!(a.await.unwrap(), None);
651 assert_eq!(b.await.unwrap(), Some(3));
652
653 let promise = cx.update(|cx| one_at_a_time.spawn(cx, |_| async { Ok(4) }));
654 drop(one_at_a_time);
655
656 assert_eq!(promise.await.unwrap(), None);
657 }
658}