1mod indicator;
2pub mod participant;
3pub mod room;
4
5use anyhow::{anyhow, Result};
6use client::{proto, Client, TypedEnvelope, User, UserStore};
7use collections::HashSet;
8use gpui::{
9 actions, AppContext, AsyncAppContext, Entity, ModelContext, ModelHandle, MutableAppContext,
10 Subscription, Task, ViewHandle, WeakModelHandle,
11};
12use indicator::SharingStatusIndicator;
13pub use participant::ParticipantLocation;
14use postage::watch;
15use project::Project;
16pub use room::Room;
17use std::sync::Arc;
18
19actions!(collab, [ToggleScreenSharing]);
20
21pub fn init(client: Arc<Client>, user_store: ModelHandle<UserStore>, cx: &mut MutableAppContext) {
22 let active_call = cx.add_model(|cx| ActiveCall::new(client, user_store, cx));
23 cx.set_global(active_call);
24 cx.add_global_action(toggle_screen_sharing);
25}
26
27#[derive(Clone)]
28pub struct IncomingCall {
29 pub room_id: u64,
30 pub calling_user: Arc<User>,
31 pub participants: Vec<Arc<User>>,
32 pub initial_project: Option<proto::ParticipantProject>,
33}
34
35pub struct ActiveCall {
36 room: Option<(ModelHandle<Room>, Vec<Subscription>)>,
37 location: Option<WeakModelHandle<Project>>,
38 pending_invites: HashSet<u64>,
39 incoming_call: (
40 watch::Sender<Option<IncomingCall>>,
41 watch::Receiver<Option<IncomingCall>>,
42 ),
43 client: Arc<Client>,
44 user_store: ModelHandle<UserStore>,
45 sharing_status_indicator: Option<(usize, ViewHandle<SharingStatusIndicator>)>,
46 _subscriptions: Vec<client::Subscription>,
47}
48
49impl Entity for ActiveCall {
50 type Event = room::Event;
51}
52
53impl ActiveCall {
54 fn new(
55 client: Arc<Client>,
56 user_store: ModelHandle<UserStore>,
57 cx: &mut ModelContext<Self>,
58 ) -> Self {
59 Self {
60 room: None,
61 location: None,
62 pending_invites: Default::default(),
63 incoming_call: watch::channel(),
64 _subscriptions: vec![
65 client.add_request_handler(cx.handle(), Self::handle_incoming_call),
66 client.add_message_handler(cx.handle(), Self::handle_call_canceled),
67 ],
68 client,
69 user_store,
70 sharing_status_indicator: None,
71 }
72 }
73
74 async fn handle_incoming_call(
75 this: ModelHandle<Self>,
76 envelope: TypedEnvelope<proto::IncomingCall>,
77 _: Arc<Client>,
78 mut cx: AsyncAppContext,
79 ) -> Result<proto::Ack> {
80 let user_store = this.read_with(&cx, |this, _| this.user_store.clone());
81 let call = IncomingCall {
82 room_id: envelope.payload.room_id,
83 participants: user_store
84 .update(&mut cx, |user_store, cx| {
85 user_store.get_users(envelope.payload.participant_user_ids, cx)
86 })
87 .await?,
88 calling_user: user_store
89 .update(&mut cx, |user_store, cx| {
90 user_store.get_user(envelope.payload.calling_user_id, cx)
91 })
92 .await?,
93 initial_project: envelope.payload.initial_project,
94 };
95 this.update(&mut cx, |this, _| {
96 *this.incoming_call.0.borrow_mut() = Some(call);
97 });
98
99 Ok(proto::Ack {})
100 }
101
102 async fn handle_call_canceled(
103 this: ModelHandle<Self>,
104 envelope: TypedEnvelope<proto::CallCanceled>,
105 _: Arc<Client>,
106 mut cx: AsyncAppContext,
107 ) -> Result<()> {
108 this.update(&mut cx, |this, _| {
109 let mut incoming_call = this.incoming_call.0.borrow_mut();
110 if incoming_call
111 .as_ref()
112 .map_or(false, |call| call.room_id == envelope.payload.room_id)
113 {
114 incoming_call.take();
115 }
116 });
117 Ok(())
118 }
119
120 pub fn global(cx: &AppContext) -> ModelHandle<Self> {
121 cx.global::<ModelHandle<Self>>().clone()
122 }
123
124 pub fn invite(
125 &mut self,
126 called_user_id: u64,
127 initial_project: Option<ModelHandle<Project>>,
128 cx: &mut ModelContext<Self>,
129 ) -> Task<Result<()>> {
130 let client = self.client.clone();
131 let user_store = self.user_store.clone();
132 if !self.pending_invites.insert(called_user_id) {
133 return Task::ready(Err(anyhow!("user was already invited")));
134 }
135
136 cx.notify();
137 cx.spawn(|this, mut cx| async move {
138 let invite = async {
139 if let Some(room) = this.read_with(&cx, |this, _| this.room().cloned()) {
140 let initial_project_id = if let Some(initial_project) = initial_project {
141 Some(
142 room.update(&mut cx, |room, cx| {
143 room.share_project(initial_project, cx)
144 })
145 .await?,
146 )
147 } else {
148 None
149 };
150
151 room.update(&mut cx, |room, cx| {
152 room.call(called_user_id, initial_project_id, cx)
153 })
154 .await?;
155 } else {
156 let room = cx
157 .update(|cx| {
158 Room::create(called_user_id, initial_project, client, user_store, cx)
159 })
160 .await?;
161
162 this.update(&mut cx, |this, cx| this.set_room(Some(room), cx))
163 .await?;
164 };
165
166 Ok(())
167 };
168
169 let result = invite.await;
170 this.update(&mut cx, |this, cx| {
171 this.pending_invites.remove(&called_user_id);
172 cx.notify();
173 });
174 result
175 })
176 }
177
178 pub fn cancel_invite(
179 &mut self,
180 called_user_id: u64,
181 cx: &mut ModelContext<Self>,
182 ) -> Task<Result<()>> {
183 let room_id = if let Some(room) = self.room() {
184 room.read(cx).id()
185 } else {
186 return Task::ready(Err(anyhow!("no active call")));
187 };
188
189 let client = self.client.clone();
190 cx.foreground().spawn(async move {
191 client
192 .request(proto::CancelCall {
193 room_id,
194 called_user_id,
195 })
196 .await?;
197 anyhow::Ok(())
198 })
199 }
200
201 pub fn incoming(&self) -> watch::Receiver<Option<IncomingCall>> {
202 self.incoming_call.1.clone()
203 }
204
205 pub fn accept_incoming(&mut self, cx: &mut ModelContext<Self>) -> Task<Result<()>> {
206 if self.room.is_some() {
207 return Task::ready(Err(anyhow!("cannot join while on another call")));
208 }
209
210 let call = if let Some(call) = self.incoming_call.1.borrow().clone() {
211 call
212 } else {
213 return Task::ready(Err(anyhow!("no incoming call")));
214 };
215
216 let join = Room::join(&call, self.client.clone(), self.user_store.clone(), cx);
217 cx.spawn(|this, mut cx| async move {
218 let room = join.await?;
219 this.update(&mut cx, |this, cx| this.set_room(Some(room.clone()), cx))
220 .await?;
221 Ok(())
222 })
223 }
224
225 pub fn decline_incoming(&mut self) -> Result<()> {
226 let call = self
227 .incoming_call
228 .0
229 .borrow_mut()
230 .take()
231 .ok_or_else(|| anyhow!("no incoming call"))?;
232 self.client.send(proto::DeclineCall {
233 room_id: call.room_id,
234 })?;
235 Ok(())
236 }
237
238 pub fn hang_up(&mut self, cx: &mut ModelContext<Self>) -> Result<()> {
239 if let Some((room, _)) = self.room.take() {
240 room.update(cx, |room, cx| room.leave(cx))?;
241 cx.notify();
242 }
243 Ok(())
244 }
245
246 pub fn share_project(
247 &mut self,
248 project: ModelHandle<Project>,
249 cx: &mut ModelContext<Self>,
250 ) -> Task<Result<u64>> {
251 if let Some((room, _)) = self.room.as_ref() {
252 room.update(cx, |room, cx| room.share_project(project, cx))
253 } else {
254 Task::ready(Err(anyhow!("no active call")))
255 }
256 }
257
258 pub fn set_location(
259 &mut self,
260 project: Option<&ModelHandle<Project>>,
261 cx: &mut ModelContext<Self>,
262 ) -> Task<Result<()>> {
263 self.location = project.map(|project| project.downgrade());
264 if let Some((room, _)) = self.room.as_ref() {
265 room.update(cx, |room, cx| room.set_location(project, cx))
266 } else {
267 Task::ready(Ok(()))
268 }
269 }
270
271 fn set_room(
272 &mut self,
273 room: Option<ModelHandle<Room>>,
274 cx: &mut ModelContext<Self>,
275 ) -> Task<Result<()>> {
276 if room.as_ref() != self.room.as_ref().map(|room| &room.0) {
277 cx.notify();
278 if let Some(room) = room {
279 if room.read(cx).status().is_offline() {
280 self.room = None;
281 Task::ready(Ok(()))
282 } else {
283 let subscriptions = vec![
284 cx.observe(&room, |this, room, cx| {
285 if room.read(cx).status().is_offline() {
286 this.set_room(None, cx).detach_and_log_err(cx);
287 }
288
289 this.set_sharing_status(room.read(cx).is_screen_sharing(), cx);
290
291 cx.notify();
292 }),
293 cx.subscribe(&room, |_, _, event, cx| cx.emit(event.clone())),
294 ];
295 self.room = Some((room.clone(), subscriptions));
296 let location = self.location.and_then(|location| location.upgrade(cx));
297 room.update(cx, |room, cx| room.set_location(location.as_ref(), cx))
298 }
299 } else {
300 self.room = None;
301 Task::ready(Ok(()))
302 }
303 } else {
304 Task::ready(Ok(()))
305 }
306 }
307
308 pub fn room(&self) -> Option<&ModelHandle<Room>> {
309 self.room.as_ref().map(|(room, _)| room)
310 }
311
312 pub fn pending_invites(&self) -> &HashSet<u64> {
313 &self.pending_invites
314 }
315
316 pub fn set_sharing_status(&mut self, is_screen_sharing: bool, cx: &mut MutableAppContext) {
317 if is_screen_sharing {
318 if self.sharing_status_indicator.is_none() {
319 self.sharing_status_indicator =
320 Some(cx.add_status_bar_item(|_| SharingStatusIndicator));
321 }
322 } else if let Some((window_id, _)) = self.sharing_status_indicator.take() {
323 cx.remove_status_bar_item(window_id);
324 }
325 }
326}
327
328pub fn toggle_screen_sharing(_: &ToggleScreenSharing, cx: &mut MutableAppContext) {
329 if let Some(room) = ActiveCall::global(cx).read(cx).room().cloned() {
330 let toggle_screen_sharing = room.update(cx, |room, cx| {
331 if room.is_screen_sharing() {
332 Task::ready(room.unshare_screen(cx))
333 } else {
334 room.share_screen(cx)
335 }
336 });
337 toggle_screen_sharing.detach_and_log_err(cx);
338 }
339}