1pub mod call_settings;
2pub mod participant;
3pub mod room;
4
5use anyhow::{anyhow, Result};
6use audio2::Audio;
7use call_settings::CallSettings;
8use client2::{
9 proto, ClickhouseEvent, Client, TelemetrySettings, TypedEnvelope, User, UserStore,
10 ZED_ALWAYS_ACTIVE,
11};
12use collections::HashSet;
13use futures::{future::Shared, FutureExt};
14use gpui2::{
15 AppContext, AsyncAppContext, Context, EventEmitter, Handle, ModelContext, Subscription, Task,
16 WeakHandle,
17};
18use postage::watch;
19use project2::Project;
20use settings2::Settings;
21use std::sync::Arc;
22
23pub use participant::ParticipantLocation;
24pub use room::Room;
25
26pub fn init(client: Arc<Client>, user_store: Handle<UserStore>, cx: &mut AppContext) {
27 CallSettings::register(cx);
28
29 let active_call = cx.entity(|cx| ActiveCall::new(client, user_store, cx));
30 cx.set_global(active_call);
31}
32
/// Snapshot of an incoming call as published on `ActiveCall`'s incoming-call
/// watch channel.
///
/// Built by `ActiveCall::handle_incoming_call` from a `proto::IncomingCall` payload.
#[derive(Clone)]
pub struct IncomingCall {
    /// The payload's `room_id`.
    pub room_id: u64,
    /// User resolved from the payload's `calling_user_id` via the user store.
    pub calling_user: Arc<User>,
    /// Users resolved from the payload's `participant_user_ids` via the user store.
    pub participants: Vec<Arc<User>>,
    /// The payload's `initial_project`, passed through unchanged.
    pub initial_project: Option<proto::ParticipantProject>,
}
40
/// Singleton global maintaining the user's participation in a room across workspaces.
pub struct ActiveCall {
    /// The current room, paired with the observe/subscribe subscriptions that
    /// `set_room` registers on it. `None` when not in a room.
    room: Option<(Handle<Room>, Vec<Subscription>)>,
    /// Shared task for a room that `invite` is currently creating. Set when creation
    /// starts and reset to `None` by that task once creation finishes, so later
    /// `invite` calls can await the same room instead of creating another.
    pending_room_creation: Option<Shared<Task<Result<Handle<Room>, Arc<anyhow::Error>>>>>,
    /// Weak handle to the project last recorded by `set_location`; `set_room`
    /// forwards it (if still upgradable) to a newly set room.
    location: Option<WeakHandle<Project>>,
    /// Ids of users for whom an `invite` call is still in flight.
    pending_invites: HashSet<u64>,
    /// Sender/receiver pair of the watch channel holding the current incoming call
    /// (`None` when there is none). `incoming` hands out clones of the receiver.
    incoming_call: (
        watch::Sender<Option<IncomingCall>>,
        watch::Receiver<Option<IncomingCall>>,
    ),
    /// Client used for requests, messages and telemetry.
    client: Arc<Client>,
    /// Store used to resolve user ids; also passed on to `Room` constructors.
    user_store: Handle<UserStore>,
    /// Handler registrations returned by `client` in `new`. Never read; presumably
    /// held so the handlers stay registered for the lifetime of this struct —
    /// TODO confirm against `client2::Subscription`.
    _subscriptions: Vec<client2::Subscription>,
}
55
/// `ActiveCall` emits `room::Event`s: `set_room` subscribes to the room it installs
/// and re-emits a clone of every event that room produces.
impl EventEmitter for ActiveCall {
    type Event = room::Event;
}
59
60impl ActiveCall {
61 fn new(
62 client: Arc<Client>,
63 user_store: Handle<UserStore>,
64 cx: &mut ModelContext<Self>,
65 ) -> Self {
66 Self {
67 room: None,
68 pending_room_creation: None,
69 location: None,
70 pending_invites: Default::default(),
71 incoming_call: watch::channel(),
72
73 _subscriptions: vec![
74 client.add_request_handler(cx.weak_handle(), Self::handle_incoming_call),
75 client.add_message_handler(cx.weak_handle(), Self::handle_call_canceled),
76 ],
77 client,
78 user_store,
79 }
80 }
81
82 pub fn channel_id(&self, cx: &AppContext) -> Option<u64> {
83 self.room()?.read(cx).channel_id()
84 }
85
86 async fn handle_incoming_call(
87 this: Handle<Self>,
88 envelope: TypedEnvelope<proto::IncomingCall>,
89 _: Arc<Client>,
90 mut cx: AsyncAppContext,
91 ) -> Result<proto::Ack> {
92 let user_store = this.update(&mut cx, |this, _| this.user_store.clone())?;
93 let call = IncomingCall {
94 room_id: envelope.payload.room_id,
95 participants: user_store
96 .update(&mut cx, |user_store, cx| {
97 user_store.get_users(envelope.payload.participant_user_ids, cx)
98 })?
99 .await?,
100 calling_user: user_store
101 .update(&mut cx, |user_store, cx| {
102 user_store.get_user(envelope.payload.calling_user_id, cx)
103 })?
104 .await?,
105 initial_project: envelope.payload.initial_project,
106 };
107 this.update(&mut cx, |this, _| {
108 *this.incoming_call.0.borrow_mut() = Some(call);
109 })?;
110
111 Ok(proto::Ack {})
112 }
113
114 async fn handle_call_canceled(
115 this: Handle<Self>,
116 envelope: TypedEnvelope<proto::CallCanceled>,
117 _: Arc<Client>,
118 mut cx: AsyncAppContext,
119 ) -> Result<()> {
120 this.update(&mut cx, |this, _| {
121 let mut incoming_call = this.incoming_call.0.borrow_mut();
122 if incoming_call
123 .as_ref()
124 .map_or(false, |call| call.room_id == envelope.payload.room_id)
125 {
126 incoming_call.take();
127 }
128 })?;
129 Ok(())
130 }
131
132 pub fn global(cx: &AppContext) -> Handle<Self> {
133 cx.global::<Handle<Self>>().clone()
134 }
135
136 pub fn invite(
137 &mut self,
138 called_user_id: u64,
139 initial_project: Option<Handle<Project>>,
140 cx: &mut ModelContext<Self>,
141 ) -> Task<Result<()>> {
142 if !self.pending_invites.insert(called_user_id) {
143 return Task::ready(Err(anyhow!("user was already invited")));
144 }
145 cx.notify();
146
147 let room = if let Some(room) = self.room().cloned() {
148 Some(Task::ready(Ok(room)).shared())
149 } else {
150 self.pending_room_creation.clone()
151 };
152
153 let invite = if let Some(room) = room {
154 cx.spawn(move |_, mut cx| async move {
155 let room = room.await.map_err(|err| anyhow!("{:?}", err))?;
156
157 let initial_project_id = if let Some(initial_project) = initial_project {
158 Some(
159 room.update(&mut cx, |room, cx| room.share_project(initial_project, cx))?
160 .await?,
161 )
162 } else {
163 None
164 };
165
166 room.update(&mut cx, move |room, cx| {
167 room.call(called_user_id, initial_project_id, cx)
168 })?
169 .await?;
170
171 anyhow::Ok(())
172 })
173 } else {
174 let client = self.client.clone();
175 let user_store = self.user_store.clone();
176 let room = cx
177 .spawn(move |this, mut cx| async move {
178 let create_room = async {
179 let room = cx
180 .update(|cx| {
181 Room::create(
182 called_user_id,
183 initial_project,
184 client,
185 user_store,
186 cx,
187 )
188 })?
189 .await?;
190
191 this.update(&mut cx, |this, cx| this.set_room(Some(room.clone()), cx))?
192 .await?;
193
194 anyhow::Ok(room)
195 };
196
197 let room = create_room.await;
198 this.update(&mut cx, |this, _| this.pending_room_creation = None)?;
199 room.map_err(Arc::new)
200 })
201 .shared();
202 self.pending_room_creation = Some(room.clone());
203 cx.executor().spawn(async move {
204 room.await.map_err(|err| anyhow!("{:?}", err))?;
205 anyhow::Ok(())
206 })
207 };
208
209 cx.spawn(move |this, mut cx| async move {
210 let result = invite.await;
211 if result.is_ok() {
212 this.update(&mut cx, |this, cx| this.report_call_event("invite", cx))?;
213 } else {
214 // TODO: Resport collaboration error
215 }
216
217 this.update(&mut cx, |this, cx| {
218 this.pending_invites.remove(&called_user_id);
219 cx.notify();
220 })?;
221 result
222 })
223 }
224
225 pub fn cancel_invite(
226 &mut self,
227 called_user_id: u64,
228 cx: &mut ModelContext<Self>,
229 ) -> Task<Result<()>> {
230 let room_id = if let Some(room) = self.room() {
231 room.read(cx).id()
232 } else {
233 return Task::ready(Err(anyhow!("no active call")));
234 };
235
236 let client = self.client.clone();
237 cx.executor().spawn(async move {
238 client
239 .request(proto::CancelCall {
240 room_id,
241 called_user_id,
242 })
243 .await?;
244 anyhow::Ok(())
245 })
246 }
247
248 pub fn incoming(&self) -> watch::Receiver<Option<IncomingCall>> {
249 self.incoming_call.1.clone()
250 }
251
252 pub fn accept_incoming(&mut self, cx: &mut ModelContext<Self>) -> Task<Result<()>> {
253 if self.room.is_some() {
254 return Task::ready(Err(anyhow!("cannot join while on another call")));
255 }
256
257 let call = if let Some(call) = self.incoming_call.1.borrow().clone() {
258 call
259 } else {
260 return Task::ready(Err(anyhow!("no incoming call")));
261 };
262
263 let join = Room::join(&call, self.client.clone(), self.user_store.clone(), cx);
264
265 cx.spawn(|this, mut cx| async move {
266 let room = join.await?;
267 this.update(&mut cx, |this, cx| this.set_room(Some(room.clone()), cx))?
268 .await?;
269 this.update(&mut cx, |this, cx| {
270 this.report_call_event("accept incoming", cx)
271 })?;
272 Ok(())
273 })
274 }
275
276 pub fn decline_incoming(&mut self, cx: &mut ModelContext<Self>) -> Result<()> {
277 let call = self
278 .incoming_call
279 .0
280 .borrow_mut()
281 .take()
282 .ok_or_else(|| anyhow!("no incoming call"))?;
283 report_call_event_for_room("decline incoming", call.room_id, None, &self.client, cx);
284 self.client.send(proto::DeclineCall {
285 room_id: call.room_id,
286 })?;
287 Ok(())
288 }
289
290 pub fn join_channel(
291 &mut self,
292 channel_id: u64,
293 cx: &mut ModelContext<Self>,
294 ) -> Task<Result<Handle<Room>>> {
295 if let Some(room) = self.room().cloned() {
296 if room.read(cx).channel_id() == Some(channel_id) {
297 return Task::ready(Ok(room));
298 } else {
299 room.update(cx, |room, cx| room.clear_state(cx));
300 }
301 }
302
303 let join = Room::join_channel(channel_id, self.client.clone(), self.user_store.clone(), cx);
304
305 cx.spawn(|this, mut cx| async move {
306 let room = join.await?;
307 this.update(&mut cx, |this, cx| this.set_room(Some(room.clone()), cx))?
308 .await?;
309 this.update(&mut cx, |this, cx| {
310 this.report_call_event("join channel", cx)
311 })?;
312 Ok(room)
313 })
314 }
315
316 pub fn hang_up(&mut self, cx: &mut ModelContext<Self>) -> Task<Result<()>> {
317 cx.notify();
318 self.report_call_event("hang up", cx);
319
320 Audio::end_call(cx);
321 if let Some((room, _)) = self.room.take() {
322 room.update(cx, |room, cx| room.leave(cx))
323 } else {
324 Task::ready(Ok(()))
325 }
326 }
327
328 pub fn share_project(
329 &mut self,
330 project: Handle<Project>,
331 cx: &mut ModelContext<Self>,
332 ) -> Task<Result<u64>> {
333 if let Some((room, _)) = self.room.as_ref() {
334 self.report_call_event("share project", cx);
335 room.update(cx, |room, cx| room.share_project(project, cx))
336 } else {
337 Task::ready(Err(anyhow!("no active call")))
338 }
339 }
340
341 pub fn unshare_project(
342 &mut self,
343 project: Handle<Project>,
344 cx: &mut ModelContext<Self>,
345 ) -> Result<()> {
346 if let Some((room, _)) = self.room.as_ref() {
347 self.report_call_event("unshare project", cx);
348 room.update(cx, |room, cx| room.unshare_project(project, cx))
349 } else {
350 Err(anyhow!("no active call"))
351 }
352 }
353
354 pub fn location(&self) -> Option<&WeakHandle<Project>> {
355 self.location.as_ref()
356 }
357
358 pub fn set_location(
359 &mut self,
360 project: Option<&Handle<Project>>,
361 cx: &mut ModelContext<Self>,
362 ) -> Task<Result<()>> {
363 if project.is_some() || !*ZED_ALWAYS_ACTIVE {
364 self.location = project.map(|project| project.downgrade());
365 if let Some((room, _)) = self.room.as_ref() {
366 return room.update(cx, |room, cx| room.set_location(project, cx));
367 }
368 }
369 Task::ready(Ok(()))
370 }
371
372 fn set_room(
373 &mut self,
374 room: Option<Handle<Room>>,
375 cx: &mut ModelContext<Self>,
376 ) -> Task<Result<()>> {
377 if room.as_ref() != self.room.as_ref().map(|room| &room.0) {
378 cx.notify();
379 if let Some(room) = room {
380 if room.read(cx).status().is_offline() {
381 self.room = None;
382 Task::ready(Ok(()))
383 } else {
384 let subscriptions = vec![
385 cx.observe(&room, |this, room, cx| {
386 if room.read(cx).status().is_offline() {
387 this.set_room(None, cx).detach_and_log_err(cx);
388 }
389
390 cx.notify();
391 }),
392 cx.subscribe(&room, |_, _, event, cx| cx.emit(event.clone())),
393 ];
394 self.room = Some((room.clone(), subscriptions));
395 let location = self
396 .location
397 .as_ref()
398 .and_then(|location| location.upgrade());
399 room.update(cx, |room, cx| room.set_location(location.as_ref(), cx))
400 }
401 } else {
402 self.room = None;
403 Task::ready(Ok(()))
404 }
405 } else {
406 Task::ready(Ok(()))
407 }
408 }
409
410 pub fn room(&self) -> Option<&Handle<Room>> {
411 self.room.as_ref().map(|(room, _)| room)
412 }
413
414 pub fn client(&self) -> Arc<Client> {
415 self.client.clone()
416 }
417
418 pub fn pending_invites(&self) -> &HashSet<u64> {
419 &self.pending_invites
420 }
421
422 pub fn report_call_event(&self, operation: &'static str, cx: &AppContext) {
423 if let Some(room) = self.room() {
424 let room = room.read(cx);
425 report_call_event_for_room(operation, room.id(), room.channel_id(), &self.client, cx);
426 }
427 }
428}
429
430pub fn report_call_event_for_room(
431 operation: &'static str,
432 room_id: u64,
433 channel_id: Option<u64>,
434 client: &Arc<Client>,
435 cx: &AppContext,
436) {
437 let telemetry = client.telemetry();
438 let telemetry_settings = *TelemetrySettings::get_global(cx);
439 let event = ClickhouseEvent::Call {
440 operation,
441 room_id: Some(room_id),
442 channel_id,
443 };
444 telemetry.report_clickhouse_event(event, telemetry_settings);
445}
446
447pub fn report_call_event_for_channel(
448 operation: &'static str,
449 channel_id: u64,
450 client: &Arc<Client>,
451 cx: &AppContext,
452) {
453 let room = ActiveCall::global(cx).read(cx).room();
454
455 let telemetry = client.telemetry();
456
457 let telemetry_settings = *TelemetrySettings::get_global(cx);
458
459 let event = ClickhouseEvent::Call {
460 operation,
461 room_id: room.map(|r| r.read(cx).id()),
462 channel_id: Some(channel_id),
463 };
464 telemetry.report_clickhouse_event(event, telemetry_settings);
465}