1pub mod call_settings;
2pub mod participant;
3pub mod room;
4
5use std::sync::Arc;
6
7use anyhow::{anyhow, Result};
8use audio::Audio;
9use call_settings::CallSettings;
10use channel::ChannelId;
11use client::{proto, ClickhouseEvent, Client, TelemetrySettings, TypedEnvelope, User, UserStore};
12use collections::HashSet;
13use futures::{future::Shared, FutureExt};
14use postage::watch;
15
16use gpui::{
17 AppContext, AsyncAppContext, Entity, ModelContext, ModelHandle, Subscription, Task,
18 WeakModelHandle,
19};
20use project::Project;
21
22pub use participant::ParticipantLocation;
23pub use room::Room;
24
25pub fn init(client: Arc<Client>, user_store: ModelHandle<UserStore>, cx: &mut AppContext) {
26 settings::register::<CallSettings>(cx);
27
28 let active_call = cx.add_model(|cx| ActiveCall::new(client, user_store, cx));
29 cx.set_global(active_call);
30}
31
/// An incoming call as published on `ActiveCall`'s `incoming` watch channel.
///
/// Built in `ActiveCall::handle_incoming_call` from a `proto::IncomingCall` message.
#[derive(Clone)]
pub struct IncomingCall {
    /// Id of the room the call belongs to (copied from the message's `room_id`).
    pub room_id: u64,
    /// The user resolved from the message's `calling_user_id`.
    pub calling_user: Arc<User>,
    /// The users resolved from the message's `participant_user_ids`.
    pub participants: Vec<Arc<User>>,
    /// The message's `initial_project` field, passed through unchanged.
    pub initial_project: Option<proto::ParticipantProject>,
}
39
/// Singleton global maintaining the user's participation in a room across workspaces.
pub struct ActiveCall {
    /// The current room together with the observation/subscription handles created for it
    /// in `set_room`; `None` when there is no active room.
    room: Option<(ModelHandle<Room>, Vec<Subscription>)>,
    /// Shared task for a room creation started by `invite` that has not finished yet.
    /// Reset to `None` by that task once creation completes (successfully or not).
    pending_room_creation: Option<Shared<Task<Result<ModelHandle<Room>, Arc<anyhow::Error>>>>>,
    /// Weak handle to the project last passed to `set_location`; re-applied to a room when
    /// it becomes the active room.
    location: Option<WeakModelHandle<Project>>,
    /// Ids of users for whom an `invite` is currently in flight.
    pending_invites: HashSet<u64>,
    /// Watch channel carrying the current incoming call: the sender is written by the
    /// message handlers and `decline_incoming`, the receiver is cloned out by `incoming`.
    incoming_call: (
        watch::Sender<Option<IncomingCall>>,
        watch::Receiver<Option<IncomingCall>>,
    ),
    /// Client used for requests, message handlers and telemetry.
    client: Arc<Client>,
    /// Store used to resolve user ids and passed on to `Room` constructors.
    user_store: ModelHandle<UserStore>,
    /// Handler registrations returned by the client in `new`; never read, only held.
    _subscriptions: Vec<client::Subscription>,
}
54
// `ActiveCall` re-emits the events of its current room (see the `cx.subscribe` call in
// `set_room`), so its event type is the room's event type.
impl Entity for ActiveCall {
    type Event = room::Event;
}
58
59impl ActiveCall {
60 fn new(
61 client: Arc<Client>,
62 user_store: ModelHandle<UserStore>,
63 cx: &mut ModelContext<Self>,
64 ) -> Self {
65 Self {
66 room: None,
67 pending_room_creation: None,
68 location: None,
69 pending_invites: Default::default(),
70 incoming_call: watch::channel(),
71 _subscriptions: vec![
72 client.add_request_handler(cx.handle(), Self::handle_incoming_call),
73 client.add_message_handler(cx.handle(), Self::handle_call_canceled),
74 ],
75 client,
76 user_store,
77 }
78 }
79
80 pub fn channel_id(&self, cx: &AppContext) -> Option<ChannelId> {
81 self.room()?.read(cx).channel_id()
82 }
83
84 async fn handle_incoming_call(
85 this: ModelHandle<Self>,
86 envelope: TypedEnvelope<proto::IncomingCall>,
87 _: Arc<Client>,
88 mut cx: AsyncAppContext,
89 ) -> Result<proto::Ack> {
90 let user_store = this.read_with(&cx, |this, _| this.user_store.clone());
91 let call = IncomingCall {
92 room_id: envelope.payload.room_id,
93 participants: user_store
94 .update(&mut cx, |user_store, cx| {
95 user_store.get_users(envelope.payload.participant_user_ids, cx)
96 })
97 .await?,
98 calling_user: user_store
99 .update(&mut cx, |user_store, cx| {
100 user_store.get_user(envelope.payload.calling_user_id, cx)
101 })
102 .await?,
103 initial_project: envelope.payload.initial_project,
104 };
105 this.update(&mut cx, |this, _| {
106 *this.incoming_call.0.borrow_mut() = Some(call);
107 });
108
109 Ok(proto::Ack {})
110 }
111
112 async fn handle_call_canceled(
113 this: ModelHandle<Self>,
114 envelope: TypedEnvelope<proto::CallCanceled>,
115 _: Arc<Client>,
116 mut cx: AsyncAppContext,
117 ) -> Result<()> {
118 this.update(&mut cx, |this, _| {
119 let mut incoming_call = this.incoming_call.0.borrow_mut();
120 if incoming_call
121 .as_ref()
122 .map_or(false, |call| call.room_id == envelope.payload.room_id)
123 {
124 incoming_call.take();
125 }
126 });
127 Ok(())
128 }
129
130 pub fn global(cx: &AppContext) -> ModelHandle<Self> {
131 cx.global::<ModelHandle<Self>>().clone()
132 }
133
134 pub fn invite(
135 &mut self,
136 called_user_id: u64,
137 initial_project: Option<ModelHandle<Project>>,
138 cx: &mut ModelContext<Self>,
139 ) -> Task<Result<()>> {
140 if !self.pending_invites.insert(called_user_id) {
141 return Task::ready(Err(anyhow!("user was already invited")));
142 }
143 cx.notify();
144
145 let room = if let Some(room) = self.room().cloned() {
146 Some(Task::ready(Ok(room)).shared())
147 } else {
148 self.pending_room_creation.clone()
149 };
150
151 let invite = if let Some(room) = room {
152 cx.spawn_weak(|_, mut cx| async move {
153 let room = room.await.map_err(|err| anyhow!("{:?}", err))?;
154
155 let initial_project_id = if let Some(initial_project) = initial_project {
156 Some(
157 room.update(&mut cx, |room, cx| room.share_project(initial_project, cx))
158 .await?,
159 )
160 } else {
161 None
162 };
163
164 room.update(&mut cx, |room, cx| {
165 room.call(called_user_id, initial_project_id, cx)
166 })
167 .await?;
168
169 anyhow::Ok(())
170 })
171 } else {
172 let client = self.client.clone();
173 let user_store = self.user_store.clone();
174 let room = cx
175 .spawn(|this, mut cx| async move {
176 let create_room = async {
177 let room = cx
178 .update(|cx| {
179 Room::create(
180 called_user_id,
181 initial_project,
182 client,
183 user_store,
184 cx,
185 )
186 })
187 .await?;
188
189 this.update(&mut cx, |this, cx| this.set_room(Some(room.clone()), cx))
190 .await?;
191
192 anyhow::Ok(room)
193 };
194
195 let room = create_room.await;
196 this.update(&mut cx, |this, _| this.pending_room_creation = None);
197 room.map_err(Arc::new)
198 })
199 .shared();
200 self.pending_room_creation = Some(room.clone());
201 cx.foreground().spawn(async move {
202 room.await.map_err(|err| anyhow!("{:?}", err))?;
203 anyhow::Ok(())
204 })
205 };
206
207 cx.spawn(|this, mut cx| async move {
208 let result = invite.await;
209 if result.is_ok() {
210 this.update(&mut cx, |this, cx| this.report_call_event("invite", cx));
211 } else {
212 // TODO: Resport collaboration error
213 }
214
215 this.update(&mut cx, |this, cx| {
216 this.pending_invites.remove(&called_user_id);
217 cx.notify();
218 });
219 result
220 })
221 }
222
223 pub fn cancel_invite(
224 &mut self,
225 called_user_id: u64,
226 cx: &mut ModelContext<Self>,
227 ) -> Task<Result<()>> {
228 let room_id = if let Some(room) = self.room() {
229 room.read(cx).id()
230 } else {
231 return Task::ready(Err(anyhow!("no active call")));
232 };
233
234 let client = self.client.clone();
235 cx.foreground().spawn(async move {
236 client
237 .request(proto::CancelCall {
238 room_id,
239 called_user_id,
240 })
241 .await?;
242 anyhow::Ok(())
243 })
244 }
245
246 pub fn incoming(&self) -> watch::Receiver<Option<IncomingCall>> {
247 self.incoming_call.1.clone()
248 }
249
250 pub fn accept_incoming(&mut self, cx: &mut ModelContext<Self>) -> Task<Result<()>> {
251 if self.room.is_some() {
252 return Task::ready(Err(anyhow!("cannot join while on another call")));
253 }
254
255 let call = if let Some(call) = self.incoming_call.1.borrow().clone() {
256 call
257 } else {
258 return Task::ready(Err(anyhow!("no incoming call")));
259 };
260
261 let join = Room::join(&call, self.client.clone(), self.user_store.clone(), cx);
262
263 cx.spawn(|this, mut cx| async move {
264 let room = join.await?;
265 this.update(&mut cx, |this, cx| this.set_room(Some(room.clone()), cx))
266 .await?;
267 this.update(&mut cx, |this, cx| {
268 this.report_call_event("accept incoming", cx)
269 });
270 Ok(())
271 })
272 }
273
274 pub fn decline_incoming(&mut self, cx: &mut ModelContext<Self>) -> Result<()> {
275 let call = self
276 .incoming_call
277 .0
278 .borrow_mut()
279 .take()
280 .ok_or_else(|| anyhow!("no incoming call"))?;
281 report_call_event_for_room("decline incoming", call.room_id, None, &self.client, cx);
282 self.client.send(proto::DeclineCall {
283 room_id: call.room_id,
284 })?;
285 Ok(())
286 }
287
288 pub fn join_channel(
289 &mut self,
290 channel_id: u64,
291 cx: &mut ModelContext<Self>,
292 ) -> Task<Result<()>> {
293 if let Some(room) = self.room().cloned() {
294 if room.read(cx).channel_id() == Some(channel_id) {
295 return Task::ready(Ok(()));
296 } else {
297 room.update(cx, |room, cx| room.clear_state(cx));
298 }
299 }
300
301 let join = Room::join_channel(channel_id, self.client.clone(), self.user_store.clone(), cx);
302
303 cx.spawn(|this, mut cx| async move {
304 let room = join.await?;
305 this.update(&mut cx, |this, cx| this.set_room(Some(room.clone()), cx))
306 .await?;
307 this.update(&mut cx, |this, cx| {
308 this.report_call_event("join channel", cx)
309 });
310 Ok(())
311 })
312 }
313
314 pub fn hang_up(&mut self, cx: &mut ModelContext<Self>) -> Task<Result<()>> {
315 cx.notify();
316 self.report_call_event("hang up", cx);
317 Audio::end_call(cx);
318 if let Some((room, _)) = self.room.take() {
319 room.update(cx, |room, cx| room.leave(cx))
320 } else {
321 Task::ready(Ok(()))
322 }
323 }
324
325 pub fn share_project(
326 &mut self,
327 project: ModelHandle<Project>,
328 cx: &mut ModelContext<Self>,
329 ) -> Task<Result<u64>> {
330 if let Some((room, _)) = self.room.as_ref() {
331 self.report_call_event("share project", cx);
332 room.update(cx, |room, cx| room.share_project(project, cx))
333 } else {
334 Task::ready(Err(anyhow!("no active call")))
335 }
336 }
337
338 pub fn unshare_project(
339 &mut self,
340 project: ModelHandle<Project>,
341 cx: &mut ModelContext<Self>,
342 ) -> Result<()> {
343 if let Some((room, _)) = self.room.as_ref() {
344 self.report_call_event("unshare project", cx);
345 room.update(cx, |room, cx| room.unshare_project(project, cx))
346 } else {
347 Err(anyhow!("no active call"))
348 }
349 }
350
351 pub fn set_location(
352 &mut self,
353 project: Option<&ModelHandle<Project>>,
354 cx: &mut ModelContext<Self>,
355 ) -> Task<Result<()>> {
356 self.location = project.map(|project| project.downgrade());
357 if let Some((room, _)) = self.room.as_ref() {
358 room.update(cx, |room, cx| room.set_location(project, cx))
359 } else {
360 Task::ready(Ok(()))
361 }
362 }
363
364 fn set_room(
365 &mut self,
366 room: Option<ModelHandle<Room>>,
367 cx: &mut ModelContext<Self>,
368 ) -> Task<Result<()>> {
369 if room.as_ref() != self.room.as_ref().map(|room| &room.0) {
370 cx.notify();
371 if let Some(room) = room {
372 if room.read(cx).status().is_offline() {
373 self.room = None;
374 Task::ready(Ok(()))
375 } else {
376 let subscriptions = vec![
377 cx.observe(&room, |this, room, cx| {
378 if room.read(cx).status().is_offline() {
379 this.set_room(None, cx).detach_and_log_err(cx);
380 }
381
382 cx.notify();
383 }),
384 cx.subscribe(&room, |_, _, event, cx| cx.emit(event.clone())),
385 ];
386 self.room = Some((room.clone(), subscriptions));
387 let location = self.location.and_then(|location| location.upgrade(cx));
388 room.update(cx, |room, cx| room.set_location(location.as_ref(), cx))
389 }
390 } else {
391 self.room = None;
392 Task::ready(Ok(()))
393 }
394 } else {
395 Task::ready(Ok(()))
396 }
397 }
398
399 pub fn room(&self) -> Option<&ModelHandle<Room>> {
400 self.room.as_ref().map(|(room, _)| room)
401 }
402
403 pub fn client(&self) -> Arc<Client> {
404 self.client.clone()
405 }
406
407 pub fn pending_invites(&self) -> &HashSet<u64> {
408 &self.pending_invites
409 }
410
411 pub fn report_call_event(&self, operation: &'static str, cx: &AppContext) {
412 if let Some(room) = self.room() {
413 let room = room.read(cx);
414 report_call_event_for_room(operation, room.id(), room.channel_id(), &self.client, cx);
415 }
416 }
417}
418
419pub fn report_call_event_for_room(
420 operation: &'static str,
421 room_id: u64,
422 channel_id: Option<u64>,
423 client: &Arc<Client>,
424 cx: &AppContext,
425) {
426 let telemetry = client.telemetry();
427 let telemetry_settings = *settings::get::<TelemetrySettings>(cx);
428 let event = ClickhouseEvent::Call {
429 operation,
430 room_id: Some(room_id),
431 channel_id,
432 };
433 telemetry.report_clickhouse_event(event, telemetry_settings);
434}
435
436pub fn report_call_event_for_channel(
437 operation: &'static str,
438 channel_id: u64,
439 client: &Arc<Client>,
440 cx: &AppContext,
441) {
442 let room = ActiveCall::global(cx).read(cx).room();
443
444 let telemetry = client.telemetry();
445 let telemetry_settings = *settings::get::<TelemetrySettings>(cx);
446
447 let event = ClickhouseEvent::Call {
448 operation,
449 room_id: room.map(|r| r.read(cx).id()),
450 channel_id: Some(channel_id),
451 };
452 telemetry.report_clickhouse_event(event, telemetry_settings);
453}