1pub mod participant;
2pub mod room;
3
4use std::sync::Arc;
5
6use anyhow::{anyhow, Result};
7use client::{proto, Client, TypedEnvelope, User, UserStore};
8use collections::HashSet;
9use postage::watch;
10
11use gpui::{
12 AppContext, AsyncAppContext, Entity, ModelContext, ModelHandle, MutableAppContext,
13 Subscription, Task, WeakModelHandle,
14};
15use project::Project;
16
17pub use participant::ParticipantLocation;
18pub use room::Room;
19
20pub fn init(client: Arc<Client>, user_store: ModelHandle<UserStore>, cx: &mut MutableAppContext) {
21 let active_call = cx.add_model(|cx| ActiveCall::new(client, user_store, cx));
22 cx.set_global(active_call);
23}
24
/// A call that has been offered to the current user and is waiting to be
/// accepted or declined.
///
/// Built by `ActiveCall::handle_incoming_call` from a `proto::IncomingCall`
/// message and published through the watch channel returned by
/// `ActiveCall::incoming`.
#[derive(Clone)]
pub struct IncomingCall {
 /// Id of the room the call belongs to. Used to match `proto::CallCanceled`
 /// messages against this call and sent back in `proto::DeclineCall`.
 pub room_id: u64,
 /// The user looked up from the message's `calling_user_id`.
 pub calling_user: Arc<User>,
 /// The users looked up from the message's `participant_user_ids`.
 pub participants: Vec<Arc<User>>,
 /// The `initial_project` carried by the message, copied over unchanged.
 pub initial_project: Option<proto::ParticipantProject>,
}
32
/// Singleton global maintaining the user's participation in a room across workspaces.
pub struct ActiveCall {
 /// The room currently joined, paired with the observation/subscription
 /// handles that `set_room` creates for it. `None` when not on a call.
 room: Option<(ModelHandle<Room>, Vec<Subscription>)>,
 /// Weak handle to the project most recently passed to `set_location`;
 /// `set_room` re-applies it to a newly stored room.
 location: Option<WeakModelHandle<Project>>,
 /// Ids of users for whom an `invite` task is still running.
 pending_invites: HashSet<u64>,
 /// Both halves of the watch channel holding the pending incoming call:
 /// the sender is written by the message handlers and `decline_incoming`,
 /// the receiver is cloned out by `incoming`.
 incoming_call: (
 watch::Sender<Option<IncomingCall>>,
 watch::Receiver<Option<IncomingCall>>,
 ),
 client: Arc<Client>,
 user_store: ModelHandle<UserStore>,
 /// Handler registrations returned by the client in `new`. Never read;
 /// presumably held so the handlers stay registered (TODO confirm).
 _subscriptions: Vec<client::Subscription>,
}
46
/// `ActiveCall` emits the same event type as `Room`: `set_room` subscribes to
/// the stored room and re-emits each of its events unchanged.
impl Entity for ActiveCall {
 type Event = room::Event;
}
50
51impl ActiveCall {
52 fn new(
53 client: Arc<Client>,
54 user_store: ModelHandle<UserStore>,
55 cx: &mut ModelContext<Self>,
56 ) -> Self {
57 Self {
58 room: None,
59 location: None,
60 pending_invites: Default::default(),
61 incoming_call: watch::channel(),
62 _subscriptions: vec![
63 client.add_request_handler(cx.handle(), Self::handle_incoming_call),
64 client.add_message_handler(cx.handle(), Self::handle_call_canceled),
65 ],
66 client,
67 user_store,
68 }
69 }
70
71 async fn handle_incoming_call(
72 this: ModelHandle<Self>,
73 envelope: TypedEnvelope<proto::IncomingCall>,
74 _: Arc<Client>,
75 mut cx: AsyncAppContext,
76 ) -> Result<proto::Ack> {
77 let user_store = this.read_with(&cx, |this, _| this.user_store.clone());
78 let call = IncomingCall {
79 room_id: envelope.payload.room_id,
80 participants: user_store
81 .update(&mut cx, |user_store, cx| {
82 user_store.get_users(envelope.payload.participant_user_ids, cx)
83 })
84 .await?,
85 calling_user: user_store
86 .update(&mut cx, |user_store, cx| {
87 user_store.get_user(envelope.payload.calling_user_id, cx)
88 })
89 .await?,
90 initial_project: envelope.payload.initial_project,
91 };
92 this.update(&mut cx, |this, _| {
93 *this.incoming_call.0.borrow_mut() = Some(call);
94 });
95
96 Ok(proto::Ack {})
97 }
98
99 async fn handle_call_canceled(
100 this: ModelHandle<Self>,
101 envelope: TypedEnvelope<proto::CallCanceled>,
102 _: Arc<Client>,
103 mut cx: AsyncAppContext,
104 ) -> Result<()> {
105 this.update(&mut cx, |this, _| {
106 let mut incoming_call = this.incoming_call.0.borrow_mut();
107 if incoming_call
108 .as_ref()
109 .map_or(false, |call| call.room_id == envelope.payload.room_id)
110 {
111 incoming_call.take();
112 }
113 });
114 Ok(())
115 }
116
117 pub fn global(cx: &AppContext) -> ModelHandle<Self> {
118 cx.global::<ModelHandle<Self>>().clone()
119 }
120
121 pub fn invite(
122 &mut self,
123 called_user_id: u64,
124 initial_project: Option<ModelHandle<Project>>,
125 cx: &mut ModelContext<Self>,
126 ) -> Task<Result<()>> {
127 let client = self.client.clone();
128 let user_store = self.user_store.clone();
129 if !self.pending_invites.insert(called_user_id) {
130 return Task::ready(Err(anyhow!("user was already invited")));
131 }
132
133 cx.notify();
134 cx.spawn(|this, mut cx| async move {
135 let invite = async {
136 if let Some(room) = this.read_with(&cx, |this, _| this.room().cloned()) {
137 let initial_project_id = if let Some(initial_project) = initial_project {
138 Some(
139 room.update(&mut cx, |room, cx| {
140 room.share_project(initial_project, cx)
141 })
142 .await?,
143 )
144 } else {
145 None
146 };
147
148 room.update(&mut cx, |room, cx| {
149 room.call(called_user_id, initial_project_id, cx)
150 })
151 .await?;
152 } else {
153 let room = cx
154 .update(|cx| {
155 Room::create(called_user_id, initial_project, client, user_store, cx)
156 })
157 .await?;
158
159 this.update(&mut cx, |this, cx| this.set_room(Some(room), cx))
160 .await?;
161 };
162
163 Ok(())
164 };
165
166 let result = invite.await;
167 this.update(&mut cx, |this, cx| {
168 this.pending_invites.remove(&called_user_id);
169 cx.notify();
170 });
171 result
172 })
173 }
174
175 pub fn cancel_invite(
176 &mut self,
177 called_user_id: u64,
178 cx: &mut ModelContext<Self>,
179 ) -> Task<Result<()>> {
180 let room_id = if let Some(room) = self.room() {
181 room.read(cx).id()
182 } else {
183 return Task::ready(Err(anyhow!("no active call")));
184 };
185
186 let client = self.client.clone();
187 cx.foreground().spawn(async move {
188 client
189 .request(proto::CancelCall {
190 room_id,
191 called_user_id,
192 })
193 .await?;
194 anyhow::Ok(())
195 })
196 }
197
198 pub fn incoming(&self) -> watch::Receiver<Option<IncomingCall>> {
199 self.incoming_call.1.clone()
200 }
201
202 pub fn accept_incoming(&mut self, cx: &mut ModelContext<Self>) -> Task<Result<()>> {
203 if self.room.is_some() {
204 return Task::ready(Err(anyhow!("cannot join while on another call")));
205 }
206
207 let call = if let Some(call) = self.incoming_call.1.borrow().clone() {
208 call
209 } else {
210 return Task::ready(Err(anyhow!("no incoming call")));
211 };
212
213 let join = Room::join(&call, self.client.clone(), self.user_store.clone(), cx);
214 cx.spawn(|this, mut cx| async move {
215 let room = join.await?;
216 this.update(&mut cx, |this, cx| this.set_room(Some(room.clone()), cx))
217 .await?;
218 Ok(())
219 })
220 }
221
222 pub fn decline_incoming(&mut self) -> Result<()> {
223 let call = self
224 .incoming_call
225 .0
226 .borrow_mut()
227 .take()
228 .ok_or_else(|| anyhow!("no incoming call"))?;
229 self.client.send(proto::DeclineCall {
230 room_id: call.room_id,
231 })?;
232 Ok(())
233 }
234
235 pub fn hang_up(&mut self, cx: &mut ModelContext<Self>) -> Result<()> {
236 if let Some((room, _)) = self.room.take() {
237 room.update(cx, |room, cx| room.leave(cx))?;
238 cx.notify();
239 }
240 Ok(())
241 }
242
243 pub fn share_project(
244 &mut self,
245 project: ModelHandle<Project>,
246 cx: &mut ModelContext<Self>,
247 ) -> Task<Result<u64>> {
248 if let Some((room, _)) = self.room.as_ref() {
249 room.update(cx, |room, cx| room.share_project(project, cx))
250 } else {
251 Task::ready(Err(anyhow!("no active call")))
252 }
253 }
254
255 pub fn set_location(
256 &mut self,
257 project: Option<&ModelHandle<Project>>,
258 cx: &mut ModelContext<Self>,
259 ) -> Task<Result<()>> {
260 self.location = project.map(|project| project.downgrade());
261 if let Some((room, _)) = self.room.as_ref() {
262 room.update(cx, |room, cx| room.set_location(project, cx))
263 } else {
264 Task::ready(Ok(()))
265 }
266 }
267
268 fn set_room(
269 &mut self,
270 room: Option<ModelHandle<Room>>,
271 cx: &mut ModelContext<Self>,
272 ) -> Task<Result<()>> {
273 if room.as_ref() != self.room.as_ref().map(|room| &room.0) {
274 cx.notify();
275 if let Some(room) = room {
276 if room.read(cx).status().is_offline() {
277 self.room = None;
278 Task::ready(Ok(()))
279 } else {
280 let subscriptions = vec![
281 cx.observe(&room, |this, room, cx| {
282 if room.read(cx).status().is_offline() {
283 this.set_room(None, cx).detach_and_log_err(cx);
284 }
285
286 cx.notify();
287 }),
288 cx.subscribe(&room, |_, _, event, cx| cx.emit(event.clone())),
289 ];
290 self.room = Some((room.clone(), subscriptions));
291 let location = self.location.and_then(|location| location.upgrade(cx));
292 room.update(cx, |room, cx| room.set_location(location.as_ref(), cx))
293 }
294 } else {
295 self.room = None;
296 Task::ready(Ok(()))
297 }
298 } else {
299 Task::ready(Ok(()))
300 }
301 }
302
303 pub fn room(&self) -> Option<&ModelHandle<Room>> {
304 self.room.as_ref().map(|(room, _)| room)
305 }
306
307 pub fn pending_invites(&self) -> &HashSet<u64> {
308 &self.pending_invites
309 }
310}