1pub mod participant;
2pub mod room;
3
4use std::sync::Arc;
5
6use anyhow::{anyhow, Result};
7use client::{proto, ClickhouseEvent, Client, TelemetrySettings, TypedEnvelope, User, UserStore};
8use collections::HashSet;
9use futures::{future::Shared, FutureExt};
10use postage::watch;
11
12use gpui::{
13 AppContext, AsyncAppContext, Entity, ModelContext, ModelHandle, Subscription, Task,
14 WeakModelHandle,
15};
16use project::Project;
17
18pub use participant::ParticipantLocation;
19pub use room::Room;
20
21pub fn init(client: Arc<Client>, user_store: ModelHandle<UserStore>, cx: &mut AppContext) {
22 let active_call = cx.add_model(|cx| ActiveCall::new(client, user_store, cx));
23 cx.set_global(active_call);
24}
25
26#[derive(Clone)]
27pub struct IncomingCall {
28 pub room_id: u64,
29 pub calling_user: Arc<User>,
30 pub participants: Vec<Arc<User>>,
31 pub initial_project: Option<proto::ParticipantProject>,
32}
33
34/// Singleton global maintaining the user's participation in a room across workspaces.
35pub struct ActiveCall {
36 room: Option<(ModelHandle<Room>, Vec<Subscription>)>,
37 pending_room_creation: Option<Shared<Task<Result<ModelHandle<Room>, Arc<anyhow::Error>>>>>,
38 location: Option<WeakModelHandle<Project>>,
39 pending_invites: HashSet<u64>,
40 incoming_call: (
41 watch::Sender<Option<IncomingCall>>,
42 watch::Receiver<Option<IncomingCall>>,
43 ),
44 client: Arc<Client>,
45 user_store: ModelHandle<UserStore>,
46 _subscriptions: Vec<client::Subscription>,
47}
48
49impl Entity for ActiveCall {
50 type Event = room::Event;
51}
52
53impl ActiveCall {
54 fn new(
55 client: Arc<Client>,
56 user_store: ModelHandle<UserStore>,
57 cx: &mut ModelContext<Self>,
58 ) -> Self {
59 Self {
60 room: None,
61 pending_room_creation: None,
62 location: None,
63 pending_invites: Default::default(),
64 incoming_call: watch::channel(),
65 _subscriptions: vec![
66 client.add_request_handler(cx.handle(), Self::handle_incoming_call),
67 client.add_message_handler(cx.handle(), Self::handle_call_canceled),
68 ],
69 client,
70 user_store,
71 }
72 }
73
74 async fn handle_incoming_call(
75 this: ModelHandle<Self>,
76 envelope: TypedEnvelope<proto::IncomingCall>,
77 _: Arc<Client>,
78 mut cx: AsyncAppContext,
79 ) -> Result<proto::Ack> {
80 let user_store = this.read_with(&cx, |this, _| this.user_store.clone());
81 let call = IncomingCall {
82 room_id: envelope.payload.room_id,
83 participants: user_store
84 .update(&mut cx, |user_store, cx| {
85 user_store.get_users(envelope.payload.participant_user_ids, cx)
86 })
87 .await?,
88 calling_user: user_store
89 .update(&mut cx, |user_store, cx| {
90 user_store.get_user(envelope.payload.calling_user_id, cx)
91 })
92 .await?,
93 initial_project: envelope.payload.initial_project,
94 };
95 this.update(&mut cx, |this, _| {
96 *this.incoming_call.0.borrow_mut() = Some(call);
97 });
98
99 Ok(proto::Ack {})
100 }
101
102 async fn handle_call_canceled(
103 this: ModelHandle<Self>,
104 envelope: TypedEnvelope<proto::CallCanceled>,
105 _: Arc<Client>,
106 mut cx: AsyncAppContext,
107 ) -> Result<()> {
108 this.update(&mut cx, |this, _| {
109 let mut incoming_call = this.incoming_call.0.borrow_mut();
110 if incoming_call
111 .as_ref()
112 .map_or(false, |call| call.room_id == envelope.payload.room_id)
113 {
114 incoming_call.take();
115 }
116 });
117 Ok(())
118 }
119
120 pub fn global(cx: &AppContext) -> ModelHandle<Self> {
121 cx.global::<ModelHandle<Self>>().clone()
122 }
123
124 pub fn invite(
125 &mut self,
126 called_user_id: u64,
127 initial_project: Option<ModelHandle<Project>>,
128 cx: &mut ModelContext<Self>,
129 ) -> Task<Result<()>> {
130 if !self.pending_invites.insert(called_user_id) {
131 return Task::ready(Err(anyhow!("user was already invited")));
132 }
133 cx.notify();
134
135 let room = if let Some(room) = self.room().cloned() {
136 Some(Task::ready(Ok(room)).shared())
137 } else {
138 self.pending_room_creation.clone()
139 };
140
141 let invite = if let Some(room) = room {
142 cx.spawn_weak(|_, mut cx| async move {
143 let room = room.await.map_err(|err| anyhow!("{:?}", err))?;
144
145 let initial_project_id = if let Some(initial_project) = initial_project {
146 Some(
147 room.update(&mut cx, |room, cx| room.share_project(initial_project, cx))
148 .await?,
149 )
150 } else {
151 None
152 };
153
154 room.update(&mut cx, |room, cx| {
155 room.call(called_user_id, initial_project_id, cx)
156 })
157 .await?;
158
159 anyhow::Ok(())
160 })
161 } else {
162 let client = self.client.clone();
163 let user_store = self.user_store.clone();
164 let room = cx
165 .spawn(|this, mut cx| async move {
166 let create_room = async {
167 let room = cx
168 .update(|cx| {
169 Room::create(
170 called_user_id,
171 initial_project,
172 client,
173 user_store,
174 cx,
175 )
176 })
177 .await?;
178
179 this.update(&mut cx, |this, cx| this.set_room(Some(room.clone()), cx))
180 .await?;
181
182 anyhow::Ok(room)
183 };
184
185 let room = create_room.await;
186 this.update(&mut cx, |this, _| this.pending_room_creation = None);
187 room.map_err(Arc::new)
188 })
189 .shared();
190 self.pending_room_creation = Some(room.clone());
191 cx.foreground().spawn(async move {
192 room.await.map_err(|err| anyhow!("{:?}", err))?;
193 anyhow::Ok(())
194 })
195 };
196
197 cx.spawn(|this, mut cx| async move {
198 let result = invite.await;
199 this.update(&mut cx, |this, cx| {
200 this.pending_invites.remove(&called_user_id);
201 this.report_call_event("invite", cx);
202 cx.notify();
203 });
204 result
205 })
206 }
207
208 pub fn cancel_invite(
209 &mut self,
210 called_user_id: u64,
211 cx: &mut ModelContext<Self>,
212 ) -> Task<Result<()>> {
213 let room_id = if let Some(room) = self.room() {
214 room.read(cx).id()
215 } else {
216 return Task::ready(Err(anyhow!("no active call")));
217 };
218
219 let client = self.client.clone();
220 cx.foreground().spawn(async move {
221 client
222 .request(proto::CancelCall {
223 room_id,
224 called_user_id,
225 })
226 .await?;
227 anyhow::Ok(())
228 })
229 }
230
231 pub fn incoming(&self) -> watch::Receiver<Option<IncomingCall>> {
232 self.incoming_call.1.clone()
233 }
234
235 pub fn accept_incoming(&mut self, cx: &mut ModelContext<Self>) -> Task<Result<()>> {
236 if self.room.is_some() {
237 return Task::ready(Err(anyhow!("cannot join while on another call")));
238 }
239
240 let call = if let Some(call) = self.incoming_call.1.borrow().clone() {
241 call
242 } else {
243 return Task::ready(Err(anyhow!("no incoming call")));
244 };
245
246 let join = Room::join(&call, self.client.clone(), self.user_store.clone(), cx);
247
248 cx.spawn(|this, mut cx| async move {
249 let room = join.await?;
250 this.update(&mut cx, |this, cx| this.set_room(Some(room.clone()), cx))
251 .await?;
252 this.update(&mut cx, |this, cx| {
253 this.report_call_event("accept incoming", cx)
254 });
255 Ok(())
256 })
257 }
258
259 pub fn decline_incoming(&mut self, cx: &mut ModelContext<Self>) -> Result<()> {
260 let call = self
261 .incoming_call
262 .0
263 .borrow_mut()
264 .take()
265 .ok_or_else(|| anyhow!("no incoming call"))?;
266 self.report_call_event_for_room("decline incoming", call.room_id, cx);
267 self.client.send(proto::DeclineCall {
268 room_id: call.room_id,
269 })?;
270 Ok(())
271 }
272
273 pub fn hang_up(&mut self, cx: &mut ModelContext<Self>) -> Task<Result<()>> {
274 cx.notify();
275 self.report_call_event("hang up", cx);
276 if let Some((room, _)) = self.room.take() {
277 room.update(cx, |room, cx| room.leave(cx))
278 } else {
279 Task::ready(Ok(()))
280 }
281 }
282
283 pub fn toggle_screen_sharing(&self, cx: &mut AppContext) {
284 if let Some(room) = self.room().cloned() {
285 let toggle_screen_sharing = room.update(cx, |room, cx| {
286 if room.is_screen_sharing() {
287 self.report_call_event("disable screen share", cx);
288 Task::ready(room.unshare_screen(cx))
289 } else {
290 self.report_call_event("enable screen share", cx);
291 room.share_screen(cx)
292 }
293 });
294 toggle_screen_sharing.detach_and_log_err(cx);
295 }
296 }
297
298 pub fn share_project(
299 &mut self,
300 project: ModelHandle<Project>,
301 cx: &mut ModelContext<Self>,
302 ) -> Task<Result<u64>> {
303 if let Some((room, _)) = self.room.as_ref() {
304 self.report_call_event("share project", cx);
305 room.update(cx, |room, cx| room.share_project(project, cx))
306 } else {
307 Task::ready(Err(anyhow!("no active call")))
308 }
309 }
310
311 pub fn unshare_project(
312 &mut self,
313 project: ModelHandle<Project>,
314 cx: &mut ModelContext<Self>,
315 ) -> Result<()> {
316 if let Some((room, _)) = self.room.as_ref() {
317 self.report_call_event("unshare project", cx);
318 room.update(cx, |room, cx| room.unshare_project(project, cx))
319 } else {
320 Err(anyhow!("no active call"))
321 }
322 }
323
324 pub fn set_location(
325 &mut self,
326 project: Option<&ModelHandle<Project>>,
327 cx: &mut ModelContext<Self>,
328 ) -> Task<Result<()>> {
329 self.location = project.map(|project| project.downgrade());
330 if let Some((room, _)) = self.room.as_ref() {
331 room.update(cx, |room, cx| room.set_location(project, cx))
332 } else {
333 Task::ready(Ok(()))
334 }
335 }
336
337 fn set_room(
338 &mut self,
339 room: Option<ModelHandle<Room>>,
340 cx: &mut ModelContext<Self>,
341 ) -> Task<Result<()>> {
342 if room.as_ref() != self.room.as_ref().map(|room| &room.0) {
343 cx.notify();
344 if let Some(room) = room {
345 if room.read(cx).status().is_offline() {
346 self.room = None;
347 Task::ready(Ok(()))
348 } else {
349 let subscriptions = vec![
350 cx.observe(&room, |this, room, cx| {
351 if room.read(cx).status().is_offline() {
352 this.set_room(None, cx).detach_and_log_err(cx);
353 }
354
355 cx.notify();
356 }),
357 cx.subscribe(&room, |_, _, event, cx| cx.emit(event.clone())),
358 ];
359 self.room = Some((room.clone(), subscriptions));
360 let location = self.location.and_then(|location| location.upgrade(cx));
361 room.update(cx, |room, cx| room.set_location(location.as_ref(), cx))
362 }
363 } else {
364 self.room = None;
365 Task::ready(Ok(()))
366 }
367 } else {
368 Task::ready(Ok(()))
369 }
370 }
371
372 pub fn room(&self) -> Option<&ModelHandle<Room>> {
373 self.room.as_ref().map(|(room, _)| room)
374 }
375
376 pub fn pending_invites(&self) -> &HashSet<u64> {
377 &self.pending_invites
378 }
379
380 fn report_call_event(&self, operation: &'static str, cx: &AppContext) {
381 if let Some(room) = self.room() {
382 self.report_call_event_for_room(operation, room.read(cx).id(), cx)
383 }
384 }
385
386 fn report_call_event_for_room(&self, operation: &'static str, room_id: u64, cx: &AppContext) {
387 let telemetry = self.client.telemetry();
388 let telemetry_settings = *settings::get::<TelemetrySettings>(cx);
389
390 let event = ClickhouseEvent::Call { operation, room_id };
391
392 telemetry.report_clickhouse_event(event, telemetry_settings);
393 }
394}