1pub mod call_settings;
2pub mod participant;
3pub mod room;
4
5use anyhow::{anyhow, Result};
6use audio2::Audio;
7use call_settings::CallSettings;
8use client2::{
9 proto, ClickhouseEvent, Client, TelemetrySettings, TypedEnvelope, User, UserStore,
10 ZED_ALWAYS_ACTIVE,
11};
12use collections::HashSet;
13use futures::{future::Shared, FutureExt};
14use gpui2::{
15 AppContext, AsyncAppContext, Context, EventEmitter, Model, ModelContext, Subscription, Task,
16 WeakModel,
17};
18use postage::watch;
19use project2::Project;
20use settings2::Settings;
21use std::sync::Arc;
22
23pub use participant::ParticipantLocation;
24pub use room::Room;
25
26pub fn init(client: Arc<Client>, user_store: Model<UserStore>, cx: &mut AppContext) {
27 CallSettings::register(cx);
28
29 let active_call = cx.build_model(|cx| ActiveCall::new(client, user_store, cx));
30 cx.set_global(active_call);
31}
32
/// A call offered to the current user, built from a `proto::IncomingCall`
/// message by `ActiveCall::handle_incoming_call`.
#[derive(Clone)]
pub struct IncomingCall {
    /// Id of the room the call belongs to (copied from the message payload).
    pub room_id: u64,
    /// The user resolved from the payload's `calling_user_id`.
    pub calling_user: Arc<User>,
    /// The users resolved from the payload's `participant_user_ids`.
    pub participants: Vec<Arc<User>>,
    /// Copied verbatim from the payload's `initial_project` field.
    // NOTE(review): presumably the project the caller shared when placing
    // the call — confirm against the server protocol.
    pub initial_project: Option<proto::ParticipantProject>,
}
40
/// Singleton global maintaining the user's participation in a room across workspaces.
pub struct ActiveCall {
    /// The room currently tracked by this call, paired with the
    /// subscriptions that `set_room` creates for it (an observer and an
    /// event forwarder).
    room: Option<(Model<Room>, Vec<Subscription>)>,
    /// Shared task set by `invite` while it is creating a new room; reset to
    /// `None` by that task once creation finishes (successfully or not).
    pending_room_creation: Option<Shared<Task<Result<Model<Room>, Arc<anyhow::Error>>>>>,
    /// Weak handle to the project last passed to `set_location`; re-applied
    /// to a room when `set_room` starts tracking it.
    location: Option<WeakModel<Project>>,
    /// Ids of users with an invite in flight: inserted at the start of
    /// `invite` and removed when that invite's task completes.
    pending_invites: HashSet<u64>,
    /// Watch channel holding the current incoming call, if any. The sender
    /// half is written by the message handlers and `decline_incoming`;
    /// clones of the receiver half are handed out by `incoming`.
    incoming_call: (
        watch::Sender<Option<IncomingCall>>,
        watch::Receiver<Option<IncomingCall>>,
    ),
    /// Client used to send requests and to report telemetry.
    client: Arc<Client>,
    /// Store used to resolve user ids into `User` values and to join rooms.
    user_store: Model<UserStore>,
    /// Handler registrations returned by the client in `new`.
    // NOTE(review): presumably held so the handlers stay registered for the
    // lifetime of this model — confirm against `client2::Subscription`.
    _subscriptions: Vec<client2::Subscription>,
}
55
/// `ActiveCall` emits the same event type as [`Room`]: `set_room` subscribes
/// to the tracked room and re-emits a clone of every event it produces.
impl EventEmitter for ActiveCall {
    type Event = room::Event;
}
59
60impl ActiveCall {
61 fn new(client: Arc<Client>, user_store: Model<UserStore>, cx: &mut ModelContext<Self>) -> Self {
62 Self {
63 room: None,
64 pending_room_creation: None,
65 location: None,
66 pending_invites: Default::default(),
67 incoming_call: watch::channel(),
68
69 _subscriptions: vec![
70 client.add_request_handler(cx.weak_model(), Self::handle_incoming_call),
71 client.add_message_handler(cx.weak_model(), Self::handle_call_canceled),
72 ],
73 client,
74 user_store,
75 }
76 }
77
78 pub fn channel_id(&self, cx: &AppContext) -> Option<u64> {
79 self.room()?.read(cx).channel_id()
80 }
81
82 async fn handle_incoming_call(
83 this: Model<Self>,
84 envelope: TypedEnvelope<proto::IncomingCall>,
85 _: Arc<Client>,
86 mut cx: AsyncAppContext,
87 ) -> Result<proto::Ack> {
88 let user_store = this.update(&mut cx, |this, _| this.user_store.clone())?;
89 let call = IncomingCall {
90 room_id: envelope.payload.room_id,
91 participants: user_store
92 .update(&mut cx, |user_store, cx| {
93 user_store.get_users(envelope.payload.participant_user_ids, cx)
94 })?
95 .await?,
96 calling_user: user_store
97 .update(&mut cx, |user_store, cx| {
98 user_store.get_user(envelope.payload.calling_user_id, cx)
99 })?
100 .await?,
101 initial_project: envelope.payload.initial_project,
102 };
103 this.update(&mut cx, |this, _| {
104 *this.incoming_call.0.borrow_mut() = Some(call);
105 })?;
106
107 Ok(proto::Ack {})
108 }
109
110 async fn handle_call_canceled(
111 this: Model<Self>,
112 envelope: TypedEnvelope<proto::CallCanceled>,
113 _: Arc<Client>,
114 mut cx: AsyncAppContext,
115 ) -> Result<()> {
116 this.update(&mut cx, |this, _| {
117 let mut incoming_call = this.incoming_call.0.borrow_mut();
118 if incoming_call
119 .as_ref()
120 .map_or(false, |call| call.room_id == envelope.payload.room_id)
121 {
122 incoming_call.take();
123 }
124 })?;
125 Ok(())
126 }
127
128 pub fn global(cx: &AppContext) -> Model<Self> {
129 cx.global::<Model<Self>>().clone()
130 }
131
132 pub fn invite(
133 &mut self,
134 called_user_id: u64,
135 initial_project: Option<Model<Project>>,
136 cx: &mut ModelContext<Self>,
137 ) -> Task<Result<()>> {
138 if !self.pending_invites.insert(called_user_id) {
139 return Task::ready(Err(anyhow!("user was already invited")));
140 }
141 cx.notify();
142
143 let room = if let Some(room) = self.room().cloned() {
144 Some(Task::ready(Ok(room)).shared())
145 } else {
146 self.pending_room_creation.clone()
147 };
148
149 let invite = if let Some(room) = room {
150 cx.spawn(move |_, mut cx| async move {
151 let room = room.await.map_err(|err| anyhow!("{:?}", err))?;
152
153 let initial_project_id = if let Some(initial_project) = initial_project {
154 Some(
155 room.update(&mut cx, |room, cx| room.share_project(initial_project, cx))?
156 .await?,
157 )
158 } else {
159 None
160 };
161
162 room.update(&mut cx, move |room, cx| {
163 room.call(called_user_id, initial_project_id, cx)
164 })?
165 .await?;
166
167 anyhow::Ok(())
168 })
169 } else {
170 let client = self.client.clone();
171 let user_store = self.user_store.clone();
172 let room = cx
173 .spawn(move |this, mut cx| async move {
174 let create_room = async {
175 let room = cx
176 .update(|cx| {
177 Room::create(
178 called_user_id,
179 initial_project,
180 client,
181 user_store,
182 cx,
183 )
184 })?
185 .await?;
186
187 this.update(&mut cx, |this, cx| this.set_room(Some(room.clone()), cx))?
188 .await?;
189
190 anyhow::Ok(room)
191 };
192
193 let room = create_room.await;
194 this.update(&mut cx, |this, _| this.pending_room_creation = None)?;
195 room.map_err(Arc::new)
196 })
197 .shared();
198 self.pending_room_creation = Some(room.clone());
199 cx.executor().spawn(async move {
200 room.await.map_err(|err| anyhow!("{:?}", err))?;
201 anyhow::Ok(())
202 })
203 };
204
205 cx.spawn(move |this, mut cx| async move {
206 let result = invite.await;
207 if result.is_ok() {
208 this.update(&mut cx, |this, cx| this.report_call_event("invite", cx))?;
209 } else {
210 // TODO: Resport collaboration error
211 }
212
213 this.update(&mut cx, |this, cx| {
214 this.pending_invites.remove(&called_user_id);
215 cx.notify();
216 })?;
217 result
218 })
219 }
220
221 pub fn cancel_invite(
222 &mut self,
223 called_user_id: u64,
224 cx: &mut ModelContext<Self>,
225 ) -> Task<Result<()>> {
226 let room_id = if let Some(room) = self.room() {
227 room.read(cx).id()
228 } else {
229 return Task::ready(Err(anyhow!("no active call")));
230 };
231
232 let client = self.client.clone();
233 cx.executor().spawn(async move {
234 client
235 .request(proto::CancelCall {
236 room_id,
237 called_user_id,
238 })
239 .await?;
240 anyhow::Ok(())
241 })
242 }
243
244 pub fn incoming(&self) -> watch::Receiver<Option<IncomingCall>> {
245 self.incoming_call.1.clone()
246 }
247
248 pub fn accept_incoming(&mut self, cx: &mut ModelContext<Self>) -> Task<Result<()>> {
249 if self.room.is_some() {
250 return Task::ready(Err(anyhow!("cannot join while on another call")));
251 }
252
253 let call = if let Some(call) = self.incoming_call.1.borrow().clone() {
254 call
255 } else {
256 return Task::ready(Err(anyhow!("no incoming call")));
257 };
258
259 let join = Room::join(&call, self.client.clone(), self.user_store.clone(), cx);
260
261 cx.spawn(|this, mut cx| async move {
262 let room = join.await?;
263 this.update(&mut cx, |this, cx| this.set_room(Some(room.clone()), cx))?
264 .await?;
265 this.update(&mut cx, |this, cx| {
266 this.report_call_event("accept incoming", cx)
267 })?;
268 Ok(())
269 })
270 }
271
272 pub fn decline_incoming(&mut self, cx: &mut ModelContext<Self>) -> Result<()> {
273 let call = self
274 .incoming_call
275 .0
276 .borrow_mut()
277 .take()
278 .ok_or_else(|| anyhow!("no incoming call"))?;
279 report_call_event_for_room("decline incoming", call.room_id, None, &self.client, cx);
280 self.client.send(proto::DeclineCall {
281 room_id: call.room_id,
282 })?;
283 Ok(())
284 }
285
286 pub fn join_channel(
287 &mut self,
288 channel_id: u64,
289 cx: &mut ModelContext<Self>,
290 ) -> Task<Result<Model<Room>>> {
291 if let Some(room) = self.room().cloned() {
292 if room.read(cx).channel_id() == Some(channel_id) {
293 return Task::ready(Ok(room));
294 } else {
295 room.update(cx, |room, cx| room.clear_state(cx));
296 }
297 }
298
299 let join = Room::join_channel(channel_id, self.client.clone(), self.user_store.clone(), cx);
300
301 cx.spawn(|this, mut cx| async move {
302 let room = join.await?;
303 this.update(&mut cx, |this, cx| this.set_room(Some(room.clone()), cx))?
304 .await?;
305 this.update(&mut cx, |this, cx| {
306 this.report_call_event("join channel", cx)
307 })?;
308 Ok(room)
309 })
310 }
311
312 pub fn hang_up(&mut self, cx: &mut ModelContext<Self>) -> Task<Result<()>> {
313 cx.notify();
314 self.report_call_event("hang up", cx);
315
316 Audio::end_call(cx);
317 if let Some((room, _)) = self.room.take() {
318 room.update(cx, |room, cx| room.leave(cx))
319 } else {
320 Task::ready(Ok(()))
321 }
322 }
323
324 pub fn share_project(
325 &mut self,
326 project: Model<Project>,
327 cx: &mut ModelContext<Self>,
328 ) -> Task<Result<u64>> {
329 if let Some((room, _)) = self.room.as_ref() {
330 self.report_call_event("share project", cx);
331 room.update(cx, |room, cx| room.share_project(project, cx))
332 } else {
333 Task::ready(Err(anyhow!("no active call")))
334 }
335 }
336
337 pub fn unshare_project(
338 &mut self,
339 project: Model<Project>,
340 cx: &mut ModelContext<Self>,
341 ) -> Result<()> {
342 if let Some((room, _)) = self.room.as_ref() {
343 self.report_call_event("unshare project", cx);
344 room.update(cx, |room, cx| room.unshare_project(project, cx))
345 } else {
346 Err(anyhow!("no active call"))
347 }
348 }
349
350 pub fn location(&self) -> Option<&WeakModel<Project>> {
351 self.location.as_ref()
352 }
353
354 pub fn set_location(
355 &mut self,
356 project: Option<&Model<Project>>,
357 cx: &mut ModelContext<Self>,
358 ) -> Task<Result<()>> {
359 if project.is_some() || !*ZED_ALWAYS_ACTIVE {
360 self.location = project.map(|project| project.downgrade());
361 if let Some((room, _)) = self.room.as_ref() {
362 return room.update(cx, |room, cx| room.set_location(project, cx));
363 }
364 }
365 Task::ready(Ok(()))
366 }
367
368 fn set_room(
369 &mut self,
370 room: Option<Model<Room>>,
371 cx: &mut ModelContext<Self>,
372 ) -> Task<Result<()>> {
373 if room.as_ref() != self.room.as_ref().map(|room| &room.0) {
374 cx.notify();
375 if let Some(room) = room {
376 if room.read(cx).status().is_offline() {
377 self.room = None;
378 Task::ready(Ok(()))
379 } else {
380 let subscriptions = vec![
381 cx.observe(&room, |this, room, cx| {
382 if room.read(cx).status().is_offline() {
383 this.set_room(None, cx).detach_and_log_err(cx);
384 }
385
386 cx.notify();
387 }),
388 cx.subscribe(&room, |_, _, event, cx| cx.emit(event.clone())),
389 ];
390 self.room = Some((room.clone(), subscriptions));
391 let location = self
392 .location
393 .as_ref()
394 .and_then(|location| location.upgrade());
395 room.update(cx, |room, cx| room.set_location(location.as_ref(), cx))
396 }
397 } else {
398 self.room = None;
399 Task::ready(Ok(()))
400 }
401 } else {
402 Task::ready(Ok(()))
403 }
404 }
405
406 pub fn room(&self) -> Option<&Model<Room>> {
407 self.room.as_ref().map(|(room, _)| room)
408 }
409
410 pub fn client(&self) -> Arc<Client> {
411 self.client.clone()
412 }
413
414 pub fn pending_invites(&self) -> &HashSet<u64> {
415 &self.pending_invites
416 }
417
418 pub fn report_call_event(&self, operation: &'static str, cx: &AppContext) {
419 if let Some(room) = self.room() {
420 let room = room.read(cx);
421 report_call_event_for_room(operation, room.id(), room.channel_id(), &self.client, cx);
422 }
423 }
424}
425
426pub fn report_call_event_for_room(
427 operation: &'static str,
428 room_id: u64,
429 channel_id: Option<u64>,
430 client: &Arc<Client>,
431 cx: &AppContext,
432) {
433 let telemetry = client.telemetry();
434 let telemetry_settings = *TelemetrySettings::get_global(cx);
435 let event = ClickhouseEvent::Call {
436 operation,
437 room_id: Some(room_id),
438 channel_id,
439 };
440 telemetry.report_clickhouse_event(event, telemetry_settings);
441}
442
443pub fn report_call_event_for_channel(
444 operation: &'static str,
445 channel_id: u64,
446 client: &Arc<Client>,
447 cx: &AppContext,
448) {
449 let room = ActiveCall::global(cx).read(cx).room();
450
451 let telemetry = client.telemetry();
452
453 let telemetry_settings = *TelemetrySettings::get_global(cx);
454
455 let event = ClickhouseEvent::Call {
456 operation,
457 room_id: room.map(|r| r.read(cx).id()),
458 channel_id: Some(channel_id),
459 };
460 telemetry.report_clickhouse_event(event, telemetry_settings);
461}