1pub mod call_settings;
2pub mod participant;
3pub mod room;
4
5use anyhow::{anyhow, Result};
6use audio::Audio;
7use call_settings::CallSettings;
8use channel::ChannelId;
9use client::{proto, ClickhouseEvent, Client, TelemetrySettings, TypedEnvelope, User, UserStore};
10use collections::HashSet;
11use futures::{future::Shared, FutureExt};
12use gpui::{
13 AppContext, AsyncAppContext, Entity, ModelContext, ModelHandle, Subscription, Task,
14 WeakModelHandle,
15};
16use postage::watch;
17use project::Project;
18use std::sync::Arc;
19
20pub use participant::ParticipantLocation;
21pub use room::Room;
22
23pub fn init(client: Arc<Client>, user_store: ModelHandle<UserStore>, cx: &mut AppContext) {
24 settings::register::<CallSettings>(cx);
25
26 let active_call = cx.add_model(|cx| ActiveCall::new(client, user_store, cx));
27 cx.set_global(active_call);
28}
29
30#[derive(Clone)]
31pub struct IncomingCall {
32 pub room_id: u64,
33 pub calling_user: Arc<User>,
34 pub participants: Vec<Arc<User>>,
35 pub initial_project: Option<proto::ParticipantProject>,
36}
37
38/// Singleton global maintaining the user's participation in a room across workspaces.
39pub struct ActiveCall {
40 room: Option<(ModelHandle<Room>, Vec<Subscription>)>,
41 pending_room_creation: Option<Shared<Task<Result<ModelHandle<Room>, Arc<anyhow::Error>>>>>,
42 location: Option<WeakModelHandle<Project>>,
43 pending_invites: HashSet<u64>,
44 incoming_call: (
45 watch::Sender<Option<IncomingCall>>,
46 watch::Receiver<Option<IncomingCall>>,
47 ),
48 client: Arc<Client>,
49 user_store: ModelHandle<UserStore>,
50 _subscriptions: Vec<client::Subscription>,
51}
52
53impl Entity for ActiveCall {
54 type Event = room::Event;
55}
56
57impl ActiveCall {
58 fn new(
59 client: Arc<Client>,
60 user_store: ModelHandle<UserStore>,
61 cx: &mut ModelContext<Self>,
62 ) -> Self {
63 Self {
64 room: None,
65 pending_room_creation: None,
66 location: None,
67 pending_invites: Default::default(),
68 incoming_call: watch::channel(),
69
70 _subscriptions: vec![
71 client.add_request_handler(cx.handle(), Self::handle_incoming_call),
72 client.add_message_handler(cx.handle(), Self::handle_call_canceled),
73 ],
74 client,
75 user_store,
76 }
77 }
78
79 pub fn channel_id(&self, cx: &AppContext) -> Option<ChannelId> {
80 self.room()?.read(cx).channel_id()
81 }
82
83 async fn handle_incoming_call(
84 this: ModelHandle<Self>,
85 envelope: TypedEnvelope<proto::IncomingCall>,
86 _: Arc<Client>,
87 mut cx: AsyncAppContext,
88 ) -> Result<proto::Ack> {
89 let user_store = this.read_with(&cx, |this, _| this.user_store.clone());
90 let call = IncomingCall {
91 room_id: envelope.payload.room_id,
92 participants: user_store
93 .update(&mut cx, |user_store, cx| {
94 user_store.get_users(envelope.payload.participant_user_ids, cx)
95 })
96 .await?,
97 calling_user: user_store
98 .update(&mut cx, |user_store, cx| {
99 user_store.get_user(envelope.payload.calling_user_id, cx)
100 })
101 .await?,
102 initial_project: envelope.payload.initial_project,
103 };
104 this.update(&mut cx, |this, _| {
105 *this.incoming_call.0.borrow_mut() = Some(call);
106 });
107
108 Ok(proto::Ack {})
109 }
110
111 async fn handle_call_canceled(
112 this: ModelHandle<Self>,
113 envelope: TypedEnvelope<proto::CallCanceled>,
114 _: Arc<Client>,
115 mut cx: AsyncAppContext,
116 ) -> Result<()> {
117 this.update(&mut cx, |this, _| {
118 let mut incoming_call = this.incoming_call.0.borrow_mut();
119 if incoming_call
120 .as_ref()
121 .map_or(false, |call| call.room_id == envelope.payload.room_id)
122 {
123 incoming_call.take();
124 }
125 });
126 Ok(())
127 }
128
129 pub fn global(cx: &AppContext) -> ModelHandle<Self> {
130 cx.global::<ModelHandle<Self>>().clone()
131 }
132
133 pub fn invite(
134 &mut self,
135 called_user_id: u64,
136 initial_project: Option<ModelHandle<Project>>,
137 cx: &mut ModelContext<Self>,
138 ) -> Task<Result<()>> {
139 if !self.pending_invites.insert(called_user_id) {
140 return Task::ready(Err(anyhow!("user was already invited")));
141 }
142 cx.notify();
143
144 let room = if let Some(room) = self.room().cloned() {
145 Some(Task::ready(Ok(room)).shared())
146 } else {
147 self.pending_room_creation.clone()
148 };
149
150 let invite = if let Some(room) = room {
151 cx.spawn_weak(|_, mut cx| async move {
152 let room = room.await.map_err(|err| anyhow!("{:?}", err))?;
153
154 let initial_project_id = if let Some(initial_project) = initial_project {
155 Some(
156 room.update(&mut cx, |room, cx| room.share_project(initial_project, cx))
157 .await?,
158 )
159 } else {
160 None
161 };
162
163 room.update(&mut cx, |room, cx| {
164 room.call(called_user_id, initial_project_id, cx)
165 })
166 .await?;
167
168 anyhow::Ok(())
169 })
170 } else {
171 let client = self.client.clone();
172 let user_store = self.user_store.clone();
173 let room = cx
174 .spawn(|this, mut cx| async move {
175 let create_room = async {
176 let room = cx
177 .update(|cx| {
178 Room::create(
179 called_user_id,
180 initial_project,
181 client,
182 user_store,
183 cx,
184 )
185 })
186 .await?;
187
188 this.update(&mut cx, |this, cx| this.set_room(Some(room.clone()), cx))
189 .await?;
190
191 anyhow::Ok(room)
192 };
193
194 let room = create_room.await;
195 this.update(&mut cx, |this, _| this.pending_room_creation = None);
196 room.map_err(Arc::new)
197 })
198 .shared();
199 self.pending_room_creation = Some(room.clone());
200 cx.foreground().spawn(async move {
201 room.await.map_err(|err| anyhow!("{:?}", err))?;
202 anyhow::Ok(())
203 })
204 };
205
206 cx.spawn(|this, mut cx| async move {
207 let result = invite.await;
208 if result.is_ok() {
209 this.update(&mut cx, |this, cx| this.report_call_event("invite", cx));
210 } else {
211 // TODO: Resport collaboration error
212 }
213
214 this.update(&mut cx, |this, cx| {
215 this.pending_invites.remove(&called_user_id);
216 cx.notify();
217 });
218 result
219 })
220 }
221
222 pub fn cancel_invite(
223 &mut self,
224 called_user_id: u64,
225 cx: &mut ModelContext<Self>,
226 ) -> Task<Result<()>> {
227 let room_id = if let Some(room) = self.room() {
228 room.read(cx).id()
229 } else {
230 return Task::ready(Err(anyhow!("no active call")));
231 };
232
233 let client = self.client.clone();
234 cx.foreground().spawn(async move {
235 client
236 .request(proto::CancelCall {
237 room_id,
238 called_user_id,
239 })
240 .await?;
241 anyhow::Ok(())
242 })
243 }
244
245 pub fn incoming(&self) -> watch::Receiver<Option<IncomingCall>> {
246 self.incoming_call.1.clone()
247 }
248
249 pub fn accept_incoming(&mut self, cx: &mut ModelContext<Self>) -> Task<Result<()>> {
250 if self.room.is_some() {
251 return Task::ready(Err(anyhow!("cannot join while on another call")));
252 }
253
254 let call = if let Some(call) = self.incoming_call.1.borrow().clone() {
255 call
256 } else {
257 return Task::ready(Err(anyhow!("no incoming call")));
258 };
259
260 let join = Room::join(&call, self.client.clone(), self.user_store.clone(), cx);
261
262 cx.spawn(|this, mut cx| async move {
263 let room = join.await?;
264 this.update(&mut cx, |this, cx| this.set_room(Some(room.clone()), cx))
265 .await?;
266 this.update(&mut cx, |this, cx| {
267 this.report_call_event("accept incoming", cx)
268 });
269 Ok(())
270 })
271 }
272
273 pub fn decline_incoming(&mut self, cx: &mut ModelContext<Self>) -> Result<()> {
274 let call = self
275 .incoming_call
276 .0
277 .borrow_mut()
278 .take()
279 .ok_or_else(|| anyhow!("no incoming call"))?;
280 report_call_event_for_room("decline incoming", call.room_id, None, &self.client, cx);
281 self.client.send(proto::DeclineCall {
282 room_id: call.room_id,
283 })?;
284 Ok(())
285 }
286
287 pub fn join_channel(
288 &mut self,
289 channel_id: u64,
290 cx: &mut ModelContext<Self>,
291 ) -> Task<Result<()>> {
292 if let Some(room) = self.room().cloned() {
293 if room.read(cx).channel_id() == Some(channel_id) {
294 return Task::ready(Ok(()));
295 } else {
296 room.update(cx, |room, cx| room.clear_state(cx));
297 }
298 }
299
300 let join = Room::join_channel(channel_id, self.client.clone(), self.user_store.clone(), cx);
301
302 cx.spawn(|this, mut cx| async move {
303 let room = join.await?;
304 this.update(&mut cx, |this, cx| this.set_room(Some(room.clone()), cx))
305 .await?;
306 this.update(&mut cx, |this, cx| {
307 this.report_call_event("join channel", cx)
308 });
309 Ok(())
310 })
311 }
312
313 pub fn hang_up(&mut self, cx: &mut ModelContext<Self>) -> Task<Result<()>> {
314 cx.notify();
315 self.report_call_event("hang up", cx);
316 Audio::end_call(cx);
317 if let Some((room, _)) = self.room.take() {
318 room.update(cx, |room, cx| room.leave(cx))
319 } else {
320 Task::ready(Ok(()))
321 }
322 }
323
324 pub fn share_project(
325 &mut self,
326 project: ModelHandle<Project>,
327 cx: &mut ModelContext<Self>,
328 ) -> Task<Result<u64>> {
329 if let Some((room, _)) = self.room.as_ref() {
330 self.report_call_event("share project", cx);
331 room.update(cx, |room, cx| room.share_project(project, cx))
332 } else {
333 Task::ready(Err(anyhow!("no active call")))
334 }
335 }
336
337 pub fn unshare_project(
338 &mut self,
339 project: ModelHandle<Project>,
340 cx: &mut ModelContext<Self>,
341 ) -> Result<()> {
342 if let Some((room, _)) = self.room.as_ref() {
343 self.report_call_event("unshare project", cx);
344 room.update(cx, |room, cx| room.unshare_project(project, cx))
345 } else {
346 Err(anyhow!("no active call"))
347 }
348 }
349
350 pub fn location(&self) -> Option<&WeakModelHandle<Project>> {
351 self.location.as_ref()
352 }
353
354 pub fn set_location(
355 &mut self,
356 project: Option<&ModelHandle<Project>>,
357 cx: &mut ModelContext<Self>,
358 ) -> Task<Result<()>> {
359 self.location = project.map(|project| project.downgrade());
360 if let Some((room, _)) = self.room.as_ref() {
361 room.update(cx, |room, cx| room.set_location(project, cx))
362 } else {
363 Task::ready(Ok(()))
364 }
365 }
366
367 fn set_room(
368 &mut self,
369 room: Option<ModelHandle<Room>>,
370 cx: &mut ModelContext<Self>,
371 ) -> Task<Result<()>> {
372 if room.as_ref() != self.room.as_ref().map(|room| &room.0) {
373 cx.notify();
374 if let Some(room) = room {
375 if room.read(cx).status().is_offline() {
376 self.room = None;
377 Task::ready(Ok(()))
378 } else {
379 let subscriptions = vec![
380 cx.observe(&room, |this, room, cx| {
381 if room.read(cx).status().is_offline() {
382 this.set_room(None, cx).detach_and_log_err(cx);
383 }
384
385 cx.notify();
386 }),
387 cx.subscribe(&room, |_, _, event, cx| cx.emit(event.clone())),
388 ];
389 self.room = Some((room.clone(), subscriptions));
390 let location = self.location.and_then(|location| location.upgrade(cx));
391 room.update(cx, |room, cx| room.set_location(location.as_ref(), cx))
392 }
393 } else {
394 self.room = None;
395 Task::ready(Ok(()))
396 }
397 } else {
398 Task::ready(Ok(()))
399 }
400 }
401
402 pub fn room(&self) -> Option<&ModelHandle<Room>> {
403 self.room.as_ref().map(|(room, _)| room)
404 }
405
406 pub fn client(&self) -> Arc<Client> {
407 self.client.clone()
408 }
409
410 pub fn pending_invites(&self) -> &HashSet<u64> {
411 &self.pending_invites
412 }
413
414 pub fn report_call_event(&self, operation: &'static str, cx: &AppContext) {
415 if let Some(room) = self.room() {
416 let room = room.read(cx);
417 report_call_event_for_room(operation, room.id(), room.channel_id(), &self.client, cx);
418 }
419 }
420}
421
422pub fn report_call_event_for_room(
423 operation: &'static str,
424 room_id: u64,
425 channel_id: Option<u64>,
426 client: &Arc<Client>,
427 cx: &AppContext,
428) {
429 let telemetry = client.telemetry();
430 let telemetry_settings = *settings::get::<TelemetrySettings>(cx);
431 let event = ClickhouseEvent::Call {
432 operation,
433 room_id: Some(room_id),
434 channel_id,
435 };
436 telemetry.report_clickhouse_event(event, telemetry_settings);
437}
438
439pub fn report_call_event_for_channel(
440 operation: &'static str,
441 channel_id: u64,
442 client: &Arc<Client>,
443 cx: &AppContext,
444) {
445 let room = ActiveCall::global(cx).read(cx).room();
446
447 let telemetry = client.telemetry();
448 let telemetry_settings = *settings::get::<TelemetrySettings>(cx);
449
450 let event = ClickhouseEvent::Call {
451 operation,
452 room_id: room.map(|r| r.read(cx).id()),
453 channel_id: Some(channel_id),
454 };
455 telemetry.report_clickhouse_event(event, telemetry_settings);
456}