1pub mod call_settings;
2pub mod participant;
3pub mod room;
4
5use std::sync::Arc;
6
7use anyhow::{anyhow, Result};
8use audio::Audio;
9use call_settings::CallSettings;
10use channel::ChannelId;
11use client::{
12 proto::{self, PeerId},
13 ClickhouseEvent, Client, TelemetrySettings, TypedEnvelope, User, UserStore,
14};
15use collections::HashSet;
16use futures::{future::Shared, FutureExt};
17use postage::watch;
18
19use gpui::{
20 AnyViewHandle, AnyWeakViewHandle, AppContext, AsyncAppContext, Entity, ModelContext,
21 ModelHandle, Subscription, Task, ViewContext, WeakModelHandle,
22};
23use project::Project;
24
25pub use participant::ParticipantLocation;
26pub use room::Room;
27use util::ResultExt;
28
29pub fn init(client: Arc<Client>, user_store: ModelHandle<UserStore>, cx: &mut AppContext) {
30 settings::register::<CallSettings>(cx);
31
32 let active_call = cx.add_model(|cx| ActiveCall::new(client, user_store, cx));
33 cx.set_global(active_call);
34}
35
/// A call offered to the current user, as published on the incoming-call watch channel.
#[derive(Clone)]
pub struct IncomingCall {
    /// Id of the room the call belongs to.
    pub room_id: u64,
    /// The user who placed the call.
    pub calling_user: Arc<User>,
    /// Users resolved from the participant ids carried by the incoming-call message.
    pub participants: Vec<Arc<User>>,
    /// Project carried by the incoming-call message, if any.
    pub initial_project: Option<proto::ParticipantProject>,
}
43
/// Singleton global maintaining the user's participation in a room across workspaces.
pub struct ActiveCall {
    /// The current room, paired with the observation/subscription handles created for it.
    room: Option<(ModelHandle<Room>, Vec<Subscription>)>,
    /// Shared task for a room that `invite` is still creating; reset to `None` once it finishes.
    pending_room_creation: Option<Shared<Task<Result<ModelHandle<Room>, Arc<anyhow::Error>>>>>,
    /// Weak handle to the project most recently passed to `set_location`.
    location: Option<WeakModelHandle<Project>>,
    /// Ids of users with an invite currently in flight.
    pending_invites: HashSet<u64>,
    /// Watch channel (sender, receiver) holding the pending incoming call, if any.
    incoming_call: (
        watch::Sender<Option<IncomingCall>>,
        watch::Receiver<Option<IncomingCall>>,
    ),
    client: Arc<Client>,
    user_store: ModelHandle<UserStore>,
    /// Registered follow handlers, inserted in `(project_id, root view id)` order.
    follow_handlers: Vec<FollowHandler>,
    /// Peers following this client, kept sorted so they can be binary-searched.
    followers: Vec<Follower>,
    /// Handles for the RPC handlers registered on the client in `new`.
    _subscriptions: Vec<client::Subscription>,
}
60
/// A peer that sent a follow request, together with the project id named in that request.
///
/// The ordering derives let `ActiveCall::followers` be kept sorted and binary-searched.
#[derive(PartialEq, Eq, PartialOrd, Ord)]
struct Follower {
    /// Project id from the follow request; `None` when the request named no project.
    project_id: Option<u64>,
    /// Peer id of the request's original sender.
    peer_id: PeerId,
}
66
/// Type-erased follow callbacks registered for one root view via `add_follow_handler`.
struct FollowHandler {
    /// Project id the handler was registered with.
    project_id: Option<u64>,
    /// Weak handle to the root view; handlers whose view can no longer be upgraded are skipped.
    root_view: AnyWeakViewHandle,
    /// Produces the follow response for the root view; `None` when nothing could be produced.
    get_views:
        Box<dyn Fn(&AnyViewHandle, Option<u64>, &mut AppContext) -> Option<proto::FollowResponse>>,
    /// Applies an `UpdateFollowers` message from the given leader peer to the root view.
    update_view: Box<dyn Fn(&AnyViewHandle, PeerId, proto::UpdateFollowers, &mut AppContext)>,
}
74
/// Makes `ActiveCall` a gpui entity that emits [`room::Event`]s.
impl Entity for ActiveCall {
    // Events from the current room are re-emitted through the subscription made in `set_room`.
    type Event = room::Event;
}
78
79impl ActiveCall {
80 fn new(
81 client: Arc<Client>,
82 user_store: ModelHandle<UserStore>,
83 cx: &mut ModelContext<Self>,
84 ) -> Self {
85 Self {
86 room: None,
87 pending_room_creation: None,
88 location: None,
89 pending_invites: Default::default(),
90 incoming_call: watch::channel(),
91 follow_handlers: Default::default(),
92 followers: Default::default(),
93 _subscriptions: vec![
94 client.add_request_handler(cx.handle(), Self::handle_incoming_call),
95 client.add_message_handler(cx.handle(), Self::handle_call_canceled),
96 client.add_request_handler(cx.handle(), Self::handle_follow),
97 client.add_message_handler(cx.handle(), Self::handle_unfollow),
98 client.add_message_handler(cx.handle(), Self::handle_update_followers),
99 ],
100 client,
101 user_store,
102 }
103 }
104
105 pub fn channel_id(&self, cx: &AppContext) -> Option<ChannelId> {
106 self.room()?.read(cx).channel_id()
107 }
108
109 pub fn add_follow_handler<V: gpui::View, GetViews, UpdateView>(
110 &mut self,
111 root_view: gpui::ViewHandle<V>,
112 project_id: Option<u64>,
113 get_views: GetViews,
114 update_view: UpdateView,
115 _cx: &mut ModelContext<Self>,
116 ) where
117 GetViews: 'static
118 + Fn(&mut V, Option<u64>, &mut gpui::ViewContext<V>) -> Result<proto::FollowResponse>,
119 UpdateView:
120 'static + Fn(&mut V, PeerId, proto::UpdateFollowers, &mut ViewContext<V>) -> Result<()>,
121 {
122 self.follow_handlers
123 .retain(|h| h.root_view.id() != root_view.id());
124 if let Err(ix) = self
125 .follow_handlers
126 .binary_search_by_key(&(project_id, root_view.id()), |f| {
127 (f.project_id, f.root_view.id())
128 })
129 {
130 self.follow_handlers.insert(
131 ix,
132 FollowHandler {
133 project_id,
134 root_view: root_view.into_any().downgrade(),
135 get_views: Box::new(move |view, project_id, cx| {
136 let view = view.clone().downcast::<V>().unwrap();
137 view.update(cx, |view, cx| get_views(view, project_id, cx).log_err())
138 .flatten()
139 }),
140 update_view: Box::new(move |view, leader_id, message, cx| {
141 let view = view.clone().downcast::<V>().unwrap();
142 view.update(cx, |view, cx| {
143 update_view(view, leader_id, message, cx).log_err()
144 });
145 }),
146 },
147 );
148 }
149 }
150
151 async fn handle_incoming_call(
152 this: ModelHandle<Self>,
153 envelope: TypedEnvelope<proto::IncomingCall>,
154 _: Arc<Client>,
155 mut cx: AsyncAppContext,
156 ) -> Result<proto::Ack> {
157 let user_store = this.read_with(&cx, |this, _| this.user_store.clone());
158 let call = IncomingCall {
159 room_id: envelope.payload.room_id,
160 participants: user_store
161 .update(&mut cx, |user_store, cx| {
162 user_store.get_users(envelope.payload.participant_user_ids, cx)
163 })
164 .await?,
165 calling_user: user_store
166 .update(&mut cx, |user_store, cx| {
167 user_store.get_user(envelope.payload.calling_user_id, cx)
168 })
169 .await?,
170 initial_project: envelope.payload.initial_project,
171 };
172 this.update(&mut cx, |this, _| {
173 *this.incoming_call.0.borrow_mut() = Some(call);
174 });
175
176 Ok(proto::Ack {})
177 }
178
179 async fn handle_call_canceled(
180 this: ModelHandle<Self>,
181 envelope: TypedEnvelope<proto::CallCanceled>,
182 _: Arc<Client>,
183 mut cx: AsyncAppContext,
184 ) -> Result<()> {
185 this.update(&mut cx, |this, _| {
186 let mut incoming_call = this.incoming_call.0.borrow_mut();
187 if incoming_call
188 .as_ref()
189 .map_or(false, |call| call.room_id == envelope.payload.room_id)
190 {
191 incoming_call.take();
192 }
193 });
194 Ok(())
195 }
196
197 async fn handle_follow(
198 this: ModelHandle<Self>,
199 envelope: TypedEnvelope<proto::Follow>,
200 _: Arc<Client>,
201 mut cx: AsyncAppContext,
202 ) -> Result<proto::FollowResponse> {
203 this.update(&mut cx, |this, cx| {
204 let follower = Follower {
205 project_id: envelope.payload.project_id,
206 peer_id: envelope.original_sender_id()?,
207 };
208 let active_project_id = this
209 .location
210 .as_ref()
211 .and_then(|project| project.upgrade(cx)?.read(cx).remote_id());
212
213 let mut response = proto::FollowResponse::default();
214 for handler in &this.follow_handlers {
215 if follower.project_id != handler.project_id && follower.project_id.is_some() {
216 continue;
217 }
218
219 let Some(root_view) = handler.root_view.upgrade(cx) else {
220 continue;
221 };
222
223 let Some(handler_response) =
224 (handler.get_views)(&root_view, follower.project_id, cx)
225 else {
226 continue;
227 };
228
229 if response.views.is_empty() {
230 response.views = handler_response.views;
231 } else {
232 response.views.extend_from_slice(&handler_response.views);
233 }
234
235 if let Some(active_view_id) = handler_response.active_view_id.clone() {
236 if response.active_view_id.is_none() || handler.project_id == active_project_id
237 {
238 response.active_view_id = Some(active_view_id);
239 }
240 }
241 }
242
243 if let Err(ix) = this.followers.binary_search(&follower) {
244 this.followers.insert(ix, follower);
245 }
246
247 Ok(response)
248 })
249 }
250
251 async fn handle_unfollow(
252 this: ModelHandle<Self>,
253 envelope: TypedEnvelope<proto::Unfollow>,
254 _: Arc<Client>,
255 mut cx: AsyncAppContext,
256 ) -> Result<()> {
257 this.update(&mut cx, |this, _| {
258 let follower = Follower {
259 project_id: envelope.payload.project_id,
260 peer_id: envelope.original_sender_id()?,
261 };
262 if let Err(ix) = this.followers.binary_search(&follower) {
263 this.followers.remove(ix);
264 }
265 Ok(())
266 })
267 }
268
269 async fn handle_update_followers(
270 this: ModelHandle<Self>,
271 envelope: TypedEnvelope<proto::UpdateFollowers>,
272 _: Arc<Client>,
273 mut cx: AsyncAppContext,
274 ) -> Result<()> {
275 let leader_id = envelope.original_sender_id()?;
276 let update = envelope.payload;
277 this.update(&mut cx, |this, cx| {
278 for handler in &this.follow_handlers {
279 if update.project_id != handler.project_id && update.project_id.is_some() {
280 continue;
281 }
282 let Some(root_view) = handler.root_view.upgrade(cx) else {
283 continue;
284 };
285 (handler.update_view)(&root_view, leader_id, update.clone(), cx);
286 }
287 Ok(())
288 })
289 }
290
291 pub fn update_followers(
292 &self,
293 project_id: Option<u64>,
294 update: proto::update_followers::Variant,
295 cx: &AppContext,
296 ) -> Option<()> {
297 let room_id = self.room()?.read(cx).id();
298 let follower_ids: Vec<_> = self
299 .followers
300 .iter()
301 .filter_map(|follower| {
302 (follower.project_id == project_id).then_some(follower.peer_id.into())
303 })
304 .collect();
305 if follower_ids.is_empty() {
306 return None;
307 }
308 self.client
309 .send(proto::UpdateFollowers {
310 room_id,
311 project_id,
312 follower_ids,
313 variant: Some(update),
314 })
315 .log_err()
316 }
317
318 pub fn global(cx: &AppContext) -> ModelHandle<Self> {
319 cx.global::<ModelHandle<Self>>().clone()
320 }
321
322 pub fn invite(
323 &mut self,
324 called_user_id: u64,
325 initial_project: Option<ModelHandle<Project>>,
326 cx: &mut ModelContext<Self>,
327 ) -> Task<Result<()>> {
328 if !self.pending_invites.insert(called_user_id) {
329 return Task::ready(Err(anyhow!("user was already invited")));
330 }
331 cx.notify();
332
333 let room = if let Some(room) = self.room().cloned() {
334 Some(Task::ready(Ok(room)).shared())
335 } else {
336 self.pending_room_creation.clone()
337 };
338
339 let invite = if let Some(room) = room {
340 cx.spawn_weak(|_, mut cx| async move {
341 let room = room.await.map_err(|err| anyhow!("{:?}", err))?;
342
343 let initial_project_id = if let Some(initial_project) = initial_project {
344 Some(
345 room.update(&mut cx, |room, cx| room.share_project(initial_project, cx))
346 .await?,
347 )
348 } else {
349 None
350 };
351
352 room.update(&mut cx, |room, cx| {
353 room.call(called_user_id, initial_project_id, cx)
354 })
355 .await?;
356
357 anyhow::Ok(())
358 })
359 } else {
360 let client = self.client.clone();
361 let user_store = self.user_store.clone();
362 let room = cx
363 .spawn(|this, mut cx| async move {
364 let create_room = async {
365 let room = cx
366 .update(|cx| {
367 Room::create(
368 called_user_id,
369 initial_project,
370 client,
371 user_store,
372 cx,
373 )
374 })
375 .await?;
376
377 this.update(&mut cx, |this, cx| this.set_room(Some(room.clone()), cx))
378 .await?;
379
380 anyhow::Ok(room)
381 };
382
383 let room = create_room.await;
384 this.update(&mut cx, |this, _| this.pending_room_creation = None);
385 room.map_err(Arc::new)
386 })
387 .shared();
388 self.pending_room_creation = Some(room.clone());
389 cx.foreground().spawn(async move {
390 room.await.map_err(|err| anyhow!("{:?}", err))?;
391 anyhow::Ok(())
392 })
393 };
394
395 cx.spawn(|this, mut cx| async move {
396 let result = invite.await;
397 this.update(&mut cx, |this, cx| {
398 this.pending_invites.remove(&called_user_id);
399 this.report_call_event("invite", cx);
400 cx.notify();
401 });
402 result
403 })
404 }
405
406 pub fn cancel_invite(
407 &mut self,
408 called_user_id: u64,
409 cx: &mut ModelContext<Self>,
410 ) -> Task<Result<()>> {
411 let room_id = if let Some(room) = self.room() {
412 room.read(cx).id()
413 } else {
414 return Task::ready(Err(anyhow!("no active call")));
415 };
416
417 let client = self.client.clone();
418 cx.foreground().spawn(async move {
419 client
420 .request(proto::CancelCall {
421 room_id,
422 called_user_id,
423 })
424 .await?;
425 anyhow::Ok(())
426 })
427 }
428
429 pub fn incoming(&self) -> watch::Receiver<Option<IncomingCall>> {
430 self.incoming_call.1.clone()
431 }
432
433 pub fn accept_incoming(&mut self, cx: &mut ModelContext<Self>) -> Task<Result<()>> {
434 if self.room.is_some() {
435 return Task::ready(Err(anyhow!("cannot join while on another call")));
436 }
437
438 let call = if let Some(call) = self.incoming_call.1.borrow().clone() {
439 call
440 } else {
441 return Task::ready(Err(anyhow!("no incoming call")));
442 };
443
444 let join = Room::join(&call, self.client.clone(), self.user_store.clone(), cx);
445
446 cx.spawn(|this, mut cx| async move {
447 let room = join.await?;
448 this.update(&mut cx, |this, cx| this.set_room(Some(room.clone()), cx))
449 .await?;
450 this.update(&mut cx, |this, cx| {
451 this.report_call_event("accept incoming", cx)
452 });
453 Ok(())
454 })
455 }
456
457 pub fn decline_incoming(&mut self, cx: &mut ModelContext<Self>) -> Result<()> {
458 let call = self
459 .incoming_call
460 .0
461 .borrow_mut()
462 .take()
463 .ok_or_else(|| anyhow!("no incoming call"))?;
464 Self::report_call_event_for_room(
465 "decline incoming",
466 Some(call.room_id),
467 None,
468 &self.client,
469 cx,
470 );
471 self.client.send(proto::DeclineCall {
472 room_id: call.room_id,
473 })?;
474 Ok(())
475 }
476
477 pub fn join_channel(
478 &mut self,
479 channel_id: u64,
480 cx: &mut ModelContext<Self>,
481 ) -> Task<Result<()>> {
482 if let Some(room) = self.room().cloned() {
483 if room.read(cx).channel_id() == Some(channel_id) {
484 return Task::ready(Ok(()));
485 } else {
486 room.update(cx, |room, cx| room.clear_state(cx));
487 }
488 }
489
490 let join = Room::join_channel(channel_id, self.client.clone(), self.user_store.clone(), cx);
491
492 cx.spawn(|this, mut cx| async move {
493 let room = join.await?;
494 this.update(&mut cx, |this, cx| this.set_room(Some(room.clone()), cx))
495 .await?;
496 this.update(&mut cx, |this, cx| {
497 this.report_call_event("join channel", cx)
498 });
499 Ok(())
500 })
501 }
502
503 pub fn hang_up(&mut self, cx: &mut ModelContext<Self>) -> Task<Result<()>> {
504 cx.notify();
505 self.report_call_event("hang up", cx);
506 Audio::end_call(cx);
507 if let Some((room, _)) = self.room.take() {
508 room.update(cx, |room, cx| room.leave(cx))
509 } else {
510 Task::ready(Ok(()))
511 }
512 }
513
514 pub fn share_project(
515 &mut self,
516 project: ModelHandle<Project>,
517 cx: &mut ModelContext<Self>,
518 ) -> Task<Result<u64>> {
519 if let Some((room, _)) = self.room.as_ref() {
520 self.report_call_event("share project", cx);
521 room.update(cx, |room, cx| room.share_project(project, cx))
522 } else {
523 Task::ready(Err(anyhow!("no active call")))
524 }
525 }
526
527 pub fn unshare_project(
528 &mut self,
529 project: ModelHandle<Project>,
530 cx: &mut ModelContext<Self>,
531 ) -> Result<()> {
532 if let Some((room, _)) = self.room.as_ref() {
533 self.report_call_event("unshare project", cx);
534 room.update(cx, |room, cx| room.unshare_project(project, cx))
535 } else {
536 Err(anyhow!("no active call"))
537 }
538 }
539
540 pub fn set_location(
541 &mut self,
542 project: Option<&ModelHandle<Project>>,
543 cx: &mut ModelContext<Self>,
544 ) -> Task<Result<()>> {
545 self.location = project.map(|project| project.downgrade());
546 if let Some((room, _)) = self.room.as_ref() {
547 room.update(cx, |room, cx| room.set_location(project, cx))
548 } else {
549 Task::ready(Ok(()))
550 }
551 }
552
553 fn set_room(
554 &mut self,
555 room: Option<ModelHandle<Room>>,
556 cx: &mut ModelContext<Self>,
557 ) -> Task<Result<()>> {
558 if room.as_ref() != self.room.as_ref().map(|room| &room.0) {
559 cx.notify();
560 if let Some(room) = room {
561 if room.read(cx).status().is_offline() {
562 self.room = None;
563 Task::ready(Ok(()))
564 } else {
565 let subscriptions = vec![
566 cx.observe(&room, |this, room, cx| {
567 if room.read(cx).status().is_offline() {
568 this.set_room(None, cx).detach_and_log_err(cx);
569 }
570
571 cx.notify();
572 }),
573 cx.subscribe(&room, |_, _, event, cx| cx.emit(event.clone())),
574 ];
575 self.room = Some((room.clone(), subscriptions));
576 let location = self.location.and_then(|location| location.upgrade(cx));
577 room.update(cx, |room, cx| room.set_location(location.as_ref(), cx))
578 }
579 } else {
580 self.room = None;
581 Task::ready(Ok(()))
582 }
583 } else {
584 Task::ready(Ok(()))
585 }
586 }
587
588 pub fn room(&self) -> Option<&ModelHandle<Room>> {
589 self.room.as_ref().map(|(room, _)| room)
590 }
591
592 pub fn client(&self) -> Arc<Client> {
593 self.client.clone()
594 }
595
/// Returns the ids of users with an invite currently in flight.
///
/// An id is added when `invite` starts and removed when its task finishes.
pub fn pending_invites(&self) -> &HashSet<u64> {
    &self.pending_invites
}
599
600 fn report_call_event(&self, operation: &'static str, cx: &AppContext) {
601 let (room_id, channel_id) = match self.room() {
602 Some(room) => {
603 let room = room.read(cx);
604 (Some(room.id()), room.channel_id())
605 }
606 None => (None, None),
607 };
608 Self::report_call_event_for_room(operation, room_id, channel_id, &self.client, cx)
609 }
610
611 pub fn report_call_event_for_room(
612 operation: &'static str,
613 room_id: Option<u64>,
614 channel_id: Option<u64>,
615 client: &Arc<Client>,
616 cx: &AppContext,
617 ) {
618 let telemetry = client.telemetry();
619 let telemetry_settings = *settings::get::<TelemetrySettings>(cx);
620 let event = ClickhouseEvent::Call {
621 operation,
622 room_id,
623 channel_id,
624 };
625 telemetry.report_clickhouse_event(event, telemetry_settings);
626 }
627}