1pub mod call_settings;
2pub mod participant;
3pub mod room;
4
5use std::sync::Arc;
6
7use anyhow::{anyhow, Result};
8use call_settings::CallSettings;
9use client::{proto, ClickhouseEvent, Client, TelemetrySettings, TypedEnvelope, User, UserStore};
10use collections::HashSet;
11use futures::{future::Shared, FutureExt};
12use postage::watch;
13
14use gpui::{
15 AppContext, AsyncAppContext, Entity, ModelContext, ModelHandle, Subscription, Task,
16 WeakModelHandle,
17};
18use project::Project;
19
20pub use participant::ParticipantLocation;
21pub use room::Room;
22
23pub fn init(client: Arc<Client>, user_store: ModelHandle<UserStore>, cx: &mut AppContext) {
24 settings::register::<CallSettings>(cx);
25
26 let active_call = cx.add_model(|cx| ActiveCall::new(client, user_store, cx));
27 cx.set_global(active_call);
28}
29
/// A call that has been offered to the current user.
///
/// Built in `ActiveCall::handle_incoming_call` from a `proto::IncomingCall` message.
#[derive(Clone)]
pub struct IncomingCall {
    /// Id of the room the call refers to (the message's `room_id`).
    pub room_id: u64,
    /// User resolved from the message's `calling_user_id`.
    pub calling_user: Arc<User>,
    /// Users resolved from the message's `participant_user_ids`.
    pub participants: Vec<Arc<User>>,
    /// The message's `initial_project` field, passed through unchanged.
    pub initial_project: Option<proto::ParticipantProject>,
}
37
/// Singleton global maintaining the user's participation in a room across workspaces.
pub struct ActiveCall {
    /// The room currently installed by `set_room`, paired with the observation and
    /// subscription handles created for it there.
    room: Option<(ModelHandle<Room>, Vec<Subscription>)>,
    /// Room creation started by `invite` that has not finished yet. It is shared so that
    /// further invites can await the same task, and reset to `None` when the task completes.
    pending_room_creation: Option<Shared<Task<Result<ModelHandle<Room>, Arc<anyhow::Error>>>>>,
    /// Project most recently passed to `set_location`, held weakly. `set_room` re-applies
    /// it to a newly installed room.
    location: Option<WeakModelHandle<Project>>,
    /// Ids of users for which an `invite` is still in progress.
    pending_invites: HashSet<u64>,
    /// Watch channel holding the latest incoming call, if any. The sender half is written
    /// by the message handlers and `decline_incoming`; clones of the receiver half are
    /// handed out by `incoming`.
    incoming_call: (
        watch::Sender<Option<IncomingCall>>,
        watch::Receiver<Option<IncomingCall>>,
    ),
    /// Client used for requests, messages and telemetry.
    client: Arc<Client>,
    /// Store used to resolve user ids and passed on to `Room::create` / `Room::join`.
    user_store: ModelHandle<UserStore>,
    /// Handler registrations returned by the client in `new`.
    /// NOTE(review): presumably dropping these unregisters the handlers — confirm.
    _subscriptions: Vec<client::Subscription>,
}
52
/// `ActiveCall` is a gpui entity whose events are `room::Event`s; `set_room`
/// re-emits every event of the room it subscribes to.
impl Entity for ActiveCall {
    type Event = room::Event;
}
56
57impl ActiveCall {
58 fn new(
59 client: Arc<Client>,
60 user_store: ModelHandle<UserStore>,
61 cx: &mut ModelContext<Self>,
62 ) -> Self {
63 Self {
64 room: None,
65 pending_room_creation: None,
66 location: None,
67 pending_invites: Default::default(),
68 incoming_call: watch::channel(),
69 _subscriptions: vec![
70 client.add_request_handler(cx.handle(), Self::handle_incoming_call),
71 client.add_message_handler(cx.handle(), Self::handle_call_canceled),
72 ],
73 client,
74 user_store,
75 }
76 }
77
78 async fn handle_incoming_call(
79 this: ModelHandle<Self>,
80 envelope: TypedEnvelope<proto::IncomingCall>,
81 _: Arc<Client>,
82 mut cx: AsyncAppContext,
83 ) -> Result<proto::Ack> {
84 let user_store = this.read_with(&cx, |this, _| this.user_store.clone());
85 let call = IncomingCall {
86 room_id: envelope.payload.room_id,
87 participants: user_store
88 .update(&mut cx, |user_store, cx| {
89 user_store.get_users(envelope.payload.participant_user_ids, cx)
90 })
91 .await?,
92 calling_user: user_store
93 .update(&mut cx, |user_store, cx| {
94 user_store.get_user(envelope.payload.calling_user_id, cx)
95 })
96 .await?,
97 initial_project: envelope.payload.initial_project,
98 };
99 this.update(&mut cx, |this, _| {
100 *this.incoming_call.0.borrow_mut() = Some(call);
101 });
102
103 Ok(proto::Ack {})
104 }
105
106 async fn handle_call_canceled(
107 this: ModelHandle<Self>,
108 envelope: TypedEnvelope<proto::CallCanceled>,
109 _: Arc<Client>,
110 mut cx: AsyncAppContext,
111 ) -> Result<()> {
112 this.update(&mut cx, |this, _| {
113 let mut incoming_call = this.incoming_call.0.borrow_mut();
114 if incoming_call
115 .as_ref()
116 .map_or(false, |call| call.room_id == envelope.payload.room_id)
117 {
118 incoming_call.take();
119 }
120 });
121 Ok(())
122 }
123
124 pub fn global(cx: &AppContext) -> ModelHandle<Self> {
125 cx.global::<ModelHandle<Self>>().clone()
126 }
127
/// Invites `called_user_id` to the call, creating a room first when there is none.
///
/// Returns an error right away if an invite to the same user is already pending.
/// Otherwise one of two paths is taken:
/// - a room exists, or one is currently being created: wait for it, share
///   `initial_project` in it (if given) and then call the user;
/// - neither: start `Room::create`, which receives the callee and the initial
///   project, install the new room via `set_room` and remember the shared
///   creation task in `pending_room_creation`.
///
/// Once the invite task finishes, successfully or not, the user id is removed from
/// `pending_invites` and an "invite" call event is reported (only if a room is set).
/// The returned task resolves to the invite's result.
pub fn invite(
    &mut self,
    called_user_id: u64,
    initial_project: Option<ModelHandle<Project>>,
    cx: &mut ModelContext<Self>,
) -> Task<Result<()>> {
    // `insert` returns false when the id was already present, i.e. an invite is in flight.
    if !self.pending_invites.insert(called_user_id) {
        return Task::ready(Err(anyhow!("user was already invited")));
    }
    cx.notify();

    // Either the current room (wrapped in a ready task), an in-flight room creation,
    // or `None` when a room has to be created from scratch.
    let room = if let Some(room) = self.room().cloned() {
        Some(Task::ready(Ok(room)).shared())
    } else {
        self.pending_room_creation.clone()
    };

    let invite = if let Some(room) = room {
        cx.spawn_weak(|_, mut cx| async move {
            // The shared task yields an `Arc<anyhow::Error>`; re-wrap it via its Debug output.
            let room = room.await.map_err(|err| anyhow!("{:?}", err))?;

            // Share the project first so that its id can be attached to the call.
            let initial_project_id = if let Some(initial_project) = initial_project {
                Some(
                    room.update(&mut cx, |room, cx| room.share_project(initial_project, cx))
                        .await?,
                )
            } else {
                None
            };

            room.update(&mut cx, |room, cx| {
                room.call(called_user_id, initial_project_id, cx)
            })
            .await?;

            anyhow::Ok(())
        })
    } else {
        let client = self.client.clone();
        let user_store = self.user_store.clone();
        let room = cx
            .spawn(|this, mut cx| async move {
                let create_room = async {
                    let room = cx
                        .update(|cx| {
                            Room::create(
                                called_user_id,
                                initial_project,
                                client,
                                user_store,
                                cx,
                            )
                        })
                        .await?;

                    this.update(&mut cx, |this, cx| this.set_room(Some(room.clone()), cx))
                        .await?;

                    anyhow::Ok(room)
                };

                let room = create_room.await;
                // Clear the pending marker whether creation succeeded or failed.
                this.update(&mut cx, |this, _| this.pending_room_creation = None);
                // The error is wrapped in an `Arc`, matching the shared task's result type.
                room.map_err(Arc::new)
            })
            .shared();
        // Remember the creation task so that concurrent invites await it instead of
        // creating a second room.
        self.pending_room_creation = Some(room.clone());
        cx.foreground().spawn(async move {
            room.await.map_err(|err| anyhow!("{:?}", err))?;
            anyhow::Ok(())
        })
    };

    cx.spawn(|this, mut cx| async move {
        let result = invite.await;
        // Bookkeeping runs regardless of whether the invite succeeded.
        this.update(&mut cx, |this, cx| {
            this.pending_invites.remove(&called_user_id);
            this.report_call_event("invite", cx);
            cx.notify();
        });
        result
    })
}
211
212 pub fn cancel_invite(
213 &mut self,
214 called_user_id: u64,
215 cx: &mut ModelContext<Self>,
216 ) -> Task<Result<()>> {
217 let room_id = if let Some(room) = self.room() {
218 room.read(cx).id()
219 } else {
220 return Task::ready(Err(anyhow!("no active call")));
221 };
222
223 let client = self.client.clone();
224 cx.foreground().spawn(async move {
225 client
226 .request(proto::CancelCall {
227 room_id,
228 called_user_id,
229 })
230 .await?;
231 anyhow::Ok(())
232 })
233 }
234
235 pub fn incoming(&self) -> watch::Receiver<Option<IncomingCall>> {
236 self.incoming_call.1.clone()
237 }
238
239 pub fn accept_incoming(&mut self, cx: &mut ModelContext<Self>) -> Task<Result<()>> {
240 if self.room.is_some() {
241 return Task::ready(Err(anyhow!("cannot join while on another call")));
242 }
243
244 let call = if let Some(call) = self.incoming_call.1.borrow().clone() {
245 call
246 } else {
247 return Task::ready(Err(anyhow!("no incoming call")));
248 };
249
250 let join = Room::join(&call, self.client.clone(), self.user_store.clone(), cx);
251
252 cx.spawn(|this, mut cx| async move {
253 let room = join.await?;
254 this.update(&mut cx, |this, cx| this.set_room(Some(room.clone()), cx))
255 .await?;
256 this.update(&mut cx, |this, cx| {
257 this.report_call_event("accept incoming", cx)
258 });
259 Ok(())
260 })
261 }
262
263 pub fn decline_incoming(&mut self, cx: &mut ModelContext<Self>) -> Result<()> {
264 let call = self
265 .incoming_call
266 .0
267 .borrow_mut()
268 .take()
269 .ok_or_else(|| anyhow!("no incoming call"))?;
270 Self::report_call_event_for_room("decline incoming", call.room_id, &self.client, cx);
271 self.client.send(proto::DeclineCall {
272 room_id: call.room_id,
273 })?;
274 Ok(())
275 }
276
277 pub fn hang_up(&mut self, cx: &mut ModelContext<Self>) -> Task<Result<()>> {
278 cx.notify();
279 self.report_call_event("hang up", cx);
280 if let Some((room, _)) = self.room.take() {
281 room.update(cx, |room, cx| room.leave(cx))
282 } else {
283 Task::ready(Ok(()))
284 }
285 }
286
287 pub fn toggle_screen_sharing(&self, cx: &mut AppContext) {
288 if let Some(room) = self.room().cloned() {
289 let toggle_screen_sharing = room.update(cx, |room, cx| {
290 if room.is_screen_sharing() {
291 self.report_call_event("disable screen share", cx);
292 Task::ready(room.unshare_screen(cx))
293 } else {
294 self.report_call_event("enable screen share", cx);
295 room.share_screen(cx)
296 }
297 });
298 toggle_screen_sharing.detach_and_log_err(cx);
299 }
300 }
301
302 pub fn share_project(
303 &mut self,
304 project: ModelHandle<Project>,
305 cx: &mut ModelContext<Self>,
306 ) -> Task<Result<u64>> {
307 if let Some((room, _)) = self.room.as_ref() {
308 self.report_call_event("share project", cx);
309 room.update(cx, |room, cx| room.share_project(project, cx))
310 } else {
311 Task::ready(Err(anyhow!("no active call")))
312 }
313 }
314
315 pub fn unshare_project(
316 &mut self,
317 project: ModelHandle<Project>,
318 cx: &mut ModelContext<Self>,
319 ) -> Result<()> {
320 if let Some((room, _)) = self.room.as_ref() {
321 self.report_call_event("unshare project", cx);
322 room.update(cx, |room, cx| room.unshare_project(project, cx))
323 } else {
324 Err(anyhow!("no active call"))
325 }
326 }
327
328 pub fn set_location(
329 &mut self,
330 project: Option<&ModelHandle<Project>>,
331 cx: &mut ModelContext<Self>,
332 ) -> Task<Result<()>> {
333 self.location = project.map(|project| project.downgrade());
334 if let Some((room, _)) = self.room.as_ref() {
335 room.update(cx, |room, cx| room.set_location(project, cx))
336 } else {
337 Task::ready(Ok(()))
338 }
339 }
340
341 fn set_room(
342 &mut self,
343 room: Option<ModelHandle<Room>>,
344 cx: &mut ModelContext<Self>,
345 ) -> Task<Result<()>> {
346 if room.as_ref() != self.room.as_ref().map(|room| &room.0) {
347 cx.notify();
348 if let Some(room) = room {
349 if room.read(cx).status().is_offline() {
350 self.room = None;
351 Task::ready(Ok(()))
352 } else {
353 let subscriptions = vec![
354 cx.observe(&room, |this, room, cx| {
355 if room.read(cx).status().is_offline() {
356 this.set_room(None, cx).detach_and_log_err(cx);
357 }
358
359 cx.notify();
360 }),
361 cx.subscribe(&room, |_, _, event, cx| cx.emit(event.clone())),
362 ];
363 self.room = Some((room.clone(), subscriptions));
364 let location = self.location.and_then(|location| location.upgrade(cx));
365 room.update(cx, |room, cx| room.set_location(location.as_ref(), cx))
366 }
367 } else {
368 self.room = None;
369 Task::ready(Ok(()))
370 }
371 } else {
372 Task::ready(Ok(()))
373 }
374 }
375
376 pub fn room(&self) -> Option<&ModelHandle<Room>> {
377 self.room.as_ref().map(|(room, _)| room)
378 }
379
380 pub fn client(&self) -> Arc<Client> {
381 self.client.clone()
382 }
383
/// Returns the ids of users for which an `invite` is still in progress.
pub fn pending_invites(&self) -> &HashSet<u64> {
    &self.pending_invites
}
387
388 fn report_call_event(&self, operation: &'static str, cx: &AppContext) {
389 if let Some(room) = self.room() {
390 Self::report_call_event_for_room(operation, room.read(cx).id(), &self.client, cx)
391 }
392 }
393
394 pub fn report_call_event_for_room(
395 operation: &'static str,
396 room_id: u64,
397 client: &Arc<Client>,
398 cx: &AppContext,
399 ) {
400 let telemetry = client.telemetry();
401 let telemetry_settings = *settings::get::<TelemetrySettings>(cx);
402 let event = ClickhouseEvent::Call { operation, room_id };
403 telemetry.report_clickhouse_event(event, telemetry_settings);
404 }
405}