1pub mod participant;
2pub mod room;
3
4use anyhow::{anyhow, Result};
5use client::{proto, Client, TypedEnvelope, User, UserStore};
6use collections::HashSet;
7use gpui::{
8 AppContext, AsyncAppContext, Entity, ModelContext, ModelHandle, MutableAppContext,
9 Subscription, Task, WeakModelHandle,
10};
11pub use participant::ParticipantLocation;
12use postage::watch;
13use project::Project;
14pub use room::Room;
15use std::sync::Arc;
16
17pub fn init(client: Arc<Client>, user_store: ModelHandle<UserStore>, cx: &mut MutableAppContext) {
18 let active_call = cx.add_model(|cx| ActiveCall::new(client, user_store, cx));
19 cx.set_global(active_call);
20}
21
22#[derive(Clone)]
23pub struct IncomingCall {
24 pub room_id: u64,
25 pub calling_user: Arc<User>,
26 pub participants: Vec<Arc<User>>,
27 pub initial_project: Option<proto::ParticipantProject>,
28}
29
30pub struct ActiveCall {
31 room: Option<(ModelHandle<Room>, Vec<Subscription>)>,
32 location: Option<WeakModelHandle<Project>>,
33 pending_invites: HashSet<u64>,
34 incoming_call: (
35 watch::Sender<Option<IncomingCall>>,
36 watch::Receiver<Option<IncomingCall>>,
37 ),
38 client: Arc<Client>,
39 user_store: ModelHandle<UserStore>,
40 _subscriptions: Vec<client::Subscription>,
41}
42
43impl Entity for ActiveCall {
44 type Event = room::Event;
45}
46
47impl ActiveCall {
48 fn new(
49 client: Arc<Client>,
50 user_store: ModelHandle<UserStore>,
51 cx: &mut ModelContext<Self>,
52 ) -> Self {
53 Self {
54 room: None,
55 location: None,
56 pending_invites: Default::default(),
57 incoming_call: watch::channel(),
58 _subscriptions: vec![
59 client.add_request_handler(cx.handle(), Self::handle_incoming_call),
60 client.add_message_handler(cx.handle(), Self::handle_call_canceled),
61 ],
62 client,
63 user_store,
64 }
65 }
66
67 async fn handle_incoming_call(
68 this: ModelHandle<Self>,
69 envelope: TypedEnvelope<proto::IncomingCall>,
70 _: Arc<Client>,
71 mut cx: AsyncAppContext,
72 ) -> Result<proto::Ack> {
73 let user_store = this.read_with(&cx, |this, _| this.user_store.clone());
74 let call = IncomingCall {
75 room_id: envelope.payload.room_id,
76 participants: user_store
77 .update(&mut cx, |user_store, cx| {
78 user_store.get_users(envelope.payload.participant_user_ids, cx)
79 })
80 .await?,
81 calling_user: user_store
82 .update(&mut cx, |user_store, cx| {
83 user_store.get_user(envelope.payload.calling_user_id, cx)
84 })
85 .await?,
86 initial_project: envelope.payload.initial_project,
87 };
88 this.update(&mut cx, |this, _| {
89 *this.incoming_call.0.borrow_mut() = Some(call);
90 });
91
92 Ok(proto::Ack {})
93 }
94
95 async fn handle_call_canceled(
96 this: ModelHandle<Self>,
97 envelope: TypedEnvelope<proto::CallCanceled>,
98 _: Arc<Client>,
99 mut cx: AsyncAppContext,
100 ) -> Result<()> {
101 this.update(&mut cx, |this, _| {
102 let mut incoming_call = this.incoming_call.0.borrow_mut();
103 if incoming_call
104 .as_ref()
105 .map_or(false, |call| call.room_id == envelope.payload.room_id)
106 {
107 incoming_call.take();
108 }
109 });
110 Ok(())
111 }
112
113 pub fn global(cx: &AppContext) -> ModelHandle<Self> {
114 cx.global::<ModelHandle<Self>>().clone()
115 }
116
117 pub fn invite(
118 &mut self,
119 called_user_id: u64,
120 initial_project: Option<ModelHandle<Project>>,
121 cx: &mut ModelContext<Self>,
122 ) -> Task<Result<()>> {
123 let client = self.client.clone();
124 let user_store = self.user_store.clone();
125 if !self.pending_invites.insert(called_user_id) {
126 return Task::ready(Err(anyhow!("user was already invited")));
127 }
128
129 cx.notify();
130 cx.spawn(|this, mut cx| async move {
131 let invite = async {
132 if let Some(room) = this.read_with(&cx, |this, _| this.room().cloned()) {
133 let initial_project_id = if let Some(initial_project) = initial_project {
134 Some(
135 room.update(&mut cx, |room, cx| {
136 room.share_project(initial_project, cx)
137 })
138 .await?,
139 )
140 } else {
141 None
142 };
143
144 room.update(&mut cx, |room, cx| {
145 room.call(called_user_id, initial_project_id, cx)
146 })
147 .await?;
148 } else {
149 let room = cx
150 .update(|cx| {
151 Room::create(called_user_id, initial_project, client, user_store, cx)
152 })
153 .await?;
154
155 this.update(&mut cx, |this, cx| this.set_room(Some(room), cx))
156 .await?;
157 };
158
159 Ok(())
160 };
161
162 let result = invite.await;
163 this.update(&mut cx, |this, cx| {
164 this.pending_invites.remove(&called_user_id);
165 cx.notify();
166 });
167 result
168 })
169 }
170
171 pub fn cancel_invite(
172 &mut self,
173 called_user_id: u64,
174 cx: &mut ModelContext<Self>,
175 ) -> Task<Result<()>> {
176 let room_id = if let Some(room) = self.room() {
177 room.read(cx).id()
178 } else {
179 return Task::ready(Err(anyhow!("no active call")));
180 };
181
182 let client = self.client.clone();
183 cx.foreground().spawn(async move {
184 client
185 .request(proto::CancelCall {
186 room_id,
187 called_user_id,
188 })
189 .await?;
190 anyhow::Ok(())
191 })
192 }
193
194 pub fn incoming(&self) -> watch::Receiver<Option<IncomingCall>> {
195 self.incoming_call.1.clone()
196 }
197
198 pub fn accept_incoming(&mut self, cx: &mut ModelContext<Self>) -> Task<Result<()>> {
199 if self.room.is_some() {
200 return Task::ready(Err(anyhow!("cannot join while on another call")));
201 }
202
203 let call = if let Some(call) = self.incoming_call.1.borrow().clone() {
204 call
205 } else {
206 return Task::ready(Err(anyhow!("no incoming call")));
207 };
208
209 let join = Room::join(&call, self.client.clone(), self.user_store.clone(), cx);
210 cx.spawn(|this, mut cx| async move {
211 let room = join.await?;
212 this.update(&mut cx, |this, cx| this.set_room(Some(room.clone()), cx))
213 .await?;
214 Ok(())
215 })
216 }
217
218 pub fn decline_incoming(&mut self) -> Result<()> {
219 let call = self
220 .incoming_call
221 .0
222 .borrow_mut()
223 .take()
224 .ok_or_else(|| anyhow!("no incoming call"))?;
225 self.client.send(proto::DeclineCall {
226 room_id: call.room_id,
227 })?;
228 Ok(())
229 }
230
231 pub fn hang_up(&mut self, cx: &mut ModelContext<Self>) -> Result<()> {
232 if let Some((room, _)) = self.room.take() {
233 room.update(cx, |room, cx| room.leave(cx))?;
234 cx.notify();
235 }
236 Ok(())
237 }
238
239 pub fn share_project(
240 &mut self,
241 project: ModelHandle<Project>,
242 cx: &mut ModelContext<Self>,
243 ) -> Task<Result<u64>> {
244 if let Some((room, _)) = self.room.as_ref() {
245 room.update(cx, |room, cx| room.share_project(project, cx))
246 } else {
247 Task::ready(Err(anyhow!("no active call")))
248 }
249 }
250
251 pub fn set_location(
252 &mut self,
253 project: Option<&ModelHandle<Project>>,
254 cx: &mut ModelContext<Self>,
255 ) -> Task<Result<()>> {
256 self.location = project.map(|project| project.downgrade());
257 if let Some((room, _)) = self.room.as_ref() {
258 room.update(cx, |room, cx| room.set_location(project, cx))
259 } else {
260 Task::ready(Ok(()))
261 }
262 }
263
264 fn set_room(
265 &mut self,
266 room: Option<ModelHandle<Room>>,
267 cx: &mut ModelContext<Self>,
268 ) -> Task<Result<()>> {
269 if room.as_ref() != self.room.as_ref().map(|room| &room.0) {
270 cx.notify();
271 if let Some(room) = room {
272 if room.read(cx).status().is_offline() {
273 self.room = None;
274 Task::ready(Ok(()))
275 } else {
276 let subscriptions = vec![
277 cx.observe(&room, |this, room, cx| {
278 if room.read(cx).status().is_offline() {
279 this.set_room(None, cx).detach_and_log_err(cx);
280 }
281
282 cx.notify();
283 }),
284 cx.subscribe(&room, |_, _, event, cx| cx.emit(event.clone())),
285 ];
286 self.room = Some((room.clone(), subscriptions));
287 let location = self.location.and_then(|location| location.upgrade(cx));
288 room.update(cx, |room, cx| room.set_location(location.as_ref(), cx))
289 }
290 } else {
291 self.room = None;
292 Task::ready(Ok(()))
293 }
294 } else {
295 Task::ready(Ok(()))
296 }
297 }
298
299 pub fn room(&self) -> Option<&ModelHandle<Room>> {
300 self.room.as_ref().map(|(room, _)| room)
301 }
302
303 pub fn pending_invites(&self) -> &HashSet<u64> {
304 &self.pending_invites
305 }
306}