1pub mod call_settings;
2pub mod participant;
3pub mod room;
4
5use anyhow::{anyhow, Result};
6use audio::Audio;
7use call_settings::CallSettings;
8use client::{
9 proto, ClickhouseEvent, Client, TelemetrySettings, TypedEnvelope, User, UserStore,
10 ZED_ALWAYS_ACTIVE,
11};
12use collections::HashSet;
13use futures::{channel::oneshot, future::Shared, Future, FutureExt};
14use gpui::{
15 AppContext, AsyncAppContext, Entity, ModelContext, ModelHandle, Subscription, Task,
16 WeakModelHandle,
17};
18use postage::watch;
19use project::Project;
20use std::sync::Arc;
21
22pub use participant::ParticipantLocation;
23pub use room::Room;
24
25pub fn init(client: Arc<Client>, user_store: ModelHandle<UserStore>, cx: &mut AppContext) {
26 settings::register::<CallSettings>(cx);
27
28 let active_call = cx.add_model(|cx| ActiveCall::new(client, user_store, cx));
29 cx.set_global(active_call);
30}
31
/// A call the current user has received, as published on the watch channel
/// returned by `ActiveCall::incoming`.
#[derive(Clone)]
pub struct IncomingCall {
    /// Id of the room the call refers to (taken from the `proto::IncomingCall` payload).
    pub room_id: u64,
    /// The user who placed the call, resolved through the `UserStore`.
    pub calling_user: Arc<User>,
    /// Users resolved from the payload's `participant_user_ids`.
    pub participants: Vec<Arc<User>>,
    /// The project carried in the payload, if any.
    pub initial_project: Option<proto::ParticipantProject>,
}
39
/// Runs at most one spawned task at a time: spawning a new task (or dropping
/// this value) cancels the previously spawned one.
pub struct OneAtATime {
    /// Sender half of the cancellation channel for the most recently spawned
    /// task; dropping it is what signals that task to stop.
    cancel: Option<oneshot::Sender<()>>,
}
43
44impl OneAtATime {
45 /// spawn a task in the given context.
46 /// if another task is spawned before that resolves, or if the OneAtATime itself is dropped, the first task will be cancelled and return Ok(None)
47 /// otherwise you'll see the result of the task.
48 fn spawn<F, Fut, R>(&mut self, cx: &mut AppContext, f: F) -> Task<Result<Option<R>>>
49 where
50 F: 'static + FnOnce(AsyncAppContext) -> Fut,
51 Fut: Future<Output = Result<R>>,
52 R: 'static,
53 {
54 let (tx, rx) = oneshot::channel();
55 self.cancel.replace(tx);
56 cx.spawn(|cx| async move {
57 futures::select_biased! {
58 _ = rx.fuse() => Ok(None),
59 result = f(cx).fuse() => result.map(Some),
60 }
61 })
62 }
63
64 fn running(&self) -> bool {
65 self.cancel
66 .as_ref()
67 .is_some_and(|cancel| !cancel.is_canceled())
68 }
69}
70
/// Singleton global maintaining the user's participation in a room across workspaces.
pub struct ActiveCall {
    /// The room currently joined, together with the observe/subscribe
    /// subscriptions that `set_room` creates for it.
    room: Option<(ModelHandle<Room>, Vec<Subscription>)>,
    /// Shared task for a room that `invite` is in the middle of creating;
    /// reset to `None` once creation finishes.
    pending_room_creation: Option<Shared<Task<Result<ModelHandle<Room>, Arc<anyhow::Error>>>>>,
    /// Serializes room joins started by `accept_incoming` and `join_channel`.
    _join_debouncer: OneAtATime,
    /// Project most recently passed to `set_location`, held weakly.
    location: Option<WeakModelHandle<Project>>,
    /// Ids of users with an invite currently in flight.
    pending_invites: HashSet<u64>,
    /// Sender/receiver pair of the watch channel carrying the current incoming call.
    incoming_call: (
        watch::Sender<Option<IncomingCall>>,
        watch::Receiver<Option<IncomingCall>>,
    ),
    client: Arc<Client>,
    user_store: ModelHandle<UserStore>,
    /// Keeps the client handlers registered in `new` alive.
    _subscriptions: Vec<client::Subscription>,
}
86
/// `ActiveCall` emits `room::Event`s; `set_room` forwards every event of the
/// current room to `ActiveCall`'s own subscribers.
impl Entity for ActiveCall {
    type Event = room::Event;
}
90
91impl ActiveCall {
92 fn new(
93 client: Arc<Client>,
94 user_store: ModelHandle<UserStore>,
95 cx: &mut ModelContext<Self>,
96 ) -> Self {
97 Self {
98 room: None,
99 pending_room_creation: None,
100 location: None,
101 pending_invites: Default::default(),
102 incoming_call: watch::channel(),
103
104 _join_debouncer: OneAtATime { cancel: None },
105 _subscriptions: vec![
106 client.add_request_handler(cx.handle(), Self::handle_incoming_call),
107 client.add_message_handler(cx.handle(), Self::handle_call_canceled),
108 ],
109 client,
110 user_store,
111 }
112 }
113
114 pub fn channel_id(&self, cx: &AppContext) -> Option<u64> {
115 self.room()?.read(cx).channel_id()
116 }
117
118 async fn handle_incoming_call(
119 this: ModelHandle<Self>,
120 envelope: TypedEnvelope<proto::IncomingCall>,
121 _: Arc<Client>,
122 mut cx: AsyncAppContext,
123 ) -> Result<proto::Ack> {
124 let user_store = this.read_with(&cx, |this, _| this.user_store.clone());
125 let call = IncomingCall {
126 room_id: envelope.payload.room_id,
127 participants: user_store
128 .update(&mut cx, |user_store, cx| {
129 user_store.get_users(envelope.payload.participant_user_ids, cx)
130 })
131 .await?,
132 calling_user: user_store
133 .update(&mut cx, |user_store, cx| {
134 user_store.get_user(envelope.payload.calling_user_id, cx)
135 })
136 .await?,
137 initial_project: envelope.payload.initial_project,
138 };
139 this.update(&mut cx, |this, _| {
140 *this.incoming_call.0.borrow_mut() = Some(call);
141 });
142
143 Ok(proto::Ack {})
144 }
145
146 async fn handle_call_canceled(
147 this: ModelHandle<Self>,
148 envelope: TypedEnvelope<proto::CallCanceled>,
149 _: Arc<Client>,
150 mut cx: AsyncAppContext,
151 ) -> Result<()> {
152 this.update(&mut cx, |this, _| {
153 let mut incoming_call = this.incoming_call.0.borrow_mut();
154 if incoming_call
155 .as_ref()
156 .map_or(false, |call| call.room_id == envelope.payload.room_id)
157 {
158 incoming_call.take();
159 }
160 });
161 Ok(())
162 }
163
164 pub fn global(cx: &AppContext) -> ModelHandle<Self> {
165 cx.global::<ModelHandle<Self>>().clone()
166 }
167
168 pub fn invite(
169 &mut self,
170 called_user_id: u64,
171 initial_project: Option<ModelHandle<Project>>,
172 cx: &mut ModelContext<Self>,
173 ) -> Task<Result<()>> {
174 if !self.pending_invites.insert(called_user_id) {
175 return Task::ready(Err(anyhow!("user was already invited")));
176 }
177 cx.notify();
178
179 if self._join_debouncer.running() {
180 return Task::ready(Ok(()));
181 }
182
183 let room = if let Some(room) = self.room().cloned() {
184 Some(Task::ready(Ok(room)).shared())
185 } else {
186 self.pending_room_creation.clone()
187 };
188
189 let invite = if let Some(room) = room {
190 cx.spawn_weak(|_, mut cx| async move {
191 let room = room.await.map_err(|err| anyhow!("{:?}", err))?;
192
193 let initial_project_id = if let Some(initial_project) = initial_project {
194 Some(
195 room.update(&mut cx, |room, cx| room.share_project(initial_project, cx))
196 .await?,
197 )
198 } else {
199 None
200 };
201
202 room.update(&mut cx, |room, cx| {
203 room.call(called_user_id, initial_project_id, cx)
204 })
205 .await?;
206
207 anyhow::Ok(())
208 })
209 } else {
210 let client = self.client.clone();
211 let user_store = self.user_store.clone();
212 let room = cx
213 .spawn(|this, mut cx| async move {
214 let create_room = async {
215 let room = cx
216 .update(|cx| {
217 Room::create(
218 called_user_id,
219 initial_project,
220 client,
221 user_store,
222 cx,
223 )
224 })
225 .await?;
226
227 this.update(&mut cx, |this, cx| this.set_room(Some(room.clone()), cx))
228 .await?;
229
230 anyhow::Ok(room)
231 };
232
233 let room = create_room.await;
234 this.update(&mut cx, |this, _| this.pending_room_creation = None);
235 room.map_err(Arc::new)
236 })
237 .shared();
238 self.pending_room_creation = Some(room.clone());
239 cx.foreground().spawn(async move {
240 room.await.map_err(|err| anyhow!("{:?}", err))?;
241 anyhow::Ok(())
242 })
243 };
244
245 cx.spawn(|this, mut cx| async move {
246 let result = invite.await;
247 if result.is_ok() {
248 this.update(&mut cx, |this, cx| this.report_call_event("invite", cx));
249 } else {
250 // TODO: Resport collaboration error
251 }
252
253 this.update(&mut cx, |this, cx| {
254 this.pending_invites.remove(&called_user_id);
255 cx.notify();
256 });
257 result
258 })
259 }
260
261 pub fn cancel_invite(
262 &mut self,
263 called_user_id: u64,
264 cx: &mut ModelContext<Self>,
265 ) -> Task<Result<()>> {
266 let room_id = if let Some(room) = self.room() {
267 room.read(cx).id()
268 } else {
269 return Task::ready(Err(anyhow!("no active call")));
270 };
271
272 let client = self.client.clone();
273 cx.foreground().spawn(async move {
274 client
275 .request(proto::CancelCall {
276 room_id,
277 called_user_id,
278 })
279 .await?;
280 anyhow::Ok(())
281 })
282 }
283
284 pub fn incoming(&self) -> watch::Receiver<Option<IncomingCall>> {
285 self.incoming_call.1.clone()
286 }
287
288 pub fn accept_incoming(&mut self, cx: &mut ModelContext<Self>) -> Task<Result<()>> {
289 if self.room.is_some() {
290 return Task::ready(Err(anyhow!("cannot join while on another call")));
291 }
292
293 let call = if let Some(call) = self.incoming_call.1.borrow().clone() {
294 call
295 } else {
296 return Task::ready(Err(anyhow!("no incoming call")));
297 };
298
299 if self.pending_room_creation.is_some() {
300 return Task::ready(Ok(()));
301 }
302
303 let room_id = call.room_id.clone();
304 let client = self.client.clone();
305 let user_store = self.user_store.clone();
306 let join = self
307 ._join_debouncer
308 .spawn(cx, move |cx| Room::join(room_id, client, user_store, cx));
309
310 cx.spawn(|this, mut cx| async move {
311 let room = join.await?;
312 this.update(&mut cx, |this, cx| this.set_room(room.clone(), cx))
313 .await?;
314 this.update(&mut cx, |this, cx| {
315 this.report_call_event("accept incoming", cx)
316 });
317 Ok(())
318 })
319 }
320
321 pub fn decline_incoming(&mut self, cx: &mut ModelContext<Self>) -> Result<()> {
322 let call = self
323 .incoming_call
324 .0
325 .borrow_mut()
326 .take()
327 .ok_or_else(|| anyhow!("no incoming call"))?;
328 report_call_event_for_room("decline incoming", call.room_id, None, &self.client, cx);
329 self.client.send(proto::DeclineCall {
330 room_id: call.room_id,
331 })?;
332 Ok(())
333 }
334
335 pub fn join_channel(
336 &mut self,
337 channel_id: u64,
338 cx: &mut ModelContext<Self>,
339 ) -> Task<Result<Option<ModelHandle<Room>>>> {
340 if let Some(room) = self.room().cloned() {
341 if room.read(cx).channel_id() == Some(channel_id) {
342 return Task::ready(Ok(Some(room)));
343 } else {
344 room.update(cx, |room, cx| room.clear_state(cx));
345 }
346 }
347
348 if self.pending_room_creation.is_some() {
349 return Task::ready(Ok(None));
350 }
351
352 let client = self.client.clone();
353 let user_store = self.user_store.clone();
354 let join = self._join_debouncer.spawn(cx, move |cx| async move {
355 Room::join_channel(channel_id, client, user_store, cx).await
356 });
357
358 cx.spawn(move |this, mut cx| async move {
359 let room = join.await?;
360 this.update(&mut cx, |this, cx| this.set_room(room.clone(), cx))
361 .await?;
362 this.update(&mut cx, |this, cx| {
363 this.report_call_event("join channel", cx)
364 });
365 Ok(room)
366 })
367 }
368
369 pub fn hang_up(&mut self, cx: &mut ModelContext<Self>) -> Task<Result<()>> {
370 cx.notify();
371 self.report_call_event("hang up", cx);
372 Audio::end_call(cx);
373 if let Some((room, _)) = self.room.take() {
374 room.update(cx, |room, cx| room.leave(cx))
375 } else {
376 Task::ready(Ok(()))
377 }
378 }
379
380 pub fn share_project(
381 &mut self,
382 project: ModelHandle<Project>,
383 cx: &mut ModelContext<Self>,
384 ) -> Task<Result<u64>> {
385 if let Some((room, _)) = self.room.as_ref() {
386 self.report_call_event("share project", cx);
387 room.update(cx, |room, cx| room.share_project(project, cx))
388 } else {
389 Task::ready(Err(anyhow!("no active call")))
390 }
391 }
392
393 pub fn unshare_project(
394 &mut self,
395 project: ModelHandle<Project>,
396 cx: &mut ModelContext<Self>,
397 ) -> Result<()> {
398 if let Some((room, _)) = self.room.as_ref() {
399 self.report_call_event("unshare project", cx);
400 room.update(cx, |room, cx| room.unshare_project(project, cx))
401 } else {
402 Err(anyhow!("no active call"))
403 }
404 }
405
406 pub fn location(&self) -> Option<&WeakModelHandle<Project>> {
407 self.location.as_ref()
408 }
409
410 pub fn set_location(
411 &mut self,
412 project: Option<&ModelHandle<Project>>,
413 cx: &mut ModelContext<Self>,
414 ) -> Task<Result<()>> {
415 if project.is_some() || !*ZED_ALWAYS_ACTIVE {
416 self.location = project.map(|project| project.downgrade());
417 if let Some((room, _)) = self.room.as_ref() {
418 return room.update(cx, |room, cx| room.set_location(project, cx));
419 }
420 }
421 Task::ready(Ok(()))
422 }
423
424 fn set_room(
425 &mut self,
426 room: Option<ModelHandle<Room>>,
427 cx: &mut ModelContext<Self>,
428 ) -> Task<Result<()>> {
429 if room.as_ref() != self.room.as_ref().map(|room| &room.0) {
430 cx.notify();
431 if let Some(room) = room {
432 if room.read(cx).status().is_offline() {
433 self.room = None;
434 Task::ready(Ok(()))
435 } else {
436 let subscriptions = vec![
437 cx.observe(&room, |this, room, cx| {
438 if room.read(cx).status().is_offline() {
439 this.set_room(None, cx).detach_and_log_err(cx);
440 }
441
442 cx.notify();
443 }),
444 cx.subscribe(&room, |_, _, event, cx| cx.emit(event.clone())),
445 ];
446 self.room = Some((room.clone(), subscriptions));
447 let location = self.location.and_then(|location| location.upgrade(cx));
448 room.update(cx, |room, cx| room.set_location(location.as_ref(), cx))
449 }
450 } else {
451 self.room = None;
452 Task::ready(Ok(()))
453 }
454 } else {
455 Task::ready(Ok(()))
456 }
457 }
458
459 pub fn room(&self) -> Option<&ModelHandle<Room>> {
460 self.room.as_ref().map(|(room, _)| room)
461 }
462
463 pub fn client(&self) -> Arc<Client> {
464 self.client.clone()
465 }
466
467 pub fn pending_invites(&self) -> &HashSet<u64> {
468 &self.pending_invites
469 }
470
471 pub fn report_call_event(&self, operation: &'static str, cx: &AppContext) {
472 if let Some(room) = self.room() {
473 let room = room.read(cx);
474 report_call_event_for_room(operation, room.id(), room.channel_id(), &self.client, cx);
475 }
476 }
477}
478
479pub fn report_call_event_for_room(
480 operation: &'static str,
481 room_id: u64,
482 channel_id: Option<u64>,
483 client: &Arc<Client>,
484 cx: &AppContext,
485) {
486 let telemetry = client.telemetry();
487 let telemetry_settings = *settings::get::<TelemetrySettings>(cx);
488 let event = ClickhouseEvent::Call {
489 operation,
490 room_id: Some(room_id),
491 channel_id,
492 };
493 telemetry.report_clickhouse_event(event, telemetry_settings);
494}
495
496pub fn report_call_event_for_channel(
497 operation: &'static str,
498 channel_id: u64,
499 client: &Arc<Client>,
500 cx: &AppContext,
501) {
502 let room = ActiveCall::global(cx).read(cx).room();
503
504 let telemetry = client.telemetry();
505 let telemetry_settings = *settings::get::<TelemetrySettings>(cx);
506
507 let event = ClickhouseEvent::Call {
508 operation,
509 room_id: room.map(|r| r.read(cx).id()),
510 channel_id: Some(channel_id),
511 };
512 telemetry.report_clickhouse_event(event, telemetry_settings);
513}
514
#[cfg(test)]
mod test {
    use gpui::TestAppContext;

