1pub mod call_settings;
2pub mod participant;
3pub mod room;
4
5use anyhow::{anyhow, Result};
6use audio::Audio;
7use call_settings::CallSettings;
8use channel::ChannelId;
9use client::{
10 proto, ClickhouseEvent, Client, TelemetrySettings, TypedEnvelope, User, UserStore,
11 ZED_ALWAYS_ACTIVE,
12};
13use collections::HashSet;
14use futures::{future::Shared, FutureExt};
15use gpui::{
16 AppContext, AsyncAppContext, Entity, ModelContext, ModelHandle, Subscription, Task,
17 WeakModelHandle,
18};
19use postage::watch;
20use project::Project;
21use std::sync::Arc;
22
23pub use participant::ParticipantLocation;
24pub use room::Room;
25
26pub fn init(client: Arc<Client>, user_store: ModelHandle<UserStore>, cx: &mut AppContext) {
27 settings::register::<CallSettings>(cx);
28
29 let active_call = cx.add_model(|cx| ActiveCall::new(client, user_store, cx));
30 cx.set_global(active_call);
31}
32
/// A call the current user has been invited to, as published on
/// `ActiveCall`'s incoming-call watch channel.
#[derive(Clone)]
pub struct IncomingCall {
    /// Id of the room named in the incoming-call message.
    pub room_id: u64,
    /// The user who placed the call (resolved from the message's calling user id).
    pub calling_user: Arc<User>,
    /// Users resolved from the participant user ids carried by the message.
    pub participants: Vec<Arc<User>>,
    /// Project attached to the incoming-call message, if any.
    pub initial_project: Option<proto::ParticipantProject>,
}
40
/// Singleton global maintaining the user's participation in a room across workspaces.
pub struct ActiveCall {
    /// The room currently joined, together with the observation/subscription
    /// handles that `set_room` registers on it.
    room: Option<(ModelHandle<Room>, Vec<Subscription>)>,
    /// Shared task for a room that `invite` is in the middle of creating; it is
    /// reset to `None` once creation finishes (successfully or not).
    pending_room_creation: Option<Shared<Task<Result<ModelHandle<Room>, Arc<anyhow::Error>>>>>,
    /// Weak handle to the project last passed to `set_location`.
    location: Option<WeakModelHandle<Project>>,
    /// Ids of users for whom an `invite` is still in flight.
    pending_invites: HashSet<u64>,
    /// Watch channel carrying the current incoming call, if any:
    /// `.0` is the sender used to publish/clear it, `.1` the receiver handed
    /// out by `incoming`.
    incoming_call: (
        watch::Sender<Option<IncomingCall>>,
        watch::Receiver<Option<IncomingCall>>,
    ),
    /// Client used for server requests and telemetry.
    client: Arc<Client>,
    /// Store used to resolve user ids into `User`s.
    user_store: ModelHandle<UserStore>,
    /// Handler registrations created in `new`; stored for the lifetime of the model.
    _subscriptions: Vec<client::Subscription>,
}
55
// `ActiveCall` emits `room::Event`s: `set_room` subscribes to the current room
// and re-emits a clone of every event it produces.
impl Entity for ActiveCall {
    type Event = room::Event;
}
59
60impl ActiveCall {
61 fn new(
62 client: Arc<Client>,
63 user_store: ModelHandle<UserStore>,
64 cx: &mut ModelContext<Self>,
65 ) -> Self {
66 Self {
67 room: None,
68 pending_room_creation: None,
69 location: None,
70 pending_invites: Default::default(),
71 incoming_call: watch::channel(),
72
73 _subscriptions: vec![
74 client.add_request_handler(cx.handle(), Self::handle_incoming_call),
75 client.add_message_handler(cx.handle(), Self::handle_call_canceled),
76 ],
77 client,
78 user_store,
79 }
80 }
81
82 pub fn channel_id(&self, cx: &AppContext) -> Option<ChannelId> {
83 self.room()?.read(cx).channel_id()
84 }
85
86 async fn handle_incoming_call(
87 this: ModelHandle<Self>,
88 envelope: TypedEnvelope<proto::IncomingCall>,
89 _: Arc<Client>,
90 mut cx: AsyncAppContext,
91 ) -> Result<proto::Ack> {
92 let user_store = this.read_with(&cx, |this, _| this.user_store.clone());
93 let call = IncomingCall {
94 room_id: envelope.payload.room_id,
95 participants: user_store
96 .update(&mut cx, |user_store, cx| {
97 user_store.get_users(envelope.payload.participant_user_ids, cx)
98 })
99 .await?,
100 calling_user: user_store
101 .update(&mut cx, |user_store, cx| {
102 user_store.get_user(envelope.payload.calling_user_id, cx)
103 })
104 .await?,
105 initial_project: envelope.payload.initial_project,
106 };
107 this.update(&mut cx, |this, _| {
108 *this.incoming_call.0.borrow_mut() = Some(call);
109 });
110
111 Ok(proto::Ack {})
112 }
113
114 async fn handle_call_canceled(
115 this: ModelHandle<Self>,
116 envelope: TypedEnvelope<proto::CallCanceled>,
117 _: Arc<Client>,
118 mut cx: AsyncAppContext,
119 ) -> Result<()> {
120 this.update(&mut cx, |this, _| {
121 let mut incoming_call = this.incoming_call.0.borrow_mut();
122 if incoming_call
123 .as_ref()
124 .map_or(false, |call| call.room_id == envelope.payload.room_id)
125 {
126 incoming_call.take();
127 }
128 });
129 Ok(())
130 }
131
132 pub fn global(cx: &AppContext) -> ModelHandle<Self> {
133 cx.global::<ModelHandle<Self>>().clone()
134 }
135
    /// Invites `called_user_id` to a call, creating a room first when needed.
    ///
    /// * Returns an error immediately if an invite to the same user is still
    ///   pending.
    /// * If a room already exists, or one is currently being created by an
    ///   earlier `invite`, waits for that room, shares `initial_project` into it
    ///   (when given) and then calls the user.
    /// * Otherwise creates a new room via `Room::create`, passing it the called
    ///   user and the initial project, and installs it with `set_room`. The
    ///   creation task is stored in `pending_room_creation` while it runs.
    ///
    /// When the invite finishes, a successful one is reported as an `"invite"`
    /// call event; in every case the user is removed from `pending_invites` and
    /// observers are notified. The returned task resolves to the invite's result.
    pub fn invite(
        &mut self,
        called_user_id: u64,
        initial_project: Option<ModelHandle<Project>>,
        cx: &mut ModelContext<Self>,
    ) -> Task<Result<()>> {
        // `insert` returns false when the id was already present, i.e. an
        // invite to this user is still in flight.
        if !self.pending_invites.insert(called_user_id) {
            return Task::ready(Err(anyhow!("user was already invited")));
        }
        cx.notify();

        // Prefer the room we are already in; otherwise piggyback on a room
        // creation that is already underway, if any.
        let room = if let Some(room) = self.room().cloned() {
            Some(Task::ready(Ok(room)).shared())
        } else {
            self.pending_room_creation.clone()
        };

        let invite = if let Some(room) = room {
            cx.spawn_weak(|_, mut cx| async move {
                // The shared task yields `Arc<anyhow::Error>`; re-wrap it so `?` applies.
                let room = room.await.map_err(|err| anyhow!("{:?}", err))?;

                let initial_project_id = if let Some(initial_project) = initial_project {
                    Some(
                        room.update(&mut cx, |room, cx| room.share_project(initial_project, cx))
                            .await?,
                    )
                } else {
                    None
                };

                room.update(&mut cx, |room, cx| {
                    room.call(called_user_id, initial_project_id, cx)
                })
                .await?;

                anyhow::Ok(())
            })
        } else {
            let client = self.client.clone();
            let user_store = self.user_store.clone();
            let room = cx
                .spawn(|this, mut cx| async move {
                    let create_room = async {
                        let room = cx
                            .update(|cx| {
                                Room::create(
                                    called_user_id,
                                    initial_project,
                                    client,
                                    user_store,
                                    cx,
                                )
                            })
                            .await?;

                        this.update(&mut cx, |this, cx| this.set_room(Some(room.clone()), cx))
                            .await?;

                        anyhow::Ok(room)
                    };

                    // Clear the pending marker whether creation succeeded or failed.
                    let room = create_room.await;
                    this.update(&mut cx, |this, _| this.pending_room_creation = None);
                    room.map_err(Arc::new)
                })
                .shared();
            // Publish the shared task so later `invite` calls can await the same room.
            self.pending_room_creation = Some(room.clone());
            cx.foreground().spawn(async move {
                room.await.map_err(|err| anyhow!("{:?}", err))?;
                anyhow::Ok(())
            })
        };

        cx.spawn(|this, mut cx| async move {
            let result = invite.await;
            if result.is_ok() {
                this.update(&mut cx, |this, cx| this.report_call_event("invite", cx));
            } else {
                // TODO: Report collaboration error
            }

            // The invite is no longer pending, regardless of its outcome.
            this.update(&mut cx, |this, cx| {
                this.pending_invites.remove(&called_user_id);
                cx.notify();
            });
            result
        })
    }
224
225 pub fn cancel_invite(
226 &mut self,
227 called_user_id: u64,
228 cx: &mut ModelContext<Self>,
229 ) -> Task<Result<()>> {
230 let room_id = if let Some(room) = self.room() {
231 room.read(cx).id()
232 } else {
233 return Task::ready(Err(anyhow!("no active call")));
234 };
235
236 let client = self.client.clone();
237 cx.foreground().spawn(async move {
238 client
239 .request(proto::CancelCall {
240 room_id,
241 called_user_id,
242 })
243 .await?;
244 anyhow::Ok(())
245 })
246 }
247
248 pub fn incoming(&self) -> watch::Receiver<Option<IncomingCall>> {
249 self.incoming_call.1.clone()
250 }
251
252 pub fn accept_incoming(&mut self, cx: &mut ModelContext<Self>) -> Task<Result<()>> {
253 if self.room.is_some() {
254 return Task::ready(Err(anyhow!("cannot join while on another call")));
255 }
256
257 let call = if let Some(call) = self.incoming_call.1.borrow().clone() {
258 call
259 } else {
260 return Task::ready(Err(anyhow!("no incoming call")));
261 };
262
263 let join = Room::join(&call, self.client.clone(), self.user_store.clone(), cx);
264
265 cx.spawn(|this, mut cx| async move {
266 let room = join.await?;
267 this.update(&mut cx, |this, cx| this.set_room(Some(room.clone()), cx))
268 .await?;
269 this.update(&mut cx, |this, cx| {
270 this.report_call_event("accept incoming", cx)
271 });
272 Ok(())
273 })
274 }
275
276 pub fn decline_incoming(&mut self, cx: &mut ModelContext<Self>) -> Result<()> {
277 let call = self
278 .incoming_call
279 .0
280 .borrow_mut()
281 .take()
282 .ok_or_else(|| anyhow!("no incoming call"))?;
283 report_call_event_for_room("decline incoming", call.room_id, None, &self.client, cx);
284 self.client.send(proto::DeclineCall {
285 room_id: call.room_id,
286 })?;
287 Ok(())
288 }
289
290 pub fn join_channel(
291 &mut self,
292 channel_id: u64,
293 cx: &mut ModelContext<Self>,
294 ) -> Task<Result<ModelHandle<Room>>> {
295 if let Some(room) = self.room().cloned() {
296 if room.read(cx).channel_id() == Some(channel_id) {
297 return Task::ready(Ok(room));
298 } else {
299 room.update(cx, |room, cx| room.clear_state(cx));
300 }
301 }
302
303 let join = Room::join_channel(channel_id, self.client.clone(), self.user_store.clone(), cx);
304
305 cx.spawn(|this, mut cx| async move {
306 let room = join.await?;
307 this.update(&mut cx, |this, cx| this.set_room(Some(room.clone()), cx))
308 .await?;
309 this.update(&mut cx, |this, cx| {
310 this.report_call_event("join channel", cx)
311 });
312 Ok(room)
313 })
314 }
315
316 pub fn hang_up(&mut self, cx: &mut ModelContext<Self>) -> Task<Result<()>> {
317 cx.notify();
318 self.report_call_event("hang up", cx);
319 Audio::end_call(cx);
320 if let Some((room, _)) = self.room.take() {
321 room.update(cx, |room, cx| room.leave(cx))
322 } else {
323 Task::ready(Ok(()))
324 }
325 }
326
327 pub fn share_project(
328 &mut self,
329 project: ModelHandle<Project>,
330 cx: &mut ModelContext<Self>,
331 ) -> Task<Result<u64>> {
332 if let Some((room, _)) = self.room.as_ref() {
333 self.report_call_event("share project", cx);
334 room.update(cx, |room, cx| room.share_project(project, cx))
335 } else {
336 Task::ready(Err(anyhow!("no active call")))
337 }
338 }
339
340 pub fn unshare_project(
341 &mut self,
342 project: ModelHandle<Project>,
343 cx: &mut ModelContext<Self>,
344 ) -> Result<()> {
345 if let Some((room, _)) = self.room.as_ref() {
346 self.report_call_event("unshare project", cx);
347 room.update(cx, |room, cx| room.unshare_project(project, cx))
348 } else {
349 Err(anyhow!("no active call"))
350 }
351 }
352
    /// Returns the weak project handle last stored by `set_location`, if any.
    pub fn location(&self) -> Option<&WeakModelHandle<Project>> {
        self.location.as_ref()
    }
356
357 pub fn set_location(
358 &mut self,
359 project: Option<&ModelHandle<Project>>,
360 cx: &mut ModelContext<Self>,
361 ) -> Task<Result<()>> {
362 if project.is_some() || !*ZED_ALWAYS_ACTIVE {
363 self.location = project.map(|project| project.downgrade());
364 if let Some((room, _)) = self.room.as_ref() {
365 return room.update(cx, |room, cx| room.set_location(project, cx));
366 }
367 }
368 Task::ready(Ok(()))
369 }
370
371 fn set_room(
372 &mut self,
373 room: Option<ModelHandle<Room>>,
374 cx: &mut ModelContext<Self>,
375 ) -> Task<Result<()>> {
376 if room.as_ref() != self.room.as_ref().map(|room| &room.0) {
377 cx.notify();
378 if let Some(room) = room {
379 if room.read(cx).status().is_offline() {
380 self.room = None;
381 Task::ready(Ok(()))
382 } else {
383 let subscriptions = vec![
384 cx.observe(&room, |this, room, cx| {
385 if room.read(cx).status().is_offline() {
386 this.set_room(None, cx).detach_and_log_err(cx);
387 }
388
389 cx.notify();
390 }),
391 cx.subscribe(&room, |_, _, event, cx| cx.emit(event.clone())),
392 ];
393 self.room = Some((room.clone(), subscriptions));
394 let location = self.location.and_then(|location| location.upgrade(cx));
395 room.update(cx, |room, cx| room.set_location(location.as_ref(), cx))
396 }
397 } else {
398 self.room = None;
399 Task::ready(Ok(()))
400 }
401 } else {
402 Task::ready(Ok(()))
403 }
404 }
405
406 pub fn room(&self) -> Option<&ModelHandle<Room>> {
407 self.room.as_ref().map(|(room, _)| room)
408 }
409
410 pub fn client(&self) -> Arc<Client> {
411 self.client.clone()
412 }
413
    /// Returns the ids of users for whom an `invite` has been started and has
    /// not yet finished.
    pub fn pending_invites(&self) -> &HashSet<u64> {
        &self.pending_invites
    }
417
418 pub fn report_call_event(&self, operation: &'static str, cx: &AppContext) {
419 if let Some(room) = self.room() {
420 let room = room.read(cx);
421 report_call_event_for_room(operation, room.id(), room.channel_id(), &self.client, cx);
422 }
423 }
424}
425
426pub fn report_call_event_for_room(
427 operation: &'static str,
428 room_id: u64,
429 channel_id: Option<u64>,
430 client: &Arc<Client>,
431 cx: &AppContext,
432) {
433 let telemetry = client.telemetry();
434 let telemetry_settings = *settings::get::<TelemetrySettings>(cx);
435 let event = ClickhouseEvent::Call {
436 operation,
437 room_id: Some(room_id),
438 channel_id,
439 };
440 telemetry.report_clickhouse_event(event, telemetry_settings);
441}
442
443pub fn report_call_event_for_channel(
444 operation: &'static str,
445 channel_id: u64,
446 client: &Arc<Client>,
447 cx: &AppContext,
448) {
449 let room = ActiveCall::global(cx).read(cx).room();
450
451 let telemetry = client.telemetry();
452 let telemetry_settings = *settings::get::<TelemetrySettings>(cx);
453
454 let event = ClickhouseEvent::Call {
455 operation,
456 room_id: room.map(|r| r.read(cx).id()),
457 channel_id: Some(channel_id),
458 };
459 telemetry.report_clickhouse_event(event, telemetry_settings);
460}