1pub mod participant;
2pub mod room;
3
4use anyhow::{Context as _, Result, anyhow};
5use audio::Audio;
6use client::{ChannelId, Client, TypedEnvelope, User, UserStore, ZED_ALWAYS_ACTIVE, proto};
7use collections::HashSet;
8use futures::{Future, FutureExt, channel::oneshot, future::Shared};
9use gpui::{
10 App, AppContext as _, AsyncApp, Context, Entity, EventEmitter, Global, Subscription, Task,
11 WeakEntity,
12};
13use postage::watch;
14use project::Project;
15use room::Event;
16use std::sync::Arc;
17
18pub use livekit_client::{RemoteVideoTrack, RemoteVideoTrackView, RemoteVideoTrackViewEvent};
19pub use participant::ParticipantLocation;
20pub use room::Room;
21
22struct GlobalActiveCall(Entity<ActiveCall>);
23
24impl Global for GlobalActiveCall {}
25
26pub fn init(client: Arc<Client>, user_store: Entity<UserStore>, cx: &mut App) {
27 let active_call = cx.new(|cx| ActiveCall::new(client, user_store, cx));
28 cx.set_global(GlobalActiveCall(active_call));
29}
30
31pub struct OneAtATime {
32 cancel: Option<oneshot::Sender<()>>,
33}
34
35impl OneAtATime {
36 /// spawn a task in the given context.
37 /// if another task is spawned before that resolves, or if the OneAtATime itself is dropped, the first task will be cancelled and return Ok(None)
38 /// otherwise you'll see the result of the task.
39 fn spawn<F, Fut, R>(&mut self, cx: &mut App, f: F) -> Task<Result<Option<R>>>
40 where
41 F: 'static + FnOnce(AsyncApp) -> Fut,
42 Fut: Future<Output = Result<R>>,
43 R: 'static,
44 {
45 let (tx, rx) = oneshot::channel();
46 self.cancel.replace(tx);
47 cx.spawn(async move |cx| {
48 futures::select_biased! {
49 _ = rx.fuse() => Ok(None),
50 result = f(cx.clone()).fuse() => result.map(Some),
51 }
52 })
53 }
54
55 fn running(&self) -> bool {
56 self.cancel
57 .as_ref()
58 .is_some_and(|cancel| !cancel.is_canceled())
59 }
60}
61
62#[derive(Clone)]
63pub struct IncomingCall {
64 pub room_id: u64,
65 pub calling_user: Arc<User>,
66 pub participants: Vec<Arc<User>>,
67 pub initial_project: Option<proto::ParticipantProject>,
68}
69
70/// Singleton global maintaining the user's participation in a room across workspaces.
71pub struct ActiveCall {
72 room: Option<(Entity<Room>, Vec<Subscription>)>,
73 pending_room_creation: Option<Shared<Task<Result<Entity<Room>, Arc<anyhow::Error>>>>>,
74 location: Option<WeakEntity<Project>>,
75 _join_debouncer: OneAtATime,
76 pending_invites: HashSet<u64>,
77 incoming_call: (
78 watch::Sender<Option<IncomingCall>>,
79 watch::Receiver<Option<IncomingCall>>,
80 ),
81 client: Arc<Client>,
82 user_store: Entity<UserStore>,
83 _subscriptions: Vec<client::Subscription>,
84}
85
86impl EventEmitter<Event> for ActiveCall {}
87
88impl ActiveCall {
89 fn new(client: Arc<Client>, user_store: Entity<UserStore>, cx: &mut Context<Self>) -> Self {
90 Self {
91 room: None,
92 pending_room_creation: None,
93 location: None,
94 pending_invites: Default::default(),
95 incoming_call: watch::channel(),
96 _join_debouncer: OneAtATime { cancel: None },
97 _subscriptions: vec![
98 client.add_request_handler(cx.weak_entity(), Self::handle_incoming_call),
99 client.add_message_handler(cx.weak_entity(), Self::handle_call_canceled),
100 ],
101 client,
102 user_store,
103 }
104 }
105
106 pub fn channel_id(&self, cx: &App) -> Option<ChannelId> {
107 self.room()?.read(cx).channel_id()
108 }
109
110 async fn handle_incoming_call(
111 this: Entity<Self>,
112 envelope: TypedEnvelope<proto::IncomingCall>,
113 mut cx: AsyncApp,
114 ) -> Result<proto::Ack> {
115 let user_store = this.read_with(&cx, |this, _| this.user_store.clone())?;
116 let call = IncomingCall {
117 room_id: envelope.payload.room_id,
118 participants: user_store
119 .update(&mut cx, |user_store, cx| {
120 user_store.get_users(envelope.payload.participant_user_ids, cx)
121 })?
122 .await?,
123 calling_user: user_store
124 .update(&mut cx, |user_store, cx| {
125 user_store.get_user(envelope.payload.calling_user_id, cx)
126 })?
127 .await?,
128 initial_project: envelope.payload.initial_project,
129 };
130 this.update(&mut cx, |this, _| {
131 *this.incoming_call.0.borrow_mut() = Some(call);
132 })?;
133
134 Ok(proto::Ack {})
135 }
136
137 async fn handle_call_canceled(
138 this: Entity<Self>,
139 envelope: TypedEnvelope<proto::CallCanceled>,
140 mut cx: AsyncApp,
141 ) -> Result<()> {
142 this.update(&mut cx, |this, _| {
143 let mut incoming_call = this.incoming_call.0.borrow_mut();
144 if incoming_call
145 .as_ref()
146 .is_some_and(|call| call.room_id == envelope.payload.room_id)
147 {
148 incoming_call.take();
149 }
150 })?;
151 Ok(())
152 }
153
154 pub fn global(cx: &App) -> Entity<Self> {
155 cx.global::<GlobalActiveCall>().0.clone()
156 }
157
158 pub fn try_global(cx: &App) -> Option<Entity<Self>> {
159 cx.try_global::<GlobalActiveCall>()
160 .map(|call| call.0.clone())
161 }
162
163 pub fn invite(
164 &mut self,
165 called_user_id: u64,
166 initial_project: Option<Entity<Project>>,
167 cx: &mut Context<Self>,
168 ) -> Task<Result<()>> {
169 if !self.pending_invites.insert(called_user_id) {
170 return Task::ready(Err(anyhow!("user was already invited")));
171 }
172 cx.notify();
173
174 if self._join_debouncer.running() {
175 return Task::ready(Ok(()));
176 }
177
178 let room = if let Some(room) = self.room().cloned() {
179 Some(Task::ready(Ok(room)).shared())
180 } else {
181 self.pending_room_creation.clone()
182 };
183
184 let invite = if let Some(room) = room {
185 cx.spawn(async move |_, cx| {
186 let room = room.await.map_err(|err| anyhow!("{err:?}"))?;
187
188 let initial_project_id = if let Some(initial_project) = initial_project {
189 Some(
190 room.update(cx, |room, cx| room.share_project(initial_project, cx))?
191 .await?,
192 )
193 } else {
194 None
195 };
196
197 room.update(cx, move |room, cx| {
198 room.call(called_user_id, initial_project_id, cx)
199 })?
200 .await?;
201
202 anyhow::Ok(())
203 })
204 } else {
205 let client = self.client.clone();
206 let user_store = self.user_store.clone();
207 let room = cx
208 .spawn(async move |this, cx| {
209 let create_room = async {
210 let room = cx
211 .update(|cx| {
212 Room::create(
213 called_user_id,
214 initial_project,
215 client,
216 user_store,
217 cx,
218 )
219 })?
220 .await?;
221
222 this.update(cx, |this, cx| this.set_room(Some(room.clone()), cx))?
223 .await?;
224
225 anyhow::Ok(room)
226 };
227
228 let room = create_room.await;
229 this.update(cx, |this, _| this.pending_room_creation = None)?;
230 room.map_err(Arc::new)
231 })
232 .shared();
233 self.pending_room_creation = Some(room.clone());
234 cx.background_spawn(async move {
235 room.await.map_err(|err| anyhow!("{err:?}"))?;
236 anyhow::Ok(())
237 })
238 };
239
240 cx.spawn(async move |this, cx| {
241 let result = invite.await;
242 if result.is_ok() {
243 this.update(cx, |this, cx| {
244 this.report_call_event("Participant Invited", cx)
245 })?;
246 } else {
247 //TODO: report collaboration error
248 log::error!("invite failed: {:?}", result);
249 }
250
251 this.update(cx, |this, cx| {
252 this.pending_invites.remove(&called_user_id);
253 cx.notify();
254 })?;
255 result
256 })
257 }
258
259 pub fn cancel_invite(
260 &mut self,
261 called_user_id: u64,
262 cx: &mut Context<Self>,
263 ) -> Task<Result<()>> {
264 let room_id = if let Some(room) = self.room() {
265 room.read(cx).id()
266 } else {
267 return Task::ready(Err(anyhow!("no active call")));
268 };
269
270 let client = self.client.clone();
271 cx.background_spawn(async move {
272 client
273 .request(proto::CancelCall {
274 room_id,
275 called_user_id,
276 })
277 .await?;
278 anyhow::Ok(())
279 })
280 }
281
282 pub fn incoming(&self) -> watch::Receiver<Option<IncomingCall>> {
283 self.incoming_call.1.clone()
284 }
285
286 pub fn accept_incoming(&mut self, cx: &mut Context<Self>) -> Task<Result<()>> {
287 if self.room.is_some() {
288 return Task::ready(Err(anyhow!("cannot join while on another call")));
289 }
290
291 let call = if let Some(call) = self.incoming_call.0.borrow_mut().take() {
292 call
293 } else {
294 return Task::ready(Err(anyhow!("no incoming call")));
295 };
296
297 if self.pending_room_creation.is_some() {
298 return Task::ready(Ok(()));
299 }
300
301 let room_id = call.room_id;
302 let client = self.client.clone();
303 let user_store = self.user_store.clone();
304 let join = self
305 ._join_debouncer
306 .spawn(cx, move |cx| Room::join(room_id, client, user_store, cx));
307
308 cx.spawn(async move |this, cx| {
309 let room = join.await?;
310 this.update(cx, |this, cx| this.set_room(room.clone(), cx))?
311 .await?;
312 this.update(cx, |this, cx| {
313 this.report_call_event("Incoming Call Accepted", cx)
314 })?;
315 Ok(())
316 })
317 }
318
319 pub fn decline_incoming(&mut self, _: &mut Context<Self>) -> Result<()> {
320 let call = self
321 .incoming_call
322 .0
323 .borrow_mut()
324 .take()
325 .context("no incoming call")?;
326 telemetry::event!("Incoming Call Declined", room_id = call.room_id);
327 self.client.send(proto::DeclineCall {
328 room_id: call.room_id,
329 })?;
330 Ok(())
331 }
332
333 pub fn join_channel(
334 &mut self,
335 channel_id: ChannelId,
336 cx: &mut Context<Self>,
337 ) -> Task<Result<Option<Entity<Room>>>> {
338 if let Some(room) = self.room().cloned() {
339 if room.read(cx).channel_id() == Some(channel_id) {
340 return Task::ready(Ok(Some(room)));
341 } else {
342 room.update(cx, |room, cx| room.clear_state(cx));
343 }
344 }
345
346 if self.pending_room_creation.is_some() {
347 return Task::ready(Ok(None));
348 }
349
350 let client = self.client.clone();
351 let user_store = self.user_store.clone();
352 let join = self._join_debouncer.spawn(cx, move |cx| async move {
353 Room::join_channel(channel_id, client, user_store, cx).await
354 });
355
356 cx.spawn(async move |this, cx| {
357 let room = join.await?;
358 this.update(cx, |this, cx| this.set_room(room.clone(), cx))?
359 .await?;
360 this.update(cx, |this, cx| this.report_call_event("Channel Joined", cx))?;
361 Ok(room)
362 })
363 }
364
365 pub fn hang_up(&mut self, cx: &mut Context<Self>) -> Task<Result<()>> {
366 cx.notify();
367 self.report_call_event("Call Ended", cx);
368
369 Audio::end_call(cx);
370
371 let channel_id = self.channel_id(cx);
372 if let Some((room, _)) = self.room.take() {
373 cx.emit(Event::RoomLeft { channel_id });
374 room.update(cx, |room, cx| room.leave(cx))
375 } else {
376 Task::ready(Ok(()))
377 }
378 }
379
380 pub fn share_project(
381 &mut self,
382 project: Entity<Project>,
383 cx: &mut Context<Self>,
384 ) -> Task<Result<u64>> {
385 if let Some((room, _)) = self.room.as_ref() {
386 self.report_call_event("Project Shared", cx);
387 room.update(cx, |room, cx| room.share_project(project, cx))
388 } else {
389 Task::ready(Err(anyhow!("no active call")))
390 }
391 }
392
393 pub fn unshare_project(
394 &mut self,
395 project: Entity<Project>,
396 cx: &mut Context<Self>,
397 ) -> Result<()> {
398 let (room, _) = self.room.as_ref().context("no active call")?;
399 self.report_call_event("Project Unshared", cx);
400 room.update(cx, |room, cx| room.unshare_project(project, cx))
401 }
402
403 pub fn location(&self) -> Option<&WeakEntity<Project>> {
404 self.location.as_ref()
405 }
406
407 pub fn set_location(
408 &mut self,
409 project: Option<&Entity<Project>>,
410 cx: &mut Context<Self>,
411 ) -> Task<Result<()>> {
412 if project.is_some() || !*ZED_ALWAYS_ACTIVE {
413 self.location = project.map(|project| project.downgrade());
414 if let Some((room, _)) = self.room.as_ref() {
415 return room.update(cx, |room, cx| room.set_location(project, cx));
416 }
417 }
418 Task::ready(Ok(()))
419 }
420
421 fn set_room(&mut self, room: Option<Entity<Room>>, cx: &mut Context<Self>) -> Task<Result<()>> {
422 if room.as_ref() == self.room.as_ref().map(|room| &room.0) {
423 Task::ready(Ok(()))
424 } else {
425 cx.notify();
426 if let Some(room) = room {
427 if room.read(cx).status().is_offline() {
428 self.room = None;
429 Task::ready(Ok(()))
430 } else {
431 let subscriptions = vec![
432 cx.observe(&room, |this, room, cx| {
433 if room.read(cx).status().is_offline() {
434 this.set_room(None, cx).detach_and_log_err(cx);
435 }
436
437 cx.notify();
438 }),
439 cx.subscribe(&room, |_, _, event, cx| cx.emit(event.clone())),
440 ];
441 self.room = Some((room.clone(), subscriptions));
442 let location = self
443 .location
444 .as_ref()
445 .and_then(|location| location.upgrade());
446 let channel_id = room.read(cx).channel_id();
447 cx.emit(Event::RoomJoined { channel_id });
448 room.update(cx, |room, cx| room.set_location(location.as_ref(), cx))
449 }
450 } else {
451 self.room = None;
452 Task::ready(Ok(()))
453 }
454 }
455 }
456
457 pub fn room(&self) -> Option<&Entity<Room>> {
458 self.room.as_ref().map(|(room, _)| room)
459 }
460
461 pub fn client(&self) -> Arc<Client> {
462 self.client.clone()
463 }
464
465 pub fn pending_invites(&self) -> &HashSet<u64> {
466 &self.pending_invites
467 }
468
469 pub fn report_call_event(&self, operation: &'static str, cx: &mut App) {
470 if let Some(room) = self.room() {
471 let room = room.read(cx);
472 telemetry::event!(
473 operation,
474 room_id = room.id(),
475 channel_id = room.channel_id()
476 )
477 }
478 }
479}
480
#[cfg(test)]
mod test {
    use gpui::TestAppContext;

    use crate::OneAtATime;

    /// Covers the three outcomes of `OneAtATime::spawn`: running to completion, being
    /// superseded by a later spawn, and being cancelled because the owner was dropped.
    #[gpui::test]
    async fn test_one_at_a_time(cx: &mut TestAppContext) {
        let mut debouncer = OneAtATime { cancel: None };

        // A lone task runs to completion and yields its value.
        let lone = cx.update(|cx| debouncer.spawn(cx, |_| async { Ok(1) }));
        assert_eq!(lone.await.unwrap(), Some(1));

        // Spawning twice back to back cancels the first task; its body must never run.
        let (superseded, latest) = cx.update(|cx| {
            let superseded = debouncer.spawn(cx, |_| async {
                panic!("");
            });
            let latest = debouncer.spawn(cx, |_| async { Ok(3) });
            (superseded, latest)
        });

        assert_eq!(superseded.await.unwrap(), None::<u32>);
        assert_eq!(latest.await.unwrap(), Some(3));

        // Dropping the owner cancels the task that is still outstanding.
        let orphaned = cx.update(|cx| debouncer.spawn(cx, |_| async { Ok(4) }));
        drop(debouncer);

        assert_eq!(orphaned.await.unwrap(), None);
    }
}