1mod indicator;
2pub mod participant;
3pub mod room;
4
5use std::sync::Arc;
6
7use anyhow::{anyhow, Result};
8use client::{proto, Client, TypedEnvelope, User, UserStore};
9use collections::HashSet;
10use postage::watch;
11
12use gpui::{
13 actions, AppContext, AsyncAppContext, Entity, ModelContext, ModelHandle, MutableAppContext,
14 Subscription, Task, ViewHandle, WeakModelHandle,
15};
16use project::Project;
17use settings::Settings;
18
19use indicator::SharingStatusIndicator;
20pub use participant::ParticipantLocation;
21pub use room::Room;
22
// Declares the `ToggleScreenSharing` action under `collab` through gpui's `actions!` macro.
// `init` registers `toggle_screen_sharing`, which takes `&ToggleScreenSharing`, as a global
// action handler.
actions!(collab, [ToggleScreenSharing]);
24
25pub fn init(client: Arc<Client>, user_store: ModelHandle<UserStore>, cx: &mut MutableAppContext) {
26 let active_call = cx.add_model(|cx| ActiveCall::new(client, user_store, cx));
27 cx.set_global(active_call);
28 cx.add_global_action(toggle_screen_sharing);
29}
30
/// A call this client has been invited to.
///
/// Built by `ActiveCall::handle_incoming_call` from a `proto::IncomingCall` message and
/// published on the channel returned by `ActiveCall::incoming`.
#[derive(Clone)]
pub struct IncomingCall {
    /// Id of the room the call belongs to (copied from the message's `room_id`).
    pub room_id: u64,
    /// The user resolved from the message's `calling_user_id`.
    pub calling_user: Arc<User>,
    /// The users resolved from the message's `participant_user_ids`.
    pub participants: Vec<Arc<User>>,
    /// The message's `initial_project`, passed through unchanged.
    pub initial_project: Option<proto::ParticipantProject>,
}
38
/// Model tracking the call state of this client: the current room, invites that are in
/// flight, the latest incoming call, and the screen-sharing status-bar indicator.
pub struct ActiveCall {
    /// The current room together with the subscriptions `set_room` creates for it (an
    /// observer and an event forwarder). `None` when not in a room.
    room: Option<(ModelHandle<Room>, Vec<Subscription>)>,
    /// Weak handle to the project last passed to `set_location`; `set_room` forwards it to a
    /// newly set room.
    location: Option<WeakModelHandle<Project>>,
    /// Ids of users for whom an `invite` task is currently running; the id is inserted when
    /// `invite` starts and removed when its task finishes, successfully or not.
    pending_invites: HashSet<u64>,
    /// `watch` channel (sender, receiver) holding the current incoming call, if any. Written
    /// by the message handlers and `decline_incoming`; receivers are handed out by `incoming`.
    incoming_call: (
        watch::Sender<Option<IncomingCall>>,
        watch::Receiver<Option<IncomingCall>>,
    ),
    /// Client used to send requests/messages and to register the handlers in `new`.
    client: Arc<Client>,
    /// Store used to resolve user ids into `User`s and passed on to `Room`.
    user_store: ModelHandle<UserStore>,
    /// Value returned by `cx.add_status_bar_item` while the sharing indicator is shown; its
    /// first element is what `set_sharing_status` hands to `cx.remove_status_bar_item`.
    sharing_status_indicator: Option<(usize, ViewHandle<SharingStatusIndicator>)>,
    /// Handler registrations returned by `client.add_request_handler` /
    /// `client.add_message_handler` in `new`. Presumably kept so the handlers stay
    /// registered for as long as this model lives — confirm against `client`.
    _subscriptions: Vec<client::Subscription>,
}
52
/// Makes `ActiveCall` a gpui entity whose events are `room::Event`s; `set_room` subscribes to
/// the current room and re-emits each of its events through this model.
impl Entity for ActiveCall {
    type Event = room::Event;
}
56
57impl ActiveCall {
58 fn new(
59 client: Arc<Client>,
60 user_store: ModelHandle<UserStore>,
61 cx: &mut ModelContext<Self>,
62 ) -> Self {
63 Self {
64 room: None,
65 location: None,
66 pending_invites: Default::default(),
67 incoming_call: watch::channel(),
68 _subscriptions: vec![
69 client.add_request_handler(cx.handle(), Self::handle_incoming_call),
70 client.add_message_handler(cx.handle(), Self::handle_call_canceled),
71 ],
72 client,
73 user_store,
74 sharing_status_indicator: None,
75 }
76 }
77
78 async fn handle_incoming_call(
79 this: ModelHandle<Self>,
80 envelope: TypedEnvelope<proto::IncomingCall>,
81 _: Arc<Client>,
82 mut cx: AsyncAppContext,
83 ) -> Result<proto::Ack> {
84 let user_store = this.read_with(&cx, |this, _| this.user_store.clone());
85 let call = IncomingCall {
86 room_id: envelope.payload.room_id,
87 participants: user_store
88 .update(&mut cx, |user_store, cx| {
89 user_store.get_users(envelope.payload.participant_user_ids, cx)
90 })
91 .await?,
92 calling_user: user_store
93 .update(&mut cx, |user_store, cx| {
94 user_store.get_user(envelope.payload.calling_user_id, cx)
95 })
96 .await?,
97 initial_project: envelope.payload.initial_project,
98 };
99 this.update(&mut cx, |this, _| {
100 *this.incoming_call.0.borrow_mut() = Some(call);
101 });
102
103 Ok(proto::Ack {})
104 }
105
106 async fn handle_call_canceled(
107 this: ModelHandle<Self>,
108 envelope: TypedEnvelope<proto::CallCanceled>,
109 _: Arc<Client>,
110 mut cx: AsyncAppContext,
111 ) -> Result<()> {
112 this.update(&mut cx, |this, _| {
113 let mut incoming_call = this.incoming_call.0.borrow_mut();
114 if incoming_call
115 .as_ref()
116 .map_or(false, |call| call.room_id == envelope.payload.room_id)
117 {
118 incoming_call.take();
119 }
120 });
121 Ok(())
122 }
123
124 pub fn global(cx: &AppContext) -> ModelHandle<Self> {
125 cx.global::<ModelHandle<Self>>().clone()
126 }
127
128 pub fn invite(
129 &mut self,
130 called_user_id: u64,
131 initial_project: Option<ModelHandle<Project>>,
132 cx: &mut ModelContext<Self>,
133 ) -> Task<Result<()>> {
134 let client = self.client.clone();
135 let user_store = self.user_store.clone();
136 if !self.pending_invites.insert(called_user_id) {
137 return Task::ready(Err(anyhow!("user was already invited")));
138 }
139
140 cx.notify();
141 cx.spawn(|this, mut cx| async move {
142 let invite = async {
143 if let Some(room) = this.read_with(&cx, |this, _| this.room().cloned()) {
144 let initial_project_id = if let Some(initial_project) = initial_project {
145 Some(
146 room.update(&mut cx, |room, cx| {
147 room.share_project(initial_project, cx)
148 })
149 .await?,
150 )
151 } else {
152 None
153 };
154
155 room.update(&mut cx, |room, cx| {
156 room.call(called_user_id, initial_project_id, cx)
157 })
158 .await?;
159 } else {
160 let room = cx
161 .update(|cx| {
162 Room::create(called_user_id, initial_project, client, user_store, cx)
163 })
164 .await?;
165
166 this.update(&mut cx, |this, cx| this.set_room(Some(room), cx))
167 .await?;
168 };
169
170 Ok(())
171 };
172
173 let result = invite.await;
174 this.update(&mut cx, |this, cx| {
175 this.pending_invites.remove(&called_user_id);
176 cx.notify();
177 });
178 result
179 })
180 }
181
182 pub fn cancel_invite(
183 &mut self,
184 called_user_id: u64,
185 cx: &mut ModelContext<Self>,
186 ) -> Task<Result<()>> {
187 let room_id = if let Some(room) = self.room() {
188 room.read(cx).id()
189 } else {
190 return Task::ready(Err(anyhow!("no active call")));
191 };
192
193 let client = self.client.clone();
194 cx.foreground().spawn(async move {
195 client
196 .request(proto::CancelCall {
197 room_id,
198 called_user_id,
199 })
200 .await?;
201 anyhow::Ok(())
202 })
203 }
204
205 pub fn incoming(&self) -> watch::Receiver<Option<IncomingCall>> {
206 self.incoming_call.1.clone()
207 }
208
209 pub fn accept_incoming(&mut self, cx: &mut ModelContext<Self>) -> Task<Result<()>> {
210 if self.room.is_some() {
211 return Task::ready(Err(anyhow!("cannot join while on another call")));
212 }
213
214 let call = if let Some(call) = self.incoming_call.1.borrow().clone() {
215 call
216 } else {
217 return Task::ready(Err(anyhow!("no incoming call")));
218 };
219
220 let join = Room::join(&call, self.client.clone(), self.user_store.clone(), cx);
221 cx.spawn(|this, mut cx| async move {
222 let room = join.await?;
223 this.update(&mut cx, |this, cx| this.set_room(Some(room.clone()), cx))
224 .await?;
225 Ok(())
226 })
227 }
228
229 pub fn decline_incoming(&mut self) -> Result<()> {
230 let call = self
231 .incoming_call
232 .0
233 .borrow_mut()
234 .take()
235 .ok_or_else(|| anyhow!("no incoming call"))?;
236 self.client.send(proto::DeclineCall {
237 room_id: call.room_id,
238 })?;
239 Ok(())
240 }
241
242 pub fn hang_up(&mut self, cx: &mut ModelContext<Self>) -> Result<()> {
243 if let Some((room, _)) = self.room.take() {
244 room.update(cx, |room, cx| room.leave(cx))?;
245 cx.notify();
246 }
247 Ok(())
248 }
249
250 pub fn share_project(
251 &mut self,
252 project: ModelHandle<Project>,
253 cx: &mut ModelContext<Self>,
254 ) -> Task<Result<u64>> {
255 if let Some((room, _)) = self.room.as_ref() {
256 room.update(cx, |room, cx| room.share_project(project, cx))
257 } else {
258 Task::ready(Err(anyhow!("no active call")))
259 }
260 }
261
262 pub fn set_location(
263 &mut self,
264 project: Option<&ModelHandle<Project>>,
265 cx: &mut ModelContext<Self>,
266 ) -> Task<Result<()>> {
267 self.location = project.map(|project| project.downgrade());
268 if let Some((room, _)) = self.room.as_ref() {
269 room.update(cx, |room, cx| room.set_location(project, cx))
270 } else {
271 Task::ready(Ok(()))
272 }
273 }
274
275 fn set_room(
276 &mut self,
277 room: Option<ModelHandle<Room>>,
278 cx: &mut ModelContext<Self>,
279 ) -> Task<Result<()>> {
280 if room.as_ref() != self.room.as_ref().map(|room| &room.0) {
281 cx.notify();
282 if let Some(room) = room {
283 if room.read(cx).status().is_offline() {
284 self.room = None;
285 Task::ready(Ok(()))
286 } else {
287 let subscriptions = vec![
288 cx.observe(&room, |this, room, cx| {
289 if room.read(cx).status().is_offline() {
290 this.set_room(None, cx).detach_and_log_err(cx);
291 }
292
293 this.set_sharing_status(room.read(cx).is_screen_sharing(), cx);
294
295 cx.notify();
296 }),
297 cx.subscribe(&room, |_, _, event, cx| cx.emit(event.clone())),
298 ];
299 self.room = Some((room.clone(), subscriptions));
300 let location = self.location.and_then(|location| location.upgrade(cx));
301 room.update(cx, |room, cx| room.set_location(location.as_ref(), cx))
302 }
303 } else {
304 self.room = None;
305 Task::ready(Ok(()))
306 }
307 } else {
308 Task::ready(Ok(()))
309 }
310 }
311
312 pub fn room(&self) -> Option<&ModelHandle<Room>> {
313 self.room.as_ref().map(|(room, _)| room)
314 }
315
316 pub fn pending_invites(&self) -> &HashSet<u64> {
317 &self.pending_invites
318 }
319
320 pub fn set_sharing_status(&mut self, is_screen_sharing: bool, cx: &mut MutableAppContext) {
321 if is_screen_sharing {
322 if self.sharing_status_indicator.is_none()
323 && cx.global::<Settings>().show_call_status_icon
324 {
325 self.sharing_status_indicator =
326 Some(cx.add_status_bar_item(|_| SharingStatusIndicator));
327 }
328 } else if let Some((window_id, _)) = self.sharing_status_indicator.take() {
329 cx.remove_status_bar_item(window_id);
330 }
331 }
332}
333
334pub fn toggle_screen_sharing(_: &ToggleScreenSharing, cx: &mut MutableAppContext) {
335 if let Some(room) = ActiveCall::global(cx).read(cx).room().cloned() {
336 let toggle_screen_sharing = room.update(cx, |room, cx| {
337 if room.is_screen_sharing() {
338 Task::ready(room.unshare_screen(cx))
339 } else {
340 room.share_screen(cx)
341 }
342 });
343 toggle_screen_sharing.detach_and_log_err(cx);
344 }
345}