1pub mod call_settings;
2pub mod participant;
3pub mod room;
4
5use anyhow::{anyhow, Result};
6use audio::Audio;
7use call_settings::CallSettings;
8use client::{
9 proto, ClickhouseEvent, Client, TelemetrySettings, TypedEnvelope, User, UserStore,
10 ZED_ALWAYS_ACTIVE,
11};
12use collections::HashSet;
13use futures::{channel::oneshot, future::Shared, Future, FutureExt};
14use gpui::{
15 AppContext, AsyncAppContext, Context, EventEmitter, Model, ModelContext, Subscription, Task,
16 WeakModel,
17};
18use postage::watch;
19use project::Project;
20use settings::Settings;
21use std::sync::Arc;
22
23pub use participant::ParticipantLocation;
24pub use room::Room;
25
26pub fn init(client: Arc<Client>, user_store: Model<UserStore>, cx: &mut AppContext) {
27 CallSettings::register(cx);
28
29 let active_call = cx.build_model(|cx| ActiveCall::new(client, user_store, cx));
30 cx.set_global(active_call);
31}
32
33pub struct OneAtATime {
34 cancel: Option<oneshot::Sender<()>>,
35}
36
37impl OneAtATime {
38 /// spawn a task in the given context.
39 /// if another task is spawned before that resolves, or if the OneAtATime itself is dropped, the first task will be cancelled and return Ok(None)
40 /// otherwise you'll see the result of the task.
41 fn spawn<F, Fut, R>(&mut self, cx: &mut AppContext, f: F) -> Task<Result<Option<R>>>
42 where
43 F: 'static + FnOnce(AsyncAppContext) -> Fut,
44 Fut: Future<Output = Result<R>>,
45 R: 'static,
46 {
47 let (tx, rx) = oneshot::channel();
48 self.cancel.replace(tx);
49 cx.spawn(|cx| async move {
50 futures::select_biased! {
51 _ = rx.fuse() => Ok(None),
52 result = f(cx).fuse() => result.map(Some),
53 }
54 })
55 }
56
57 fn running(&self) -> bool {
58 self.cancel
59 .as_ref()
60 .is_some_and(|cancel| !cancel.is_canceled())
61 }
62}
63
64#[derive(Clone)]
65pub struct IncomingCall {
66 pub room_id: u64,
67 pub calling_user: Arc<User>,
68 pub participants: Vec<Arc<User>>,
69 pub initial_project: Option<proto::ParticipantProject>,
70}
71
72/// Singleton global maintaining the user's participation in a room across workspaces.
73pub struct ActiveCall {
74 room: Option<(Model<Room>, Vec<Subscription>)>,
75 pending_room_creation: Option<Shared<Task<Result<Model<Room>, Arc<anyhow::Error>>>>>,
76 location: Option<WeakModel<Project>>,
77 _join_debouncer: OneAtATime,
78 pending_invites: HashSet<u64>,
79 incoming_call: (
80 watch::Sender<Option<IncomingCall>>,
81 watch::Receiver<Option<IncomingCall>>,
82 ),
83 client: Arc<Client>,
84 user_store: Model<UserStore>,
85 _subscriptions: Vec<client::Subscription>,
86}
87
88impl EventEmitter for ActiveCall {
89 type Event = room::Event;
90}
91
92impl ActiveCall {
93 fn new(client: Arc<Client>, user_store: Model<UserStore>, cx: &mut ModelContext<Self>) -> Self {
94 Self {
95 room: None,
96 pending_room_creation: None,
97 location: None,
98 pending_invites: Default::default(),
99 incoming_call: watch::channel(),
100 _join_debouncer: OneAtATime { cancel: None },
101 _subscriptions: vec![
102 client.add_request_handler(cx.weak_model(), Self::handle_incoming_call),
103 client.add_message_handler(cx.weak_model(), Self::handle_call_canceled),
104 ],
105 client,
106 user_store,
107 }
108 }
109
110 pub fn channel_id(&self, cx: &AppContext) -> Option<u64> {
111 self.room()?.read(cx).channel_id()
112 }
113
114 async fn handle_incoming_call(
115 this: Model<Self>,
116 envelope: TypedEnvelope<proto::IncomingCall>,
117 _: Arc<Client>,
118 mut cx: AsyncAppContext,
119 ) -> Result<proto::Ack> {
120 let user_store = this.update(&mut cx, |this, _| this.user_store.clone())?;
121 let call = IncomingCall {
122 room_id: envelope.payload.room_id,
123 participants: user_store
124 .update(&mut cx, |user_store, cx| {
125 user_store.get_users(envelope.payload.participant_user_ids, cx)
126 })?
127 .await?,
128 calling_user: user_store
129 .update(&mut cx, |user_store, cx| {
130 user_store.get_user(envelope.payload.calling_user_id, cx)
131 })?
132 .await?,
133 initial_project: envelope.payload.initial_project,
134 };
135 this.update(&mut cx, |this, _| {
136 *this.incoming_call.0.borrow_mut() = Some(call);
137 })?;
138
139 Ok(proto::Ack {})
140 }
141
142 async fn handle_call_canceled(
143 this: Model<Self>,
144 envelope: TypedEnvelope<proto::CallCanceled>,
145 _: Arc<Client>,
146 mut cx: AsyncAppContext,
147 ) -> Result<()> {
148 this.update(&mut cx, |this, _| {
149 let mut incoming_call = this.incoming_call.0.borrow_mut();
150 if incoming_call
151 .as_ref()
152 .map_or(false, |call| call.room_id == envelope.payload.room_id)
153 {
154 incoming_call.take();
155 }
156 })?;
157 Ok(())
158 }
159
160 pub fn global(cx: &AppContext) -> Model<Self> {
161 cx.global::<Model<Self>>().clone()
162 }
163
164 pub fn invite(
165 &mut self,
166 called_user_id: u64,
167 initial_project: Option<Model<Project>>,
168 cx: &mut ModelContext<Self>,
169 ) -> Task<Result<()>> {
170 if !self.pending_invites.insert(called_user_id) {
171 return Task::ready(Err(anyhow!("user was already invited")));
172 }
173 cx.notify();
174
175 if self._join_debouncer.running() {
176 return Task::ready(Ok(()));
177 }
178
179 let room = if let Some(room) = self.room().cloned() {
180 Some(Task::ready(Ok(room)).shared())
181 } else {
182 self.pending_room_creation.clone()
183 };
184
185 let invite = if let Some(room) = room {
186 cx.spawn(move |_, mut cx| async move {
187 let room = room.await.map_err(|err| anyhow!("{:?}", err))?;
188
189 let initial_project_id = if let Some(initial_project) = initial_project {
190 Some(
191 room.update(&mut cx, |room, cx| room.share_project(initial_project, cx))?
192 .await?,
193 )
194 } else {
195 None
196 };
197
198 room.update(&mut cx, move |room, cx| {
199 room.call(called_user_id, initial_project_id, cx)
200 })?
201 .await?;
202
203 anyhow::Ok(())
204 })
205 } else {
206 let client = self.client.clone();
207 let user_store = self.user_store.clone();
208 let room = cx
209 .spawn(move |this, mut cx| async move {
210 let create_room = async {
211 let room = cx
212 .update(|cx| {
213 Room::create(
214 called_user_id,
215 initial_project,
216 client,
217 user_store,
218 cx,
219 )
220 })?
221 .await?;
222
223 this.update(&mut cx, |this, cx| this.set_room(Some(room.clone()), cx))?
224 .await?;
225
226 anyhow::Ok(room)
227 };
228
229 let room = create_room.await;
230 this.update(&mut cx, |this, _| this.pending_room_creation = None)?;
231 room.map_err(Arc::new)
232 })
233 .shared();
234 self.pending_room_creation = Some(room.clone());
235 cx.background_executor().spawn(async move {
236 room.await.map_err(|err| anyhow!("{:?}", err))?;
237 anyhow::Ok(())
238 })
239 };
240
241 cx.spawn(move |this, mut cx| async move {
242 let result = invite.await;
243 if result.is_ok() {
244 this.update(&mut cx, |this, cx| this.report_call_event("invite", cx))?;
245 } else {
246 // TODO: Resport collaboration error
247 }
248
249 this.update(&mut cx, |this, cx| {
250 this.pending_invites.remove(&called_user_id);
251 cx.notify();
252 })?;
253 result
254 })
255 }
256
257 pub fn cancel_invite(
258 &mut self,
259 called_user_id: u64,
260 cx: &mut ModelContext<Self>,
261 ) -> Task<Result<()>> {
262 let room_id = if let Some(room) = self.room() {
263 room.read(cx).id()
264 } else {
265 return Task::ready(Err(anyhow!("no active call")));
266 };
267
268 let client = self.client.clone();
269 cx.background_executor().spawn(async move {
270 client
271 .request(proto::CancelCall {
272 room_id,
273 called_user_id,
274 })
275 .await?;
276 anyhow::Ok(())
277 })
278 }
279
280 pub fn incoming(&self) -> watch::Receiver<Option<IncomingCall>> {
281 self.incoming_call.1.clone()
282 }
283
284 pub fn accept_incoming(&mut self, cx: &mut ModelContext<Self>) -> Task<Result<()>> {
285 if self.room.is_some() {
286 return Task::ready(Err(anyhow!("cannot join while on another call")));
287 }
288
289 let call = if let Some(call) = self.incoming_call.1.borrow().clone() {
290 call
291 } else {
292 return Task::ready(Err(anyhow!("no incoming call")));
293 };
294
295 if self.pending_room_creation.is_some() {
296 return Task::ready(Ok(()));
297 }
298
299 let room_id = call.room_id.clone();
300 let client = self.client.clone();
301 let user_store = self.user_store.clone();
302 let join = self
303 ._join_debouncer
304 .spawn(cx, move |cx| Room::join(room_id, client, user_store, cx));
305
306 cx.spawn(|this, mut cx| async move {
307 let room = join.await?;
308 this.update(&mut cx, |this, cx| this.set_room(room.clone(), cx))?
309 .await?;
310 this.update(&mut cx, |this, cx| {
311 this.report_call_event("accept incoming", cx)
312 })?;
313 Ok(())
314 })
315 }
316
317 pub fn decline_incoming(&mut self, cx: &mut ModelContext<Self>) -> Result<()> {
318 let call = self
319 .incoming_call
320 .0
321 .borrow_mut()
322 .take()
323 .ok_or_else(|| anyhow!("no incoming call"))?;
324 report_call_event_for_room("decline incoming", call.room_id, None, &self.client, cx);
325 self.client.send(proto::DeclineCall {
326 room_id: call.room_id,
327 })?;
328 Ok(())
329 }
330
331 pub fn join_channel(
332 &mut self,
333 channel_id: u64,
334 cx: &mut ModelContext<Self>,
335 ) -> Task<Result<Option<Model<Room>>>> {
336 if let Some(room) = self.room().cloned() {
337 if room.read(cx).channel_id() == Some(channel_id) {
338 return Task::ready(Ok(Some(room)));
339 } else {
340 room.update(cx, |room, cx| room.clear_state(cx));
341 }
342 }
343
344 if self.pending_room_creation.is_some() {
345 return Task::ready(Ok(None));
346 }
347
348 let client = self.client.clone();
349 let user_store = self.user_store.clone();
350 let join = self._join_debouncer.spawn(cx, move |cx| async move {
351 Room::join_channel(channel_id, client, user_store, cx).await
352 });
353
354 cx.spawn(|this, mut cx| async move {
355 let room = join.await?;
356 this.update(&mut cx, |this, cx| this.set_room(room.clone(), cx))?
357 .await?;
358 this.update(&mut cx, |this, cx| {
359 this.report_call_event("join channel", cx)
360 })?;
361 Ok(room)
362 })
363 }
364
365 pub fn hang_up(&mut self, cx: &mut ModelContext<Self>) -> Task<Result<()>> {
366 cx.notify();
367 self.report_call_event("hang up", cx);
368
369 Audio::end_call(cx);
370 if let Some((room, _)) = self.room.take() {
371 room.update(cx, |room, cx| room.leave(cx))
372 } else {
373 Task::ready(Ok(()))
374 }
375 }
376
377 pub fn share_project(
378 &mut self,
379 project: Model<Project>,
380 cx: &mut ModelContext<Self>,
381 ) -> Task<Result<u64>> {
382 if let Some((room, _)) = self.room.as_ref() {
383 self.report_call_event("share project", cx);
384 room.update(cx, |room, cx| room.share_project(project, cx))
385 } else {
386 Task::ready(Err(anyhow!("no active call")))
387 }
388 }
389
390 pub fn unshare_project(
391 &mut self,
392 project: Model<Project>,
393 cx: &mut ModelContext<Self>,
394 ) -> Result<()> {
395 if let Some((room, _)) = self.room.as_ref() {
396 self.report_call_event("unshare project", cx);
397 room.update(cx, |room, cx| room.unshare_project(project, cx))
398 } else {
399 Err(anyhow!("no active call"))
400 }
401 }
402
403 pub fn location(&self) -> Option<&WeakModel<Project>> {
404 self.location.as_ref()
405 }
406
407 pub fn set_location(
408 &mut self,
409 project: Option<&Model<Project>>,
410 cx: &mut ModelContext<Self>,
411 ) -> Task<Result<()>> {
412 if project.is_some() || !*ZED_ALWAYS_ACTIVE {
413 self.location = project.map(|project| project.downgrade());
414 if let Some((room, _)) = self.room.as_ref() {
415 return room.update(cx, |room, cx| room.set_location(project, cx));
416 }
417 }
418 Task::ready(Ok(()))
419 }
420
421 fn set_room(
422 &mut self,
423 room: Option<Model<Room>>,
424 cx: &mut ModelContext<Self>,
425 ) -> Task<Result<()>> {
426 if room.as_ref() != self.room.as_ref().map(|room| &room.0) {
427 cx.notify();
428 if let Some(room) = room {
429 if room.read(cx).status().is_offline() {
430 self.room = None;
431 Task::ready(Ok(()))
432 } else {
433 let subscriptions = vec![
434 cx.observe(&room, |this, room, cx| {
435 if room.read(cx).status().is_offline() {
436 this.set_room(None, cx).detach_and_log_err(cx);
437 }
438
439 cx.notify();
440 }),
441 cx.subscribe(&room, |_, _, event, cx| cx.emit(event.clone())),
442 ];
443 self.room = Some((room.clone(), subscriptions));
444 let location = self
445 .location
446 .as_ref()
447 .and_then(|location| location.upgrade());
448 room.update(cx, |room, cx| room.set_location(location.as_ref(), cx))
449 }
450 } else {
451 self.room = None;
452 Task::ready(Ok(()))
453 }
454 } else {
455 Task::ready(Ok(()))
456 }
457 }
458
459 pub fn room(&self) -> Option<&Model<Room>> {
460 self.room.as_ref().map(|(room, _)| room)
461 }
462
463 pub fn client(&self) -> Arc<Client> {
464 self.client.clone()
465 }
466
467 pub fn pending_invites(&self) -> &HashSet<u64> {
468 &self.pending_invites
469 }
470
471 pub fn report_call_event(&self, operation: &'static str, cx: &AppContext) {
472 if let Some(room) = self.room() {
473 let room = room.read(cx);
474 report_call_event_for_room(operation, room.id(), room.channel_id(), &self.client, cx);
475 }
476 }
477}
478
479pub fn report_call_event_for_room(
480 operation: &'static str,
481 room_id: u64,
482 channel_id: Option<u64>,
483 client: &Arc<Client>,
484 cx: &AppContext,
485) {
486 let telemetry = client.telemetry();
487 let telemetry_settings = *TelemetrySettings::get_global(cx);
488 let event = ClickhouseEvent::Call {
489 operation,
490 room_id: Some(room_id),
491 channel_id,
492 };
493 telemetry.report_clickhouse_event(event, telemetry_settings);
494}
495
496pub fn report_call_event_for_channel(
497 operation: &'static str,
498 channel_id: u64,
499 client: &Arc<Client>,
500 cx: &AppContext,
501) {
502 let room = ActiveCall::global(cx).read(cx).room();
503
504 let telemetry = client.telemetry();
505
506 let telemetry_settings = *TelemetrySettings::get_global(cx);
507
508 let event = ClickhouseEvent::Call {
509 operation,
510 room_id: room.map(|r| r.read(cx).id()),
511 channel_id: Some(channel_id),
512 };
513 telemetry.report_clickhouse_event(event, telemetry_settings);
514}
515
516#[cfg(test)]
517mod test {
518 use gpui::TestAppContext;
519
520 use crate::OneAtATime;
521
522 #[gpui::test]
523 async fn test_one_at_a_time(cx: &mut TestAppContext) {
524 let mut one_at_a_time = OneAtATime { cancel: None };
525
526 assert_eq!(
527 cx.update(|cx| one_at_a_time.spawn(cx, |_| async { Ok(1) }))
528 .await
529 .unwrap(),
530 Some(1)
531 );
532
533 let (a, b) = cx.update(|cx| {
534 (
535 one_at_a_time.spawn(cx, |_| async {
536 assert!(false);
537 Ok(2)
538 }),
539 one_at_a_time.spawn(cx, |_| async { Ok(3) }),
540 )
541 });
542
543 assert_eq!(a.await.unwrap(), None);
544 assert_eq!(b.await.unwrap(), Some(3));
545
546 let promise = cx.update(|cx| one_at_a_time.spawn(cx, |_| async { Ok(4) }));
547 drop(one_at_a_time);
548
549 assert_eq!(promise.await.unwrap(), None);
550 }
551}