1use crate::{
2 participant::{ParticipantLocation, RemoteParticipant},
3 IncomingCall,
4};
5use anyhow::{anyhow, Result};
6use client::{proto, Client, PeerId, TypedEnvelope, User, UserStore};
7use collections::{BTreeMap, HashSet};
8use futures::StreamExt;
9use gpui::{AsyncAppContext, Entity, ModelContext, ModelHandle, MutableAppContext, Task};
10use project::Project;
11use std::sync::Arc;
12use util::ResultExt;
13
/// Events emitted by a `Room` to its observers.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum Event {
    /// Emitted while applying a room update when a remote participant lists a project
    /// id that was not previously recorded for that participant's peer.
    RemoteProjectShared { owner: Arc<User>, project_id: u64 },
}
18
/// Client-side state of a call room.
pub struct Room {
    /// Room id, sent as the room identifier in `LeaveRoom`, `Call`, `ShareProject`
    /// and `UpdateParticipantLocation` messages.
    id: u64,
    /// `Online` from construction until `leave` switches it to `Offline`.
    status: RoomStatus,
    /// Participants other than the local user, keyed by peer id.
    remote_participants: BTreeMap<PeerId, RemoteParticipant>,
    /// Users listed in the room's `pending_participant_user_ids`.
    pending_participants: Vec<Arc<User>>,
    /// User ids of both remote and pending participants.
    participant_user_ids: HashSet<u64>,
    /// Number of `Call` requests that have been sent but not yet answered.
    pending_call_count: usize,
    /// When set, the room leaves itself once nobody else is present or expected.
    leave_when_empty: bool,
    client: Arc<Client>,
    /// Resolves user ids from room updates into `User` values.
    user_store: ModelHandle<UserStore>,
    /// Holds the `RoomUpdated` message-handler subscription; cleared on `leave`.
    subscriptions: Vec<client::Subscription>,
    /// Task applying the most recent room update; `None` once it has finished.
    pending_room_update: Option<Task<()>>,
}
32
33impl Entity for Room {
34 type Event = Event;
35
36 fn release(&mut self, _: &mut MutableAppContext) {
37 self.client.send(proto::LeaveRoom { id: self.id }).log_err();
38 }
39}
40
41impl Room {
42 fn new(
43 id: u64,
44 client: Arc<Client>,
45 user_store: ModelHandle<UserStore>,
46 cx: &mut ModelContext<Self>,
47 ) -> Self {
48 let mut client_status = client.status();
49 cx.spawn_weak(|this, mut cx| async move {
50 let is_connected = client_status
51 .next()
52 .await
53 .map_or(false, |s| s.is_connected());
54 // Even if we're initially connected, any future change of the status means we momentarily disconnected.
55 if !is_connected || client_status.next().await.is_some() {
56 if let Some(this) = this.upgrade(&cx) {
57 let _ = this.update(&mut cx, |this, cx| this.leave(cx));
58 }
59 }
60 })
61 .detach();
62
63 Self {
64 id,
65 status: RoomStatus::Online,
66 participant_user_ids: Default::default(),
67 remote_participants: Default::default(),
68 pending_participants: Default::default(),
69 pending_call_count: 0,
70 subscriptions: vec![client.add_message_handler(cx.handle(), Self::handle_room_updated)],
71 leave_when_empty: false,
72 pending_room_update: None,
73 client,
74 user_store,
75 }
76 }
77
78 pub(crate) fn create(
79 recipient_user_id: u64,
80 initial_project: Option<ModelHandle<Project>>,
81 client: Arc<Client>,
82 user_store: ModelHandle<UserStore>,
83 cx: &mut MutableAppContext,
84 ) -> Task<Result<ModelHandle<Self>>> {
85 cx.spawn(|mut cx| async move {
86 let response = client.request(proto::CreateRoom {}).await?;
87 let room = cx.add_model(|cx| Self::new(response.id, client, user_store, cx));
88
89 let initial_project_id = if let Some(initial_project) = initial_project {
90 let initial_project_id = room
91 .update(&mut cx, |room, cx| {
92 room.share_project(initial_project.clone(), cx)
93 })
94 .await?;
95 Some(initial_project_id)
96 } else {
97 None
98 };
99
100 match room
101 .update(&mut cx, |room, cx| {
102 room.leave_when_empty = true;
103 room.call(recipient_user_id, initial_project_id, cx)
104 })
105 .await
106 {
107 Ok(()) => Ok(room),
108 Err(error) => Err(anyhow!("room creation failed: {:?}", error)),
109 }
110 })
111 }
112
113 pub(crate) fn join(
114 call: &IncomingCall,
115 client: Arc<Client>,
116 user_store: ModelHandle<UserStore>,
117 cx: &mut MutableAppContext,
118 ) -> Task<Result<ModelHandle<Self>>> {
119 let room_id = call.room_id;
120 cx.spawn(|mut cx| async move {
121 let response = client.request(proto::JoinRoom { id: room_id }).await?;
122 let room_proto = response.room.ok_or_else(|| anyhow!("invalid room"))?;
123 let room = cx.add_model(|cx| Self::new(room_id, client, user_store, cx));
124 room.update(&mut cx, |room, cx| {
125 room.leave_when_empty = true;
126 room.apply_room_update(room_proto, cx)?;
127 anyhow::Ok(())
128 })?;
129 Ok(room)
130 })
131 }
132
133 fn should_leave(&self) -> bool {
134 self.leave_when_empty
135 && self.pending_room_update.is_none()
136 && self.pending_participants.is_empty()
137 && self.remote_participants.is_empty()
138 && self.pending_call_count == 0
139 }
140
141 pub(crate) fn leave(&mut self, cx: &mut ModelContext<Self>) -> Result<()> {
142 if self.status.is_offline() {
143 return Err(anyhow!("room is offline"));
144 }
145
146 cx.notify();
147 self.status = RoomStatus::Offline;
148 self.remote_participants.clear();
149 self.pending_participants.clear();
150 self.participant_user_ids.clear();
151 self.subscriptions.clear();
152 self.client.send(proto::LeaveRoom { id: self.id })?;
153 Ok(())
154 }
155
156 pub fn id(&self) -> u64 {
157 self.id
158 }
159
160 pub fn status(&self) -> RoomStatus {
161 self.status
162 }
163
164 pub fn remote_participants(&self) -> &BTreeMap<PeerId, RemoteParticipant> {
165 &self.remote_participants
166 }
167
168 pub fn pending_participants(&self) -> &[Arc<User>] {
169 &self.pending_participants
170 }
171
172 pub fn contains_participant(&self, user_id: u64) -> bool {
173 self.participant_user_ids.contains(&user_id)
174 }
175
176 async fn handle_room_updated(
177 this: ModelHandle<Self>,
178 envelope: TypedEnvelope<proto::RoomUpdated>,
179 _: Arc<Client>,
180 mut cx: AsyncAppContext,
181 ) -> Result<()> {
182 let room = envelope
183 .payload
184 .room
185 .ok_or_else(|| anyhow!("invalid room"))?;
186 this.update(&mut cx, |this, cx| this.apply_room_update(room, cx))
187 }
188
189 fn apply_room_update(
190 &mut self,
191 mut room: proto::Room,
192 cx: &mut ModelContext<Self>,
193 ) -> Result<()> {
194 // Filter ourselves out from the room's participants.
195 room.participants
196 .retain(|participant| Some(participant.user_id) != self.client.user_id());
197
198 let remote_participant_user_ids = room
199 .participants
200 .iter()
201 .map(|p| p.user_id)
202 .collect::<Vec<_>>();
203 let (remote_participants, pending_participants) =
204 self.user_store.update(cx, move |user_store, cx| {
205 (
206 user_store.get_users(remote_participant_user_ids, cx),
207 user_store.get_users(room.pending_participant_user_ids, cx),
208 )
209 });
210 self.pending_room_update = Some(cx.spawn(|this, mut cx| async move {
211 let (remote_participants, pending_participants) =
212 futures::join!(remote_participants, pending_participants);
213
214 this.update(&mut cx, |this, cx| {
215 this.participant_user_ids.clear();
216
217 if let Some(participants) = remote_participants.log_err() {
218 for (participant, user) in room.participants.into_iter().zip(participants) {
219 let peer_id = PeerId(participant.peer_id);
220 this.participant_user_ids.insert(participant.user_id);
221
222 let existing_project_ids = this
223 .remote_participants
224 .get(&peer_id)
225 .map(|existing| existing.project_ids.clone())
226 .unwrap_or_default();
227 for project_id in &participant.project_ids {
228 if !existing_project_ids.contains(project_id) {
229 cx.emit(Event::RemoteProjectShared {
230 owner: user.clone(),
231 project_id: *project_id,
232 });
233 }
234 }
235
236 this.remote_participants.insert(
237 peer_id,
238 RemoteParticipant {
239 user: user.clone(),
240 project_ids: participant.project_ids,
241 location: ParticipantLocation::from_proto(participant.location)
242 .unwrap_or(ParticipantLocation::External),
243 },
244 );
245 }
246
247 this.remote_participants.retain(|_, participant| {
248 this.participant_user_ids.contains(&participant.user.id)
249 });
250
251 cx.notify();
252 }
253
254 if let Some(pending_participants) = pending_participants.log_err() {
255 this.pending_participants = pending_participants;
256 for participant in &this.pending_participants {
257 this.participant_user_ids.insert(participant.id);
258 }
259 cx.notify();
260 }
261
262 this.pending_room_update.take();
263 if this.should_leave() {
264 let _ = this.leave(cx);
265 }
266
267 this.check_invariants();
268 });
269 }));
270
271 cx.notify();
272 Ok(())
273 }
274
275 fn check_invariants(&self) {
276 #[cfg(any(test, feature = "test-support"))]
277 {
278 for participant in self.remote_participants.values() {
279 assert!(self.participant_user_ids.contains(&participant.user.id));
280 }
281
282 for participant in &self.pending_participants {
283 assert!(self.participant_user_ids.contains(&participant.id));
284 }
285
286 assert_eq!(
287 self.participant_user_ids.len(),
288 self.remote_participants.len() + self.pending_participants.len()
289 );
290 }
291 }
292
293 pub(crate) fn call(
294 &mut self,
295 recipient_user_id: u64,
296 initial_project_id: Option<u64>,
297 cx: &mut ModelContext<Self>,
298 ) -> Task<Result<()>> {
299 if self.status.is_offline() {
300 return Task::ready(Err(anyhow!("room is offline")));
301 }
302
303 cx.notify();
304 let client = self.client.clone();
305 let room_id = self.id;
306 self.pending_call_count += 1;
307 cx.spawn(|this, mut cx| async move {
308 let result = client
309 .request(proto::Call {
310 room_id,
311 recipient_user_id,
312 initial_project_id,
313 })
314 .await;
315 this.update(&mut cx, |this, cx| {
316 this.pending_call_count -= 1;
317 if this.should_leave() {
318 this.leave(cx)?;
319 }
320 result
321 })?;
322 Ok(())
323 })
324 }
325
326 pub(crate) fn share_project(
327 &mut self,
328 project: ModelHandle<Project>,
329 cx: &mut ModelContext<Self>,
330 ) -> Task<Result<u64>> {
331 if project.read(cx).is_remote() {
332 return Task::ready(Err(anyhow!("can't share remote project")));
333 } else if let Some(project_id) = project.read(cx).remote_id() {
334 return Task::ready(Ok(project_id));
335 }
336
337 let request = self
338 .client
339 .request(proto::ShareProject { room_id: self.id() });
340 cx.spawn_weak(|_, mut cx| async move {
341 let response = request.await?;
342 project
343 .update(&mut cx, |project, cx| {
344 project.shared(response.project_id, cx)
345 })
346 .await?;
347 Ok(response.project_id)
348 })
349 }
350
351 pub fn set_location(
352 &mut self,
353 project: Option<&ModelHandle<Project>>,
354 cx: &mut ModelContext<Self>,
355 ) -> Task<Result<()>> {
356 if self.status.is_offline() {
357 return Task::ready(Err(anyhow!("room is offline")));
358 }
359
360 let client = self.client.clone();
361 let room_id = self.id;
362 let location = if let Some(project) = project {
363 if let Some(project_id) = project.read(cx).remote_id() {
364 proto::participant_location::Variant::Project(
365 proto::participant_location::Project { id: project_id },
366 )
367 } else {
368 return Task::ready(Err(anyhow!("project is not shared")));
369 }
370 } else {
371 proto::participant_location::Variant::External(proto::participant_location::External {})
372 };
373
374 cx.foreground().spawn(async move {
375 client
376 .request(proto::UpdateParticipantLocation {
377 room_id,
378 location: Some(proto::ParticipantLocation {
379 variant: Some(location),
380 }),
381 })
382 .await?;
383 Ok(())
384 })
385 }
386}
387
/// Connection state of a room.
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
pub enum RoomStatus {
    /// The room is active.
    Online,
    /// The room has been left.
    Offline,
}

impl RoomStatus {
    /// Returns `true` for [`RoomStatus::Offline`].
    pub fn is_offline(&self) -> bool {
        matches!(self, RoomStatus::Offline)
    }
}