1pub mod call_settings;
2pub mod participant;
3pub mod room;
4
5use std::sync::Arc;
6
7use anyhow::{anyhow, Result};
8use call_settings::CallSettings;
9use client::{proto, ClickhouseEvent, Client, TelemetrySettings, TypedEnvelope, User, UserStore};
10use collections::HashSet;
11use futures::{future::Shared, FutureExt};
12use postage::watch;
13
14use gpui::{
15 AppContext, AsyncAppContext, Entity, ModelContext, ModelHandle, Subscription, Task,
16 WeakModelHandle,
17};
18use project::Project;
19
20pub use participant::ParticipantLocation;
21pub use room::Room;
22
23pub fn init(client: Arc<Client>, user_store: ModelHandle<UserStore>, cx: &mut AppContext) {
24 settings::register::<CallSettings>(cx);
25
26 let active_call = cx.add_model(|cx| ActiveCall::new(client, user_store, cx));
27 cx.set_global(active_call);
28}
29
30#[derive(Clone)]
31pub struct IncomingCall {
32 pub room_id: u64,
33 pub calling_user: Arc<User>,
34 pub participants: Vec<Arc<User>>,
35 pub initial_project: Option<proto::ParticipantProject>,
36}
37
38/// Singleton global maintaining the user's participation in a room across workspaces.
39pub struct ActiveCall {
40 room: Option<(ModelHandle<Room>, Vec<Subscription>)>,
41 pending_room_creation: Option<Shared<Task<Result<ModelHandle<Room>, Arc<anyhow::Error>>>>>,
42 location: Option<WeakModelHandle<Project>>,
43 pending_invites: HashSet<u64>,
44 incoming_call: (
45 watch::Sender<Option<IncomingCall>>,
46 watch::Receiver<Option<IncomingCall>>,
47 ),
48 client: Arc<Client>,
49 user_store: ModelHandle<UserStore>,
50 _subscriptions: Vec<client::Subscription>,
51}
52
53impl Entity for ActiveCall {
54 type Event = room::Event;
55}
56
57impl ActiveCall {
58 fn new(
59 client: Arc<Client>,
60 user_store: ModelHandle<UserStore>,
61 cx: &mut ModelContext<Self>,
62 ) -> Self {
63 Self {
64 room: None,
65 pending_room_creation: None,
66 location: None,
67 pending_invites: Default::default(),
68 incoming_call: watch::channel(),
69 _subscriptions: vec![
70 client.add_request_handler(cx.handle(), Self::handle_incoming_call),
71 client.add_message_handler(cx.handle(), Self::handle_call_canceled),
72 ],
73 client,
74 user_store,
75 }
76 }
77
78 async fn handle_incoming_call(
79 this: ModelHandle<Self>,
80 envelope: TypedEnvelope<proto::IncomingCall>,
81 _: Arc<Client>,
82 mut cx: AsyncAppContext,
83 ) -> Result<proto::Ack> {
84 let user_store = this.read_with(&cx, |this, _| this.user_store.clone());
85 let call = IncomingCall {
86 room_id: envelope.payload.room_id,
87 participants: user_store
88 .update(&mut cx, |user_store, cx| {
89 user_store.get_users(envelope.payload.participant_user_ids, cx)
90 })
91 .await?,
92 calling_user: user_store
93 .update(&mut cx, |user_store, cx| {
94 user_store.get_user(envelope.payload.calling_user_id, cx)
95 })
96 .await?,
97 initial_project: envelope.payload.initial_project,
98 };
99 this.update(&mut cx, |this, _| {
100 *this.incoming_call.0.borrow_mut() = Some(call);
101 });
102
103 Ok(proto::Ack {})
104 }
105
106 async fn handle_call_canceled(
107 this: ModelHandle<Self>,
108 envelope: TypedEnvelope<proto::CallCanceled>,
109 _: Arc<Client>,
110 mut cx: AsyncAppContext,
111 ) -> Result<()> {
112 this.update(&mut cx, |this, _| {
113 let mut incoming_call = this.incoming_call.0.borrow_mut();
114 if incoming_call
115 .as_ref()
116 .map_or(false, |call| call.room_id == envelope.payload.room_id)
117 {
118 incoming_call.take();
119 }
120 });
121 Ok(())
122 }
123
124 pub fn global(cx: &AppContext) -> ModelHandle<Self> {
125 cx.global::<ModelHandle<Self>>().clone()
126 }
127
128 pub fn invite(
129 &mut self,
130 called_user_id: u64,
131 initial_project: Option<ModelHandle<Project>>,
132 cx: &mut ModelContext<Self>,
133 ) -> Task<Result<()>> {
134 if !self.pending_invites.insert(called_user_id) {
135 return Task::ready(Err(anyhow!("user was already invited")));
136 }
137 cx.notify();
138
139 let room = if let Some(room) = self.room().cloned() {
140 Some(Task::ready(Ok(room)).shared())
141 } else {
142 self.pending_room_creation.clone()
143 };
144
145 let invite = if let Some(room) = room {
146 cx.spawn_weak(|_, mut cx| async move {
147 let room = room.await.map_err(|err| anyhow!("{:?}", err))?;
148
149 let initial_project_id = if let Some(initial_project) = initial_project {
150 Some(
151 room.update(&mut cx, |room, cx| room.share_project(initial_project, cx))
152 .await?,
153 )
154 } else {
155 None
156 };
157
158 room.update(&mut cx, |room, cx| {
159 room.call(called_user_id, initial_project_id, cx)
160 })
161 .await?;
162
163 anyhow::Ok(())
164 })
165 } else {
166 let client = self.client.clone();
167 let user_store = self.user_store.clone();
168 let room = cx
169 .spawn(|this, mut cx| async move {
170 let create_room = async {
171 let room = cx
172 .update(|cx| {
173 Room::create(
174 called_user_id,
175 initial_project,
176 client,
177 user_store,
178 cx,
179 )
180 })
181 .await?;
182
183 this.update(&mut cx, |this, cx| this.set_room(Some(room.clone()), cx))
184 .await?;
185
186 anyhow::Ok(room)
187 };
188
189 let room = create_room.await;
190 this.update(&mut cx, |this, _| this.pending_room_creation = None);
191 room.map_err(Arc::new)
192 })
193 .shared();
194 self.pending_room_creation = Some(room.clone());
195 cx.foreground().spawn(async move {
196 room.await.map_err(|err| anyhow!("{:?}", err))?;
197 anyhow::Ok(())
198 })
199 };
200
201 cx.spawn(|this, mut cx| async move {
202 let result = invite.await;
203 this.update(&mut cx, |this, cx| {
204 this.pending_invites.remove(&called_user_id);
205 this.report_call_event("invite", cx);
206 cx.notify();
207 });
208 result
209 })
210 }
211
212 pub fn cancel_invite(
213 &mut self,
214 called_user_id: u64,
215 cx: &mut ModelContext<Self>,
216 ) -> Task<Result<()>> {
217 let room_id = if let Some(room) = self.room() {
218 room.read(cx).id()
219 } else {
220 return Task::ready(Err(anyhow!("no active call")));
221 };
222
223 let client = self.client.clone();
224 cx.foreground().spawn(async move {
225 client
226 .request(proto::CancelCall {
227 room_id,
228 called_user_id,
229 })
230 .await?;
231 anyhow::Ok(())
232 })
233 }
234
235 pub fn incoming(&self) -> watch::Receiver<Option<IncomingCall>> {
236 self.incoming_call.1.clone()
237 }
238
239 pub fn accept_incoming(&mut self, cx: &mut ModelContext<Self>) -> Task<Result<()>> {
240 if self.room.is_some() {
241 return Task::ready(Err(anyhow!("cannot join while on another call")));
242 }
243
244 let call = if let Some(call) = self.incoming_call.1.borrow().clone() {
245 call
246 } else {
247 return Task::ready(Err(anyhow!("no incoming call")));
248 };
249
250 let join = Room::join(&call, self.client.clone(), self.user_store.clone(), cx);
251
252 cx.spawn(|this, mut cx| async move {
253 let room = join.await?;
254 this.update(&mut cx, |this, cx| this.set_room(Some(room.clone()), cx))
255 .await?;
256 this.update(&mut cx, |this, cx| {
257 this.report_call_event("accept incoming", cx)
258 });
259 Ok(())
260 })
261 }
262
263 pub fn decline_incoming(&mut self, cx: &mut ModelContext<Self>) -> Result<()> {
264 let call = self
265 .incoming_call
266 .0
267 .borrow_mut()
268 .take()
269 .ok_or_else(|| anyhow!("no incoming call"))?;
270 Self::report_call_event_for_room("decline incoming", call.room_id, &self.client, cx);
271 self.client.send(proto::DeclineCall {
272 room_id: call.room_id,
273 })?;
274 Ok(())
275 }
276
277 pub fn hang_up(&mut self, cx: &mut ModelContext<Self>) -> Task<Result<()>> {
278 cx.notify();
279 self.report_call_event("hang up", cx);
280 if let Some((room, _)) = self.room.take() {
281 room.update(cx, |room, cx| room.leave(cx))
282 } else {
283 Task::ready(Ok(()))
284 }
285 }
286
287 pub fn share_project(
288 &mut self,
289 project: ModelHandle<Project>,
290 cx: &mut ModelContext<Self>,
291 ) -> Task<Result<u64>> {
292 if let Some((room, _)) = self.room.as_ref() {
293 self.report_call_event("share project", cx);
294 room.update(cx, |room, cx| room.share_project(project, cx))
295 } else {
296 Task::ready(Err(anyhow!("no active call")))
297 }
298 }
299
300 pub fn unshare_project(
301 &mut self,
302 project: ModelHandle<Project>,
303 cx: &mut ModelContext<Self>,
304 ) -> Result<()> {
305 if let Some((room, _)) = self.room.as_ref() {
306 self.report_call_event("unshare project", cx);
307 room.update(cx, |room, cx| room.unshare_project(project, cx))
308 } else {
309 Err(anyhow!("no active call"))
310 }
311 }
312
313 pub fn set_location(
314 &mut self,
315 project: Option<&ModelHandle<Project>>,
316 cx: &mut ModelContext<Self>,
317 ) -> Task<Result<()>> {
318 self.location = project.map(|project| project.downgrade());
319 if let Some((room, _)) = self.room.as_ref() {
320 room.update(cx, |room, cx| room.set_location(project, cx))
321 } else {
322 Task::ready(Ok(()))
323 }
324 }
325
326 fn set_room(
327 &mut self,
328 room: Option<ModelHandle<Room>>,
329 cx: &mut ModelContext<Self>,
330 ) -> Task<Result<()>> {
331 if room.as_ref() != self.room.as_ref().map(|room| &room.0) {
332 cx.notify();
333 if let Some(room) = room {
334 if room.read(cx).status().is_offline() {
335 self.room = None;
336 Task::ready(Ok(()))
337 } else {
338 let subscriptions = vec![
339 cx.observe(&room, |this, room, cx| {
340 if room.read(cx).status().is_offline() {
341 this.set_room(None, cx).detach_and_log_err(cx);
342 }
343
344 cx.notify();
345 }),
346 cx.subscribe(&room, |_, _, event, cx| cx.emit(event.clone())),
347 ];
348 self.room = Some((room.clone(), subscriptions));
349 let location = self.location.and_then(|location| location.upgrade(cx));
350 room.update(cx, |room, cx| room.set_location(location.as_ref(), cx))
351 }
352 } else {
353 self.room = None;
354 Task::ready(Ok(()))
355 }
356 } else {
357 Task::ready(Ok(()))
358 }
359 }
360
361 pub fn room(&self) -> Option<&ModelHandle<Room>> {
362 self.room.as_ref().map(|(room, _)| room)
363 }
364
365 pub fn client(&self) -> Arc<Client> {
366 self.client.clone()
367 }
368
369 pub fn pending_invites(&self) -> &HashSet<u64> {
370 &self.pending_invites
371 }
372
373 fn report_call_event(&self, operation: &'static str, cx: &AppContext) {
374 if let Some(room) = self.room() {
375 Self::report_call_event_for_room(operation, room.read(cx).id(), &self.client, cx)
376 }
377 }
378
379 pub fn report_call_event_for_room(
380 operation: &'static str,
381 room_id: u64,
382 client: &Arc<Client>,
383 cx: &AppContext,
384 ) {
385 let telemetry = client.telemetry();
386 let telemetry_settings = *settings::get::<TelemetrySettings>(cx);
387 let event = ClickhouseEvent::Call { operation, room_id };
388 telemetry.report_clickhouse_event(event, telemetry_settings);
389 }
390}