1pub mod call_settings;
2pub mod participant;
3pub mod room;
4
5use anyhow::{anyhow, Result};
6use audio::Audio;
7use call_settings::CallSettings;
8use client::{
9 proto, ClickhouseEvent, Client, TelemetrySettings, TypedEnvelope, User, UserStore,
10 ZED_ALWAYS_ACTIVE,
11};
12use collections::HashSet;
13use futures::{future::Shared, FutureExt};
14use gpui::{
15 AppContext, AsyncAppContext, Entity, ModelContext, ModelHandle, Subscription, Task,
16 WeakModelHandle,
17};
18use postage::watch;
19use project::Project;
20use std::sync::Arc;
21
22pub use participant::ParticipantLocation;
23pub use room::Room;
24
25pub fn init(client: Arc<Client>, user_store: ModelHandle<UserStore>, cx: &mut AppContext) {
26 settings::register::<CallSettings>(cx);
27
28 let active_call = cx.add_model(|cx| ActiveCall::new(client, user_store, cx));
29 cx.set_global(active_call);
30}
31
32#[derive(Clone)]
33pub struct IncomingCall {
34 pub room_id: u64,
35 pub calling_user: Arc<User>,
36 pub participants: Vec<Arc<User>>,
37 pub initial_project: Option<proto::ParticipantProject>,
38}
39
40/// Singleton global maintaining the user's participation in a room across workspaces.
41pub struct ActiveCall {
42 room: Option<(ModelHandle<Room>, Vec<Subscription>)>,
43 pending_room_creation: Option<Shared<Task<Result<ModelHandle<Room>, Arc<anyhow::Error>>>>>,
44 location: Option<WeakModelHandle<Project>>,
45 pending_invites: HashSet<u64>,
46 incoming_call: (
47 watch::Sender<Option<IncomingCall>>,
48 watch::Receiver<Option<IncomingCall>>,
49 ),
50 client: Arc<Client>,
51 user_store: ModelHandle<UserStore>,
52 _subscriptions: Vec<client::Subscription>,
53}
54
55impl Entity for ActiveCall {
56 type Event = room::Event;
57}
58
59impl ActiveCall {
60 fn new(
61 client: Arc<Client>,
62 user_store: ModelHandle<UserStore>,
63 cx: &mut ModelContext<Self>,
64 ) -> Self {
65 Self {
66 room: None,
67 pending_room_creation: None,
68 location: None,
69 pending_invites: Default::default(),
70 incoming_call: watch::channel(),
71
72 _subscriptions: vec![
73 client.add_request_handler(cx.handle(), Self::handle_incoming_call),
74 client.add_message_handler(cx.handle(), Self::handle_call_canceled),
75 ],
76 client,
77 user_store,
78 }
79 }
80
81 pub fn channel_id(&self, cx: &AppContext) -> Option<u64> {
82 self.room()?.read(cx).channel_id()
83 }
84
85 async fn handle_incoming_call(
86 this: ModelHandle<Self>,
87 envelope: TypedEnvelope<proto::IncomingCall>,
88 _: Arc<Client>,
89 mut cx: AsyncAppContext,
90 ) -> Result<proto::Ack> {
91 let user_store = this.read_with(&cx, |this, _| this.user_store.clone());
92 let call = IncomingCall {
93 room_id: envelope.payload.room_id,
94 participants: user_store
95 .update(&mut cx, |user_store, cx| {
96 user_store.get_users(envelope.payload.participant_user_ids, cx)
97 })
98 .await?,
99 calling_user: user_store
100 .update(&mut cx, |user_store, cx| {
101 user_store.get_user(envelope.payload.calling_user_id, cx)
102 })
103 .await?,
104 initial_project: envelope.payload.initial_project,
105 };
106 this.update(&mut cx, |this, _| {
107 *this.incoming_call.0.borrow_mut() = Some(call);
108 });
109
110 Ok(proto::Ack {})
111 }
112
113 async fn handle_call_canceled(
114 this: ModelHandle<Self>,
115 envelope: TypedEnvelope<proto::CallCanceled>,
116 _: Arc<Client>,
117 mut cx: AsyncAppContext,
118 ) -> Result<()> {
119 this.update(&mut cx, |this, _| {
120 let mut incoming_call = this.incoming_call.0.borrow_mut();
121 if incoming_call
122 .as_ref()
123 .map_or(false, |call| call.room_id == envelope.payload.room_id)
124 {
125 incoming_call.take();
126 }
127 });
128 Ok(())
129 }
130
131 pub fn global(cx: &AppContext) -> ModelHandle<Self> {
132 cx.global::<ModelHandle<Self>>().clone()
133 }
134
135 pub fn invite(
136 &mut self,
137 called_user_id: u64,
138 initial_project: Option<ModelHandle<Project>>,
139 cx: &mut ModelContext<Self>,
140 ) -> Task<Result<()>> {
141 if !self.pending_invites.insert(called_user_id) {
142 return Task::ready(Err(anyhow!("user was already invited")));
143 }
144 cx.notify();
145
146 let room = if let Some(room) = self.room().cloned() {
147 Some(Task::ready(Ok(room)).shared())
148 } else {
149 self.pending_room_creation.clone()
150 };
151
152 let invite = if let Some(room) = room {
153 cx.spawn_weak(|_, mut cx| async move {
154 let room = room.await.map_err(|err| anyhow!("{:?}", err))?;
155
156 let initial_project_id = if let Some(initial_project) = initial_project {
157 Some(
158 room.update(&mut cx, |room, cx| room.share_project(initial_project, cx))
159 .await?,
160 )
161 } else {
162 None
163 };
164
165 room.update(&mut cx, |room, cx| {
166 room.call(called_user_id, initial_project_id, cx)
167 })
168 .await?;
169
170 anyhow::Ok(())
171 })
172 } else {
173 let client = self.client.clone();
174 let user_store = self.user_store.clone();
175 let room = cx
176 .spawn(|this, mut cx| async move {
177 let create_room = async {
178 let room = cx
179 .update(|cx| {
180 Room::create(
181 called_user_id,
182 initial_project,
183 client,
184 user_store,
185 cx,
186 )
187 })
188 .await?;
189
190 this.update(&mut cx, |this, cx| this.set_room(Some(room.clone()), cx))
191 .await?;
192
193 anyhow::Ok(room)
194 };
195
196 let room = create_room.await;
197 this.update(&mut cx, |this, _| this.pending_room_creation = None);
198 room.map_err(Arc::new)
199 })
200 .shared();
201 self.pending_room_creation = Some(room.clone());
202 cx.foreground().spawn(async move {
203 room.await.map_err(|err| anyhow!("{:?}", err))?;
204 anyhow::Ok(())
205 })
206 };
207
208 cx.spawn(|this, mut cx| async move {
209 let result = invite.await;
210 if result.is_ok() {
211 this.update(&mut cx, |this, cx| this.report_call_event("invite", cx));
212 } else {
213 // TODO: Resport collaboration error
214 }
215
216 this.update(&mut cx, |this, cx| {
217 this.pending_invites.remove(&called_user_id);
218 cx.notify();
219 });
220 result
221 })
222 }
223
224 pub fn cancel_invite(
225 &mut self,
226 called_user_id: u64,
227 cx: &mut ModelContext<Self>,
228 ) -> Task<Result<()>> {
229 let room_id = if let Some(room) = self.room() {
230 room.read(cx).id()
231 } else {
232 return Task::ready(Err(anyhow!("no active call")));
233 };
234
235 let client = self.client.clone();
236 cx.foreground().spawn(async move {
237 client
238 .request(proto::CancelCall {
239 room_id,
240 called_user_id,
241 })
242 .await?;
243 anyhow::Ok(())
244 })
245 }
246
247 pub fn incoming(&self) -> watch::Receiver<Option<IncomingCall>> {
248 self.incoming_call.1.clone()
249 }
250
251 pub fn accept_incoming(&mut self, cx: &mut ModelContext<Self>) -> Task<Result<()>> {
252 if self.room.is_some() {
253 return Task::ready(Err(anyhow!("cannot join while on another call")));
254 }
255
256 let call = if let Some(call) = self.incoming_call.1.borrow().clone() {
257 call
258 } else {
259 return Task::ready(Err(anyhow!("no incoming call")));
260 };
261
262 let join = Room::join(&call, self.client.clone(), self.user_store.clone(), cx);
263
264 cx.spawn(|this, mut cx| async move {
265 let room = join.await?;
266 this.update(&mut cx, |this, cx| this.set_room(Some(room.clone()), cx))
267 .await?;
268 this.update(&mut cx, |this, cx| {
269 this.report_call_event("accept incoming", cx)
270 });
271 Ok(())
272 })
273 }
274
275 pub fn decline_incoming(&mut self, cx: &mut ModelContext<Self>) -> Result<()> {
276 let call = self
277 .incoming_call
278 .0
279 .borrow_mut()
280 .take()
281 .ok_or_else(|| anyhow!("no incoming call"))?;
282 report_call_event_for_room("decline incoming", call.room_id, None, &self.client, cx);
283 self.client.send(proto::DeclineCall {
284 room_id: call.room_id,
285 })?;
286 Ok(())
287 }
288
289 pub fn join_channel(
290 &mut self,
291 channel_id: u64,
292 cx: &mut ModelContext<Self>,
293 ) -> Task<Result<ModelHandle<Room>>> {
294 if let Some(room) = self.room().cloned() {
295 if room.read(cx).channel_id() == Some(channel_id) {
296 return Task::ready(Ok(room));
297 } else {
298 room.update(cx, |room, cx| room.clear_state(cx));
299 }
300 }
301
302 let join = Room::join_channel(channel_id, self.client.clone(), self.user_store.clone(), cx);
303
304 cx.spawn(|this, mut cx| async move {
305 let room = join.await?;
306 this.update(&mut cx, |this, cx| this.set_room(Some(room.clone()), cx))
307 .await?;
308 this.update(&mut cx, |this, cx| {
309 this.report_call_event("join channel", cx)
310 });
311 Ok(room)
312 })
313 }
314
315 pub fn hang_up(&mut self, cx: &mut ModelContext<Self>) -> Task<Result<()>> {
316 cx.notify();
317 self.report_call_event("hang up", cx);
318 Audio::end_call(cx);
319 if let Some((room, _)) = self.room.take() {
320 room.update(cx, |room, cx| room.leave(cx))
321 } else {
322 Task::ready(Ok(()))
323 }
324 }
325
326 pub fn share_project(
327 &mut self,
328 project: ModelHandle<Project>,
329 cx: &mut ModelContext<Self>,
330 ) -> Task<Result<u64>> {
331 if let Some((room, _)) = self.room.as_ref() {
332 self.report_call_event("share project", cx);
333 room.update(cx, |room, cx| room.share_project(project, cx))
334 } else {
335 Task::ready(Err(anyhow!("no active call")))
336 }
337 }
338
339 pub fn unshare_project(
340 &mut self,
341 project: ModelHandle<Project>,
342 cx: &mut ModelContext<Self>,
343 ) -> Result<()> {
344 if let Some((room, _)) = self.room.as_ref() {
345 self.report_call_event("unshare project", cx);
346 room.update(cx, |room, cx| room.unshare_project(project, cx))
347 } else {
348 Err(anyhow!("no active call"))
349 }
350 }
351
352 pub fn location(&self) -> Option<&WeakModelHandle<Project>> {
353 self.location.as_ref()
354 }
355
356 pub fn set_location(
357 &mut self,
358 project: Option<&ModelHandle<Project>>,
359 cx: &mut ModelContext<Self>,
360 ) -> Task<Result<()>> {
361 if project.is_some() || !*ZED_ALWAYS_ACTIVE {
362 self.location = project.map(|project| project.downgrade());
363 if let Some((room, _)) = self.room.as_ref() {
364 return room.update(cx, |room, cx| room.set_location(project, cx));
365 }
366 }
367 Task::ready(Ok(()))
368 }
369
370 fn set_room(
371 &mut self,
372 room: Option<ModelHandle<Room>>,
373 cx: &mut ModelContext<Self>,
374 ) -> Task<Result<()>> {
375 if room.as_ref() != self.room.as_ref().map(|room| &room.0) {
376 cx.notify();
377 if let Some(room) = room {
378 if room.read(cx).status().is_offline() {
379 self.room = None;
380 Task::ready(Ok(()))
381 } else {
382 let subscriptions = vec![
383 cx.observe(&room, |this, room, cx| {
384 if room.read(cx).status().is_offline() {
385 this.set_room(None, cx).detach_and_log_err(cx);
386 }
387
388 cx.notify();
389 }),
390 cx.subscribe(&room, |_, _, event, cx| cx.emit(event.clone())),
391 ];
392 self.room = Some((room.clone(), subscriptions));
393 let location = self.location.and_then(|location| location.upgrade(cx));
394 room.update(cx, |room, cx| room.set_location(location.as_ref(), cx))
395 }
396 } else {
397 self.room = None;
398 Task::ready(Ok(()))
399 }
400 } else {
401 Task::ready(Ok(()))
402 }
403 }
404
405 pub fn room(&self) -> Option<&ModelHandle<Room>> {
406 self.room.as_ref().map(|(room, _)| room)
407 }
408
409 pub fn client(&self) -> Arc<Client> {
410 self.client.clone()
411 }
412
413 pub fn pending_invites(&self) -> &HashSet<u64> {
414 &self.pending_invites
415 }
416
417 pub fn report_call_event(&self, operation: &'static str, cx: &AppContext) {
418 if let Some(room) = self.room() {
419 let room = room.read(cx);
420 report_call_event_for_room(operation, room.id(), room.channel_id(), &self.client, cx);
421 }
422 }
423}
424
425pub fn report_call_event_for_room(
426 operation: &'static str,
427 room_id: u64,
428 channel_id: Option<u64>,
429 client: &Arc<Client>,
430 cx: &AppContext,
431) {
432 let telemetry = client.telemetry();
433 let telemetry_settings = *settings::get::<TelemetrySettings>(cx);
434 let event = ClickhouseEvent::Call {
435 operation,
436 room_id: Some(room_id),
437 channel_id,
438 };
439 telemetry.report_clickhouse_event(event, telemetry_settings);
440}
441
442pub fn report_call_event_for_channel(
443 operation: &'static str,
444 channel_id: u64,
445 client: &Arc<Client>,
446 cx: &AppContext,
447) {
448 let room = ActiveCall::global(cx).read(cx).room();
449
450 let telemetry = client.telemetry();
451 let telemetry_settings = *settings::get::<TelemetrySettings>(cx);
452
453 let event = ClickhouseEvent::Call {
454 operation,
455 room_id: room.map(|r| r.read(cx).id()),
456 channel_id: Some(channel_id),
457 };
458 telemetry.report_clickhouse_event(event, telemetry_settings);
459}