1pub mod call_settings;
2pub mod participant;
3pub mod room;
4
5use std::sync::Arc;
6
7use anyhow::{anyhow, Result};
8use audio::Audio;
9use call_settings::CallSettings;
10use channel::ChannelId;
11use client::{
12 proto::{self, PeerId},
13 ClickhouseEvent, Client, TelemetrySettings, TypedEnvelope, User, UserStore,
14};
15use collections::HashSet;
16use futures::{future::Shared, FutureExt};
17use postage::watch;
18
19use gpui::{
20 AnyViewHandle, AnyWeakViewHandle, AppContext, AsyncAppContext, Entity, ModelContext,
21 ModelHandle, Subscription, Task, ViewContext, WeakModelHandle,
22};
23use project::Project;
24
25pub use participant::ParticipantLocation;
26pub use room::Room;
27use util::ResultExt;
28
29pub fn init(client: Arc<Client>, user_store: ModelHandle<UserStore>, cx: &mut AppContext) {
30 settings::register::<CallSettings>(cx);
31
32 let active_call = cx.add_model(|cx| ActiveCall::new(client, user_store, cx));
33 cx.set_global(active_call);
34}
35
36#[derive(Clone)]
37pub struct IncomingCall {
38 pub room_id: u64,
39 pub calling_user: Arc<User>,
40 pub participants: Vec<Arc<User>>,
41 pub initial_project: Option<proto::ParticipantProject>,
42}
43
44/// Singleton global maintaining the user's participation in a room across workspaces.
45pub struct ActiveCall {
46 room: Option<(ModelHandle<Room>, Vec<Subscription>)>,
47 pending_room_creation: Option<Shared<Task<Result<ModelHandle<Room>, Arc<anyhow::Error>>>>>,
48 location: Option<WeakModelHandle<Project>>,
49 pending_invites: HashSet<u64>,
50 incoming_call: (
51 watch::Sender<Option<IncomingCall>>,
52 watch::Receiver<Option<IncomingCall>>,
53 ),
54 client: Arc<Client>,
55 user_store: ModelHandle<UserStore>,
56 follow_handlers: Vec<FollowHandler>,
57 followers: Vec<Follower>,
58 _subscriptions: Vec<client::Subscription>,
59}
60
61#[derive(PartialEq, Eq, PartialOrd, Ord, Debug)]
62struct Follower {
63 project_id: Option<u64>,
64 peer_id: PeerId,
65}
66
67struct FollowHandler {
68 project_id: Option<u64>,
69 root_view: AnyWeakViewHandle,
70 get_views:
71 Box<dyn Fn(&AnyViewHandle, Option<u64>, &mut AppContext) -> Option<proto::FollowResponse>>,
72 update_view: Box<dyn Fn(&AnyViewHandle, PeerId, proto::UpdateFollowers, &mut AppContext)>,
73}
74
75impl Entity for ActiveCall {
76 type Event = room::Event;
77}
78
79impl ActiveCall {
80 fn new(
81 client: Arc<Client>,
82 user_store: ModelHandle<UserStore>,
83 cx: &mut ModelContext<Self>,
84 ) -> Self {
85 Self {
86 room: None,
87 pending_room_creation: None,
88 location: None,
89 pending_invites: Default::default(),
90 incoming_call: watch::channel(),
91 follow_handlers: Default::default(),
92 followers: Default::default(),
93 _subscriptions: vec![
94 client.add_request_handler(cx.handle(), Self::handle_incoming_call),
95 client.add_message_handler(cx.handle(), Self::handle_call_canceled),
96 client.add_request_handler(cx.handle(), Self::handle_follow),
97 client.add_message_handler(cx.handle(), Self::handle_unfollow),
98 client.add_message_handler(cx.handle(), Self::handle_update_from_leader),
99 ],
100 client,
101 user_store,
102 }
103 }
104
105 pub fn channel_id(&self, cx: &AppContext) -> Option<ChannelId> {
106 self.room()?.read(cx).channel_id()
107 }
108
109 pub fn add_follow_handler<V: gpui::View, GetViews, UpdateView>(
110 &mut self,
111 root_view: gpui::ViewHandle<V>,
112 project_id: Option<u64>,
113 get_views: GetViews,
114 update_view: UpdateView,
115 _cx: &mut ModelContext<Self>,
116 ) where
117 GetViews: 'static
118 + Fn(&mut V, Option<u64>, &mut gpui::ViewContext<V>) -> Result<proto::FollowResponse>,
119 UpdateView:
120 'static + Fn(&mut V, PeerId, proto::UpdateFollowers, &mut ViewContext<V>) -> Result<()>,
121 {
122 self.follow_handlers
123 .retain(|h| h.root_view.id() != root_view.id());
124 if let Err(ix) = self
125 .follow_handlers
126 .binary_search_by_key(&(project_id, root_view.id()), |f| {
127 (f.project_id, f.root_view.id())
128 })
129 {
130 self.follow_handlers.insert(
131 ix,
132 FollowHandler {
133 project_id,
134 root_view: root_view.into_any().downgrade(),
135 get_views: Box::new(move |view, project_id, cx| {
136 let view = view.clone().downcast::<V>().unwrap();
137 view.update(cx, |view, cx| get_views(view, project_id, cx).log_err())
138 .flatten()
139 }),
140 update_view: Box::new(move |view, leader_id, message, cx| {
141 let view = view.clone().downcast::<V>().unwrap();
142 view.update(cx, |view, cx| {
143 update_view(view, leader_id, message, cx).log_err()
144 });
145 }),
146 },
147 );
148 }
149 }
150
151 async fn handle_incoming_call(
152 this: ModelHandle<Self>,
153 envelope: TypedEnvelope<proto::IncomingCall>,
154 _: Arc<Client>,
155 mut cx: AsyncAppContext,
156 ) -> Result<proto::Ack> {
157 let user_store = this.read_with(&cx, |this, _| this.user_store.clone());
158 let call = IncomingCall {
159 room_id: envelope.payload.room_id,
160 participants: user_store
161 .update(&mut cx, |user_store, cx| {
162 user_store.get_users(envelope.payload.participant_user_ids, cx)
163 })
164 .await?,
165 calling_user: user_store
166 .update(&mut cx, |user_store, cx| {
167 user_store.get_user(envelope.payload.calling_user_id, cx)
168 })
169 .await?,
170 initial_project: envelope.payload.initial_project,
171 };
172 this.update(&mut cx, |this, _| {
173 *this.incoming_call.0.borrow_mut() = Some(call);
174 });
175
176 Ok(proto::Ack {})
177 }
178
179 async fn handle_call_canceled(
180 this: ModelHandle<Self>,
181 envelope: TypedEnvelope<proto::CallCanceled>,
182 _: Arc<Client>,
183 mut cx: AsyncAppContext,
184 ) -> Result<()> {
185 this.update(&mut cx, |this, _| {
186 let mut incoming_call = this.incoming_call.0.borrow_mut();
187 if incoming_call
188 .as_ref()
189 .map_or(false, |call| call.room_id == envelope.payload.room_id)
190 {
191 incoming_call.take();
192 }
193 });
194 Ok(())
195 }
196
197 async fn handle_follow(
198 this: ModelHandle<Self>,
199 envelope: TypedEnvelope<proto::Follow>,
200 _: Arc<Client>,
201 mut cx: AsyncAppContext,
202 ) -> Result<proto::FollowResponse> {
203 this.update(&mut cx, |this, cx| {
204 let follower = Follower {
205 project_id: envelope.payload.project_id,
206 peer_id: envelope.original_sender_id()?,
207 };
208 let active_project_id = this
209 .location
210 .as_ref()
211 .and_then(|project| project.upgrade(cx)?.read(cx).remote_id());
212
213 let mut response = proto::FollowResponse::default();
214 for handler in &this.follow_handlers {
215 if follower.project_id != handler.project_id && follower.project_id.is_some() {
216 continue;
217 }
218
219 let Some(root_view) = handler.root_view.upgrade(cx) else {
220 continue;
221 };
222
223 let Some(handler_response) =
224 (handler.get_views)(&root_view, follower.project_id, cx)
225 else {
226 continue;
227 };
228
229 if response.views.is_empty() {
230 response.views = handler_response.views;
231 } else {
232 response.views.extend_from_slice(&handler_response.views);
233 }
234
235 if let Some(active_view_id) = handler_response.active_view_id.clone() {
236 if response.active_view_id.is_none() || handler.project_id == active_project_id
237 {
238 response.active_view_id = Some(active_view_id);
239 }
240 }
241 }
242
243 if let Err(ix) = this.followers.binary_search(&follower) {
244 this.followers.insert(ix, follower);
245 }
246
247 Ok(response)
248 })
249 }
250
251 async fn handle_unfollow(
252 this: ModelHandle<Self>,
253 envelope: TypedEnvelope<proto::Unfollow>,
254 _: Arc<Client>,
255 mut cx: AsyncAppContext,
256 ) -> Result<()> {
257 this.update(&mut cx, |this, _| {
258 let follower = Follower {
259 project_id: envelope.payload.project_id,
260 peer_id: envelope.original_sender_id()?,
261 };
262 if let Ok(ix) = this.followers.binary_search(&follower) {
263 this.followers.remove(ix);
264 }
265 Ok(())
266 })
267 }
268
269 async fn handle_update_from_leader(
270 this: ModelHandle<Self>,
271 envelope: TypedEnvelope<proto::UpdateFollowers>,
272 _: Arc<Client>,
273 mut cx: AsyncAppContext,
274 ) -> Result<()> {
275 let leader_id = envelope.original_sender_id()?;
276 let update = envelope.payload;
277 this.update(&mut cx, |this, cx| {
278 for handler in &this.follow_handlers {
279 if update.project_id != handler.project_id && update.project_id.is_some() {
280 continue;
281 }
282 let Some(root_view) = handler.root_view.upgrade(cx) else {
283 continue;
284 };
285 (handler.update_view)(&root_view, leader_id, update.clone(), cx);
286 }
287 Ok(())
288 })
289 }
290
291 pub fn update_followers(
292 &self,
293 project_id: Option<u64>,
294 update: proto::update_followers::Variant,
295 cx: &AppContext,
296 ) -> Option<()> {
297 let room_id = self.room()?.read(cx).id();
298 let follower_ids: Vec<_> = self
299 .followers
300 .iter()
301 .filter_map(|follower| {
302 (follower.project_id == project_id).then_some(follower.peer_id.into())
303 })
304 .collect();
305 if follower_ids.is_empty() {
306 return None;
307 }
308 self.client
309 .send(proto::UpdateFollowers {
310 room_id,
311 project_id,
312 follower_ids,
313 variant: Some(update),
314 })
315 .log_err()
316 }
317
318 pub fn global(cx: &AppContext) -> ModelHandle<Self> {
319 cx.global::<ModelHandle<Self>>().clone()
320 }
321
322 pub fn invite(
323 &mut self,
324 called_user_id: u64,
325 initial_project: Option<ModelHandle<Project>>,
326 cx: &mut ModelContext<Self>,
327 ) -> Task<Result<()>> {
328 if !self.pending_invites.insert(called_user_id) {
329 return Task::ready(Err(anyhow!("user was already invited")));
330 }
331 cx.notify();
332
333 let room = if let Some(room) = self.room().cloned() {
334 Some(Task::ready(Ok(room)).shared())
335 } else {
336 self.pending_room_creation.clone()
337 };
338
339 let invite = if let Some(room) = room {
340 cx.spawn_weak(|_, mut cx| async move {
341 let room = room.await.map_err(|err| anyhow!("{:?}", err))?;
342
343 let initial_project_id = if let Some(initial_project) = initial_project {
344 Some(
345 room.update(&mut cx, |room, cx| room.share_project(initial_project, cx))
346 .await?,
347 )
348 } else {
349 None
350 };
351
352 room.update(&mut cx, |room, cx| {
353 room.call(called_user_id, initial_project_id, cx)
354 })
355 .await?;
356
357 anyhow::Ok(())
358 })
359 } else {
360 let client = self.client.clone();
361 let user_store = self.user_store.clone();
362 let room = cx
363 .spawn(|this, mut cx| async move {
364 let create_room = async {
365 let room = cx
366 .update(|cx| {
367 Room::create(
368 called_user_id,
369 initial_project,
370 client,
371 user_store,
372 cx,
373 )
374 })
375 .await?;
376
377 this.update(&mut cx, |this, cx| this.set_room(Some(room.clone()), cx))
378 .await?;
379
380 anyhow::Ok(room)
381 };
382
383 let room = create_room.await;
384 this.update(&mut cx, |this, _| this.pending_room_creation = None);
385 room.map_err(Arc::new)
386 })
387 .shared();
388 self.pending_room_creation = Some(room.clone());
389 cx.foreground().spawn(async move {
390 room.await.map_err(|err| anyhow!("{:?}", err))?;
391 anyhow::Ok(())
392 })
393 };
394
395 cx.spawn(|this, mut cx| async move {
396 let result = invite.await;
397 if result.is_ok() {
398 this.update(&mut cx, |this, cx| this.report_call_event("invite", cx));
399 } else {
400 // TODO: Resport collaboration error
401 }
402
403 this.update(&mut cx, |this, cx| {
404 this.pending_invites.remove(&called_user_id);
405 cx.notify();
406 });
407 result
408 })
409 }
410
411 pub fn cancel_invite(
412 &mut self,
413 called_user_id: u64,
414 cx: &mut ModelContext<Self>,
415 ) -> Task<Result<()>> {
416 let room_id = if let Some(room) = self.room() {
417 room.read(cx).id()
418 } else {
419 return Task::ready(Err(anyhow!("no active call")));
420 };
421
422 let client = self.client.clone();
423 cx.foreground().spawn(async move {
424 client
425 .request(proto::CancelCall {
426 room_id,
427 called_user_id,
428 })
429 .await?;
430 anyhow::Ok(())
431 })
432 }
433
434 pub fn incoming(&self) -> watch::Receiver<Option<IncomingCall>> {
435 self.incoming_call.1.clone()
436 }
437
438 pub fn accept_incoming(&mut self, cx: &mut ModelContext<Self>) -> Task<Result<()>> {
439 if self.room.is_some() {
440 return Task::ready(Err(anyhow!("cannot join while on another call")));
441 }
442
443 let call = if let Some(call) = self.incoming_call.1.borrow().clone() {
444 call
445 } else {
446 return Task::ready(Err(anyhow!("no incoming call")));
447 };
448
449 let join = Room::join(&call, self.client.clone(), self.user_store.clone(), cx);
450
451 cx.spawn(|this, mut cx| async move {
452 let room = join.await?;
453 this.update(&mut cx, |this, cx| this.set_room(Some(room.clone()), cx))
454 .await?;
455 this.update(&mut cx, |this, cx| {
456 this.report_call_event("accept incoming", cx)
457 });
458 Ok(())
459 })
460 }
461
462 pub fn decline_incoming(&mut self, cx: &mut ModelContext<Self>) -> Result<()> {
463 let call = self
464 .incoming_call
465 .0
466 .borrow_mut()
467 .take()
468 .ok_or_else(|| anyhow!("no incoming call"))?;
469 report_call_event_for_room("decline incoming", call.room_id, None, &self.client, cx);
470 self.client.send(proto::DeclineCall {
471 room_id: call.room_id,
472 })?;
473 Ok(())
474 }
475
476 pub fn join_channel(
477 &mut self,
478 channel_id: u64,
479 cx: &mut ModelContext<Self>,
480 ) -> Task<Result<()>> {
481 if let Some(room) = self.room().cloned() {
482 if room.read(cx).channel_id() == Some(channel_id) {
483 return Task::ready(Ok(()));
484 } else {
485 room.update(cx, |room, cx| room.clear_state(cx));
486 }
487 }
488
489 let join = Room::join_channel(channel_id, self.client.clone(), self.user_store.clone(), cx);
490
491 cx.spawn(|this, mut cx| async move {
492 let room = join.await?;
493 this.update(&mut cx, |this, cx| this.set_room(Some(room.clone()), cx))
494 .await?;
495 this.update(&mut cx, |this, cx| {
496 this.report_call_event("join channel", cx)
497 });
498 Ok(())
499 })
500 }
501
502 pub fn hang_up(&mut self, cx: &mut ModelContext<Self>) -> Task<Result<()>> {
503 cx.notify();
504 self.report_call_event("hang up", cx);
505 Audio::end_call(cx);
506 if let Some((room, _)) = self.room.take() {
507 room.update(cx, |room, cx| room.leave(cx))
508 } else {
509 Task::ready(Ok(()))
510 }
511 }
512
513 pub fn share_project(
514 &mut self,
515 project: ModelHandle<Project>,
516 cx: &mut ModelContext<Self>,
517 ) -> Task<Result<u64>> {
518 if let Some((room, _)) = self.room.as_ref() {
519 self.report_call_event("share project", cx);
520 room.update(cx, |room, cx| room.share_project(project, cx))
521 } else {
522 Task::ready(Err(anyhow!("no active call")))
523 }
524 }
525
526 pub fn unshare_project(
527 &mut self,
528 project: ModelHandle<Project>,
529 cx: &mut ModelContext<Self>,
530 ) -> Result<()> {
531 if let Some((room, _)) = self.room.as_ref() {
532 self.report_call_event("unshare project", cx);
533 room.update(cx, |room, cx| room.unshare_project(project, cx))
534 } else {
535 Err(anyhow!("no active call"))
536 }
537 }
538
539 pub fn set_location(
540 &mut self,
541 project: Option<&ModelHandle<Project>>,
542 cx: &mut ModelContext<Self>,
543 ) -> Task<Result<()>> {
544 self.location = project.map(|project| project.downgrade());
545 if let Some((room, _)) = self.room.as_ref() {
546 room.update(cx, |room, cx| room.set_location(project, cx))
547 } else {
548 Task::ready(Ok(()))
549 }
550 }
551
552 fn set_room(
553 &mut self,
554 room: Option<ModelHandle<Room>>,
555 cx: &mut ModelContext<Self>,
556 ) -> Task<Result<()>> {
557 if room.as_ref() != self.room.as_ref().map(|room| &room.0) {
558 cx.notify();
559 if let Some(room) = room {
560 if room.read(cx).status().is_offline() {
561 self.room = None;
562 Task::ready(Ok(()))
563 } else {
564 let subscriptions = vec![
565 cx.observe(&room, |this, room, cx| {
566 if room.read(cx).status().is_offline() {
567 this.set_room(None, cx).detach_and_log_err(cx);
568 }
569
570 cx.notify();
571 }),
572 cx.subscribe(&room, |_, _, event, cx| cx.emit(event.clone())),
573 ];
574 self.room = Some((room.clone(), subscriptions));
575 let location = self.location.and_then(|location| location.upgrade(cx));
576 room.update(cx, |room, cx| room.set_location(location.as_ref(), cx))
577 }
578 } else {
579 self.room = None;
580 Task::ready(Ok(()))
581 }
582 } else {
583 Task::ready(Ok(()))
584 }
585 }
586
587 pub fn room(&self) -> Option<&ModelHandle<Room>> {
588 self.room.as_ref().map(|(room, _)| room)
589 }
590
591 pub fn client(&self) -> Arc<Client> {
592 self.client.clone()
593 }
594
595 pub fn pending_invites(&self) -> &HashSet<u64> {
596 &self.pending_invites
597 }
598
599 pub fn report_call_event(&self, operation: &'static str, cx: &AppContext) {
600 if let Some(room) = self.room() {
601 let room = room.read(cx);
602 report_call_event_for_room(operation, room.id(), room.channel_id(), &self.client, cx);
603 }
604 }
605}
606
607pub fn report_call_event_for_room(
608 operation: &'static str,
609 room_id: u64,
610 channel_id: Option<u64>,
611 client: &Arc<Client>,
612 cx: &AppContext,
613) {
614 let telemetry = client.telemetry();
615 let telemetry_settings = *settings::get::<TelemetrySettings>(cx);
616 let event = ClickhouseEvent::Call {
617 operation,
618 room_id: Some(room_id),
619 channel_id,
620 };
621 telemetry.report_clickhouse_event(event, telemetry_settings);
622}
623
624pub fn report_call_event_for_channel(
625 operation: &'static str,
626 channel_id: u64,
627 client: &Arc<Client>,
628 cx: &AppContext,
629) {
630 let room = ActiveCall::global(cx).read(cx).room();
631
632 let telemetry = client.telemetry();
633 let telemetry_settings = *settings::get::<TelemetrySettings>(cx);
634
635 let event = ClickhouseEvent::Call {
636 operation,
637 room_id: room.map(|r| r.read(cx).id()),
638 channel_id: Some(channel_id),
639 };
640 telemetry.report_clickhouse_event(event, telemetry_settings);
641}