1pub mod call_settings;
2pub mod participant;
3pub mod room;
4
5use anyhow::{anyhow, Result};
6use audio::Audio;
7use call_settings::CallSettings;
8use client::{proto, ChannelId, Client, TypedEnvelope, User, UserStore, ZED_ALWAYS_ACTIVE};
9use collections::HashSet;
10use futures::{channel::oneshot, future::Shared, Future, FutureExt};
11use gpui::{
12 AppContext, AsyncAppContext, Context, EventEmitter, Global, Model, ModelContext, Subscription,
13 Task, WeakModel,
14};
15use postage::watch;
16use project::Project;
17use room::Event;
18use settings::Settings;
19use std::sync::Arc;
20
21#[cfg(not(target_os = "windows"))]
22pub use live_kit_client::play_remote_video_track;
23pub use live_kit_client::{
24 track::RemoteVideoTrack, RemoteVideoTrackView, RemoteVideoTrackViewEvent,
25};
26pub use participant::ParticipantLocation;
27pub use room::Room;
28
29struct GlobalActiveCall(Model<ActiveCall>);
30
31impl Global for GlobalActiveCall {}
32
33pub fn init(client: Arc<Client>, user_store: Model<UserStore>, cx: &mut AppContext) {
34 live_kit_client::init(
35 cx.background_executor().dispatcher.clone(),
36 cx.http_client(),
37 );
38 CallSettings::register(cx);
39
40 let active_call = cx.new_model(|cx| ActiveCall::new(client, user_store, cx));
41 cx.set_global(GlobalActiveCall(active_call));
42}
43
44pub struct OneAtATime {
45 cancel: Option<oneshot::Sender<()>>,
46}
47
48impl OneAtATime {
49 /// spawn a task in the given context.
50 /// if another task is spawned before that resolves, or if the OneAtATime itself is dropped, the first task will be cancelled and return Ok(None)
51 /// otherwise you'll see the result of the task.
52 fn spawn<F, Fut, R>(&mut self, cx: &mut AppContext, f: F) -> Task<Result<Option<R>>>
53 where
54 F: 'static + FnOnce(AsyncAppContext) -> Fut,
55 Fut: Future<Output = Result<R>>,
56 R: 'static,
57 {
58 let (tx, rx) = oneshot::channel();
59 self.cancel.replace(tx);
60 cx.spawn(|cx| async move {
61 futures::select_biased! {
62 _ = rx.fuse() => Ok(None),
63 result = f(cx).fuse() => result.map(Some),
64 }
65 })
66 }
67
68 fn running(&self) -> bool {
69 self.cancel
70 .as_ref()
71 .is_some_and(|cancel| !cancel.is_canceled())
72 }
73}
74
75#[derive(Clone)]
76pub struct IncomingCall {
77 pub room_id: u64,
78 pub calling_user: Arc<User>,
79 pub participants: Vec<Arc<User>>,
80 pub initial_project: Option<proto::ParticipantProject>,
81}
82
83/// Singleton global maintaining the user's participation in a room across workspaces.
84pub struct ActiveCall {
85 room: Option<(Model<Room>, Vec<Subscription>)>,
86 pending_room_creation: Option<Shared<Task<Result<Model<Room>, Arc<anyhow::Error>>>>>,
87 location: Option<WeakModel<Project>>,
88 _join_debouncer: OneAtATime,
89 pending_invites: HashSet<u64>,
90 incoming_call: (
91 watch::Sender<Option<IncomingCall>>,
92 watch::Receiver<Option<IncomingCall>>,
93 ),
94 client: Arc<Client>,
95 user_store: Model<UserStore>,
96 _subscriptions: Vec<client::Subscription>,
97}
98
99impl EventEmitter<Event> for ActiveCall {}
100
101impl ActiveCall {
102 fn new(client: Arc<Client>, user_store: Model<UserStore>, cx: &mut ModelContext<Self>) -> Self {
103 Self {
104 room: None,
105 pending_room_creation: None,
106 location: None,
107 pending_invites: Default::default(),
108 incoming_call: watch::channel(),
109 _join_debouncer: OneAtATime { cancel: None },
110 _subscriptions: vec![
111 client.add_request_handler(cx.weak_model(), Self::handle_incoming_call),
112 client.add_message_handler(cx.weak_model(), Self::handle_call_canceled),
113 ],
114 client,
115 user_store,
116 }
117 }
118
119 pub fn channel_id(&self, cx: &AppContext) -> Option<ChannelId> {
120 self.room()?.read(cx).channel_id()
121 }
122
123 async fn handle_incoming_call(
124 this: Model<Self>,
125 envelope: TypedEnvelope<proto::IncomingCall>,
126 mut cx: AsyncAppContext,
127 ) -> Result<proto::Ack> {
128 let user_store = this.update(&mut cx, |this, _| this.user_store.clone())?;
129 let call = IncomingCall {
130 room_id: envelope.payload.room_id,
131 participants: user_store
132 .update(&mut cx, |user_store, cx| {
133 user_store.get_users(envelope.payload.participant_user_ids, cx)
134 })?
135 .await?,
136 calling_user: user_store
137 .update(&mut cx, |user_store, cx| {
138 user_store.get_user(envelope.payload.calling_user_id, cx)
139 })?
140 .await?,
141 initial_project: envelope.payload.initial_project,
142 };
143 this.update(&mut cx, |this, _| {
144 *this.incoming_call.0.borrow_mut() = Some(call);
145 })?;
146
147 Ok(proto::Ack {})
148 }
149
150 async fn handle_call_canceled(
151 this: Model<Self>,
152 envelope: TypedEnvelope<proto::CallCanceled>,
153 mut cx: AsyncAppContext,
154 ) -> Result<()> {
155 this.update(&mut cx, |this, _| {
156 let mut incoming_call = this.incoming_call.0.borrow_mut();
157 if incoming_call
158 .as_ref()
159 .map_or(false, |call| call.room_id == envelope.payload.room_id)
160 {
161 incoming_call.take();
162 }
163 })?;
164 Ok(())
165 }
166
167 pub fn global(cx: &AppContext) -> Model<Self> {
168 cx.global::<GlobalActiveCall>().0.clone()
169 }
170
171 pub fn try_global(cx: &AppContext) -> Option<Model<Self>> {
172 cx.try_global::<GlobalActiveCall>()
173 .map(|call| call.0.clone())
174 }
175
176 pub fn invite(
177 &mut self,
178 called_user_id: u64,
179 initial_project: Option<Model<Project>>,
180 cx: &mut ModelContext<Self>,
181 ) -> Task<Result<()>> {
182 if !self.pending_invites.insert(called_user_id) {
183 return Task::ready(Err(anyhow!("user was already invited")));
184 }
185 cx.notify();
186
187 if self._join_debouncer.running() {
188 return Task::ready(Ok(()));
189 }
190
191 let room = if let Some(room) = self.room().cloned() {
192 Some(Task::ready(Ok(room)).shared())
193 } else {
194 self.pending_room_creation.clone()
195 };
196
197 let invite = if let Some(room) = room {
198 cx.spawn(move |_, mut cx| async move {
199 let room = room.await.map_err(|err| anyhow!("{:?}", err))?;
200
201 let initial_project_id = if let Some(initial_project) = initial_project {
202 Some(
203 room.update(&mut cx, |room, cx| room.share_project(initial_project, cx))?
204 .await?,
205 )
206 } else {
207 None
208 };
209
210 room.update(&mut cx, move |room, cx| {
211 room.call(called_user_id, initial_project_id, cx)
212 })?
213 .await?;
214
215 anyhow::Ok(())
216 })
217 } else {
218 let client = self.client.clone();
219 let user_store = self.user_store.clone();
220 let room = cx
221 .spawn(move |this, mut cx| async move {
222 let create_room = async {
223 let room = cx
224 .update(|cx| {
225 Room::create(
226 called_user_id,
227 initial_project,
228 client,
229 user_store,
230 cx,
231 )
232 })?
233 .await?;
234
235 this.update(&mut cx, |this, cx| this.set_room(Some(room.clone()), cx))?
236 .await?;
237
238 anyhow::Ok(room)
239 };
240
241 let room = create_room.await;
242 this.update(&mut cx, |this, _| this.pending_room_creation = None)?;
243 room.map_err(Arc::new)
244 })
245 .shared();
246 self.pending_room_creation = Some(room.clone());
247 cx.background_executor().spawn(async move {
248 room.await.map_err(|err| anyhow!("{:?}", err))?;
249 anyhow::Ok(())
250 })
251 };
252
253 cx.spawn(move |this, mut cx| async move {
254 let result = invite.await;
255 if result.is_ok() {
256 this.update(&mut cx, |this, cx| this.report_call_event("invite", cx))?;
257 } else {
258 //TODO: report collaboration error
259 log::error!("invite failed: {:?}", result);
260 }
261
262 this.update(&mut cx, |this, cx| {
263 this.pending_invites.remove(&called_user_id);
264 cx.notify();
265 })?;
266 result
267 })
268 }
269
270 pub fn cancel_invite(
271 &mut self,
272 called_user_id: u64,
273 cx: &mut ModelContext<Self>,
274 ) -> Task<Result<()>> {
275 let room_id = if let Some(room) = self.room() {
276 room.read(cx).id()
277 } else {
278 return Task::ready(Err(anyhow!("no active call")));
279 };
280
281 let client = self.client.clone();
282 cx.background_executor().spawn(async move {
283 client
284 .request(proto::CancelCall {
285 room_id,
286 called_user_id,
287 })
288 .await?;
289 anyhow::Ok(())
290 })
291 }
292
293 pub fn incoming(&self) -> watch::Receiver<Option<IncomingCall>> {
294 self.incoming_call.1.clone()
295 }
296
297 pub fn accept_incoming(&mut self, cx: &mut ModelContext<Self>) -> Task<Result<()>> {
298 if self.room.is_some() {
299 return Task::ready(Err(anyhow!("cannot join while on another call")));
300 }
301
302 let call = if let Some(call) = self.incoming_call.0.borrow_mut().take() {
303 call
304 } else {
305 return Task::ready(Err(anyhow!("no incoming call")));
306 };
307
308 if self.pending_room_creation.is_some() {
309 return Task::ready(Ok(()));
310 }
311
312 let room_id = call.room_id;
313 let client = self.client.clone();
314 let user_store = self.user_store.clone();
315 let join = self
316 ._join_debouncer
317 .spawn(cx, move |cx| Room::join(room_id, client, user_store, cx));
318
319 cx.spawn(|this, mut cx| async move {
320 let room = join.await?;
321 this.update(&mut cx, |this, cx| this.set_room(room.clone(), cx))?
322 .await?;
323 this.update(&mut cx, |this, cx| {
324 this.report_call_event("accept incoming", cx)
325 })?;
326 Ok(())
327 })
328 }
329
330 pub fn decline_incoming(&mut self, _: &mut ModelContext<Self>) -> Result<()> {
331 let call = self
332 .incoming_call
333 .0
334 .borrow_mut()
335 .take()
336 .ok_or_else(|| anyhow!("no incoming call"))?;
337 report_call_event_for_room("decline incoming", call.room_id, None, &self.client);
338 self.client.send(proto::DeclineCall {
339 room_id: call.room_id,
340 })?;
341 Ok(())
342 }
343
344 pub fn join_channel(
345 &mut self,
346 channel_id: ChannelId,
347 cx: &mut ModelContext<Self>,
348 ) -> Task<Result<Option<Model<Room>>>> {
349 if let Some(room) = self.room().cloned() {
350 if room.read(cx).channel_id() == Some(channel_id) {
351 return Task::ready(Ok(Some(room)));
352 } else {
353 room.update(cx, |room, cx| room.clear_state(cx));
354 }
355 }
356
357 if self.pending_room_creation.is_some() {
358 return Task::ready(Ok(None));
359 }
360
361 let client = self.client.clone();
362 let user_store = self.user_store.clone();
363 let join = self._join_debouncer.spawn(cx, move |cx| async move {
364 Room::join_channel(channel_id, client, user_store, cx).await
365 });
366
367 cx.spawn(|this, mut cx| async move {
368 let room = join.await?;
369 this.update(&mut cx, |this, cx| this.set_room(room.clone(), cx))?
370 .await?;
371 this.update(&mut cx, |this, cx| {
372 this.report_call_event("join channel", cx)
373 })?;
374 Ok(room)
375 })
376 }
377
378 pub fn hang_up(&mut self, cx: &mut ModelContext<Self>) -> Task<Result<()>> {
379 cx.notify();
380 self.report_call_event("hang up", cx);
381
382 Audio::end_call(cx);
383
384 let channel_id = self.channel_id(cx);
385 if let Some((room, _)) = self.room.take() {
386 cx.emit(Event::RoomLeft { channel_id });
387 room.update(cx, |room, cx| room.leave(cx))
388 } else {
389 Task::ready(Ok(()))
390 }
391 }
392
393 pub fn share_project(
394 &mut self,
395 project: Model<Project>,
396 cx: &mut ModelContext<Self>,
397 ) -> Task<Result<u64>> {
398 if let Some((room, _)) = self.room.as_ref() {
399 self.report_call_event("share project", cx);
400 room.update(cx, |room, cx| room.share_project(project, cx))
401 } else {
402 Task::ready(Err(anyhow!("no active call")))
403 }
404 }
405
406 pub fn unshare_project(
407 &mut self,
408 project: Model<Project>,
409 cx: &mut ModelContext<Self>,
410 ) -> Result<()> {
411 if let Some((room, _)) = self.room.as_ref() {
412 self.report_call_event("unshare project", cx);
413 room.update(cx, |room, cx| room.unshare_project(project, cx))
414 } else {
415 Err(anyhow!("no active call"))
416 }
417 }
418
419 pub fn location(&self) -> Option<&WeakModel<Project>> {
420 self.location.as_ref()
421 }
422
423 pub fn set_location(
424 &mut self,
425 project: Option<&Model<Project>>,
426 cx: &mut ModelContext<Self>,
427 ) -> Task<Result<()>> {
428 if project.is_some() || !*ZED_ALWAYS_ACTIVE {
429 self.location = project.map(|project| project.downgrade());
430 if let Some((room, _)) = self.room.as_ref() {
431 return room.update(cx, |room, cx| room.set_location(project, cx));
432 }
433 }
434 Task::ready(Ok(()))
435 }
436
437 fn set_room(
438 &mut self,
439 room: Option<Model<Room>>,
440 cx: &mut ModelContext<Self>,
441 ) -> Task<Result<()>> {
442 if room.as_ref() == self.room.as_ref().map(|room| &room.0) {
443 Task::ready(Ok(()))
444 } else {
445 cx.notify();
446 if let Some(room) = room {
447 if room.read(cx).status().is_offline() {
448 self.room = None;
449 Task::ready(Ok(()))
450 } else {
451 let subscriptions = vec![
452 cx.observe(&room, |this, room, cx| {
453 if room.read(cx).status().is_offline() {
454 this.set_room(None, cx).detach_and_log_err(cx);
455 }
456
457 cx.notify();
458 }),
459 cx.subscribe(&room, |_, _, event, cx| cx.emit(event.clone())),
460 ];
461 self.room = Some((room.clone(), subscriptions));
462 let location = self
463 .location
464 .as_ref()
465 .and_then(|location| location.upgrade());
466 let channel_id = room.read(cx).channel_id();
467 cx.emit(Event::RoomJoined { channel_id });
468 room.update(cx, |room, cx| room.set_location(location.as_ref(), cx))
469 }
470 } else {
471 self.room = None;
472 Task::ready(Ok(()))
473 }
474 }
475 }
476
477 pub fn room(&self) -> Option<&Model<Room>> {
478 self.room.as_ref().map(|(room, _)| room)
479 }
480
481 pub fn client(&self) -> Arc<Client> {
482 self.client.clone()
483 }
484
485 pub fn pending_invites(&self) -> &HashSet<u64> {
486 &self.pending_invites
487 }
488
489 pub fn report_call_event(&self, operation: &'static str, cx: &mut AppContext) {
490 if let Some(room) = self.room() {
491 let room = room.read(cx);
492 report_call_event_for_room(operation, room.id(), room.channel_id(), &self.client);
493 }
494 }
495}
496
497pub fn report_call_event_for_room(
498 operation: &'static str,
499 room_id: u64,
500 channel_id: Option<ChannelId>,
501 client: &Arc<Client>,
502) {
503 let telemetry = client.telemetry();
504
505 telemetry.report_call_event(operation, Some(room_id), channel_id)
506}
507
508pub fn report_call_event_for_channel(
509 operation: &'static str,
510 channel_id: ChannelId,
511 client: &Arc<Client>,
512 cx: &AppContext,
513) {
514 let room = ActiveCall::global(cx).read(cx).room();
515
516 let telemetry = client.telemetry();
517
518 telemetry.report_call_event(operation, room.map(|r| r.read(cx).id()), Some(channel_id))
519}
520
521#[cfg(test)]
522mod test {
523 use gpui::TestAppContext;
524
525 use crate::OneAtATime;
526
527 #[gpui::test]
528 async fn test_one_at_a_time(cx: &mut TestAppContext) {
529 let mut one_at_a_time = OneAtATime { cancel: None };
530
531 assert_eq!(
532 cx.update(|cx| one_at_a_time.spawn(cx, |_| async { Ok(1) }))
533 .await
534 .unwrap(),
535 Some(1)
536 );
537
538 let (a, b) = cx.update(|cx| {
539 (
540 one_at_a_time.spawn(cx, |_| async {
541 panic!("");
542 }),
543 one_at_a_time.spawn(cx, |_| async { Ok(3) }),
544 )
545 });
546
547 assert_eq!(a.await.unwrap(), None::<u32>);
548 assert_eq!(b.await.unwrap(), Some(3));
549
550 let promise = cx.update(|cx| one_at_a_time.spawn(cx, |_| async { Ok(4) }));
551 drop(one_at_a_time);
552
553 assert_eq!(promise.await.unwrap(), None);
554 }
555}