    use crate::OneAtATime;

    /// Exercises the three outcomes of `OneAtATime::spawn`: running to
    /// completion, being superseded by a newer task, and being cancelled by
    /// dropping the `OneAtATime`.
    #[gpui::test]
    async fn test_one_at_a_time(cx: &mut TestAppContext) {
        let mut debouncer = OneAtATime { cancel: None };

        // A lone task runs to completion and yields its value.
        let lone = cx.update(|cx| debouncer.spawn(cx, |_| async { Ok(1) }));
        assert_eq!(lone.await.unwrap(), Some(1));

        // Spawning twice back to back cancels the first task before it is ever polled.
        let (superseded, latest) = cx.update(|cx| {
            let superseded = debouncer.spawn(cx, |_| async {
                assert!(false);
                Ok(2)
            });
            let latest = debouncer.spawn(cx, |_| async { Ok(3) });
            (superseded, latest)
        });

        assert_eq!(superseded.await.unwrap(), None);
        assert_eq!(latest.await.unwrap(), Some(3));

        // Dropping the `OneAtATime` cancels the task that is still outstanding.
        let orphaned = cx.update(|cx| debouncer.spawn(cx, |_| async { Ok(4) }));
        drop(debouncer);

        assert_eq!(orphaned.await.unwrap(), None);
    }
}