1pub mod participant;
2pub mod room;
3
4use anyhow::{anyhow, Result};
5use client::{proto, Client, TypedEnvelope, User, UserStore};
6use collections::HashSet;
7use gpui::{
8 AppContext, AsyncAppContext, Entity, ModelContext, ModelHandle, MutableAppContext,
9 Subscription, Task, WeakModelHandle,
10};
11pub use participant::ParticipantLocation;
12use postage::watch;
13use project::Project;
14pub use room::Room;
15use std::sync::Arc;
16
17pub fn init(client: Arc<Client>, user_store: ModelHandle<UserStore>, cx: &mut MutableAppContext) {
18 let active_call = cx.add_model(|cx| ActiveCall::new(client, user_store, cx));
19 cx.set_global(active_call);
20}
21
22#[derive(Clone)]
23pub struct IncomingCall {
24 pub room_id: u64,
25 pub caller: Arc<User>,
26 pub participants: Vec<Arc<User>>,
27 pub initial_project: Option<proto::ParticipantProject>,
28}
29
30pub struct ActiveCall {
31 room: Option<(ModelHandle<Room>, Vec<Subscription>)>,
32 location: Option<WeakModelHandle<Project>>,
33 pending_invites: HashSet<u64>,
34 incoming_call: (
35 watch::Sender<Option<IncomingCall>>,
36 watch::Receiver<Option<IncomingCall>>,
37 ),
38 client: Arc<Client>,
39 user_store: ModelHandle<UserStore>,
40 _subscriptions: Vec<client::Subscription>,
41}
42
43impl Entity for ActiveCall {
44 type Event = room::Event;
45}
46
47impl ActiveCall {
48 fn new(
49 client: Arc<Client>,
50 user_store: ModelHandle<UserStore>,
51 cx: &mut ModelContext<Self>,
52 ) -> Self {
53 Self {
54 room: None,
55 location: None,
56 pending_invites: Default::default(),
57 incoming_call: watch::channel(),
58 _subscriptions: vec![
59 client.add_request_handler(cx.handle(), Self::handle_incoming_call),
60 client.add_message_handler(cx.handle(), Self::handle_call_canceled),
61 ],
62 client,
63 user_store,
64 }
65 }
66
67 async fn handle_incoming_call(
68 this: ModelHandle<Self>,
69 envelope: TypedEnvelope<proto::IncomingCall>,
70 _: Arc<Client>,
71 mut cx: AsyncAppContext,
72 ) -> Result<proto::Ack> {
73 let user_store = this.read_with(&cx, |this, _| this.user_store.clone());
74 let call = IncomingCall {
75 room_id: envelope.payload.room_id,
76 participants: user_store
77 .update(&mut cx, |user_store, cx| {
78 user_store.get_users(envelope.payload.participant_user_ids, cx)
79 })
80 .await?,
81 caller: user_store
82 .update(&mut cx, |user_store, cx| {
83 user_store.get_user(envelope.payload.caller_user_id, cx)
84 })
85 .await?,
86 initial_project: envelope.payload.initial_project,
87 };
88 this.update(&mut cx, |this, _| {
89 *this.incoming_call.0.borrow_mut() = Some(call);
90 });
91
92 Ok(proto::Ack {})
93 }
94
95 async fn handle_call_canceled(
96 this: ModelHandle<Self>,
97 _: TypedEnvelope<proto::CallCanceled>,
98 _: Arc<Client>,
99 mut cx: AsyncAppContext,
100 ) -> Result<()> {
101 this.update(&mut cx, |this, _| {
102 *this.incoming_call.0.borrow_mut() = None;
103 });
104 Ok(())
105 }
106
107 pub fn global(cx: &AppContext) -> ModelHandle<Self> {
108 cx.global::<ModelHandle<Self>>().clone()
109 }
110
111 pub fn invite(
112 &mut self,
113 recipient_user_id: u64,
114 initial_project: Option<ModelHandle<Project>>,
115 cx: &mut ModelContext<Self>,
116 ) -> Task<Result<()>> {
117 let client = self.client.clone();
118 let user_store = self.user_store.clone();
119 if !self.pending_invites.insert(recipient_user_id) {
120 return Task::ready(Err(anyhow!("user was already invited")));
121 }
122
123 cx.notify();
124 cx.spawn(|this, mut cx| async move {
125 let invite = async {
126 if let Some(room) = this.read_with(&cx, |this, _| this.room().cloned()) {
127 let initial_project_id = if let Some(initial_project) = initial_project {
128 Some(
129 room.update(&mut cx, |room, cx| {
130 room.share_project(initial_project, cx)
131 })
132 .await?,
133 )
134 } else {
135 None
136 };
137
138 room.update(&mut cx, |room, cx| {
139 room.call(recipient_user_id, initial_project_id, cx)
140 })
141 .await?;
142 } else {
143 let room = cx
144 .update(|cx| {
145 Room::create(recipient_user_id, initial_project, client, user_store, cx)
146 })
147 .await?;
148
149 this.update(&mut cx, |this, cx| this.set_room(Some(room), cx))
150 .await?;
151 };
152
153 Ok(())
154 };
155
156 let result = invite.await;
157 this.update(&mut cx, |this, cx| {
158 this.pending_invites.remove(&recipient_user_id);
159 cx.notify();
160 });
161 result
162 })
163 }
164
165 pub fn cancel_invite(
166 &mut self,
167 recipient_user_id: u64,
168 cx: &mut ModelContext<Self>,
169 ) -> Task<Result<()>> {
170 let room_id = if let Some(room) = self.room() {
171 room.read(cx).id()
172 } else {
173 return Task::ready(Err(anyhow!("no active call")));
174 };
175
176 let client = self.client.clone();
177 cx.foreground().spawn(async move {
178 client
179 .request(proto::CancelCall {
180 room_id,
181 recipient_user_id,
182 })
183 .await?;
184 anyhow::Ok(())
185 })
186 }
187
188 pub fn incoming(&self) -> watch::Receiver<Option<IncomingCall>> {
189 self.incoming_call.1.clone()
190 }
191
192 pub fn accept_incoming(&mut self, cx: &mut ModelContext<Self>) -> Task<Result<()>> {
193 if self.room.is_some() {
194 return Task::ready(Err(anyhow!("cannot join while on another call")));
195 }
196
197 let call = if let Some(call) = self.incoming_call.1.borrow().clone() {
198 call
199 } else {
200 return Task::ready(Err(anyhow!("no incoming call")));
201 };
202
203 let join = Room::join(&call, self.client.clone(), self.user_store.clone(), cx);
204 cx.spawn(|this, mut cx| async move {
205 let room = join.await?;
206 this.update(&mut cx, |this, cx| this.set_room(Some(room.clone()), cx))
207 .await?;
208 Ok(())
209 })
210 }
211
212 pub fn decline_incoming(&mut self) -> Result<()> {
213 let call = self
214 .incoming_call
215 .0
216 .borrow_mut()
217 .take()
218 .ok_or_else(|| anyhow!("no incoming call"))?;
219 self.client.send(proto::DeclineCall {
220 room_id: call.room_id,
221 })?;
222 Ok(())
223 }
224
225 pub fn hang_up(&mut self, cx: &mut ModelContext<Self>) -> Result<()> {
226 if let Some((room, _)) = self.room.take() {
227 room.update(cx, |room, cx| room.leave(cx))?;
228 cx.notify();
229 }
230 Ok(())
231 }
232
233 pub fn share_project(
234 &mut self,
235 project: ModelHandle<Project>,
236 cx: &mut ModelContext<Self>,
237 ) -> Task<Result<u64>> {
238 if let Some((room, _)) = self.room.as_ref() {
239 room.update(cx, |room, cx| room.share_project(project, cx))
240 } else {
241 Task::ready(Err(anyhow!("no active call")))
242 }
243 }
244
245 pub fn set_location(
246 &mut self,
247 project: Option<&ModelHandle<Project>>,
248 cx: &mut ModelContext<Self>,
249 ) -> Task<Result<()>> {
250 self.location = project.map(|project| project.downgrade());
251 if let Some((room, _)) = self.room.as_ref() {
252 room.update(cx, |room, cx| room.set_location(project, cx))
253 } else {
254 Task::ready(Ok(()))
255 }
256 }
257
258 fn set_room(
259 &mut self,
260 room: Option<ModelHandle<Room>>,
261 cx: &mut ModelContext<Self>,
262 ) -> Task<Result<()>> {
263 if room.as_ref() != self.room.as_ref().map(|room| &room.0) {
264 cx.notify();
265 if let Some(room) = room {
266 if room.read(cx).status().is_offline() {
267 self.room = None;
268 Task::ready(Ok(()))
269 } else {
270 let subscriptions = vec![
271 cx.observe(&room, |this, room, cx| {
272 if room.read(cx).status().is_offline() {
273 this.set_room(None, cx).detach_and_log_err(cx);
274 }
275
276 cx.notify();
277 }),
278 cx.subscribe(&room, |_, _, event, cx| cx.emit(event.clone())),
279 ];
280 self.room = Some((room.clone(), subscriptions));
281 let location = self.location.and_then(|location| location.upgrade(cx));
282 room.update(cx, |room, cx| room.set_location(location.as_ref(), cx))
283 }
284 } else {
285 self.room = None;
286 Task::ready(Ok(()))
287 }
288 } else {
289 Task::ready(Ok(()))
290 }
291 }
292
293 pub fn room(&self) -> Option<&ModelHandle<Room>> {
294 self.room.as_ref().map(|(room, _)| room)
295 }
296
297 pub fn pending_invites(&self) -> &HashSet<u64> {
298 &self.pending_invites
299 }
300}