1use crate::{
2 participant::{LocalParticipant, ParticipantLocation, RemoteParticipant},
3 IncomingCall,
4};
5use anyhow::{anyhow, Result};
6use client::{proto, Client, PeerId, TypedEnvelope, User, UserStore};
7use collections::{BTreeMap, HashSet};
8use futures::StreamExt;
9use gpui::{AsyncAppContext, Entity, ModelContext, ModelHandle, MutableAppContext, Task};
10use project::Project;
11use std::sync::Arc;
12use util::ResultExt;
13
14#[derive(Clone, Debug, PartialEq, Eq)]
15pub enum Event {
16 RemoteProjectShared {
17 owner: Arc<User>,
18 project_id: u64,
19 worktree_root_names: Vec<String>,
20 },
21 RemoteProjectUnshared {
22 project_id: u64,
23 },
24 Left,
25}
26
27pub struct Room {
28 id: u64,
29 status: RoomStatus,
30 local_participant: LocalParticipant,
31 remote_participants: BTreeMap<PeerId, RemoteParticipant>,
32 pending_participants: Vec<Arc<User>>,
33 participant_user_ids: HashSet<u64>,
34 pending_call_count: usize,
35 leave_when_empty: bool,
36 client: Arc<Client>,
37 user_store: ModelHandle<UserStore>,
38 subscriptions: Vec<client::Subscription>,
39 pending_room_update: Option<Task<()>>,
40}
41
42impl Entity for Room {
43 type Event = Event;
44
45 fn release(&mut self, _: &mut MutableAppContext) {
46 self.client.send(proto::LeaveRoom { id: self.id }).log_err();
47 }
48}
49
50impl Room {
51 fn new(
52 id: u64,
53 client: Arc<Client>,
54 user_store: ModelHandle<UserStore>,
55 cx: &mut ModelContext<Self>,
56 ) -> Self {
57 let mut client_status = client.status();
58 cx.spawn_weak(|this, mut cx| async move {
59 let is_connected = client_status
60 .next()
61 .await
62 .map_or(false, |s| s.is_connected());
63 // Even if we're initially connected, any future change of the status means we momentarily disconnected.
64 if !is_connected || client_status.next().await.is_some() {
65 if let Some(this) = this.upgrade(&cx) {
66 let _ = this.update(&mut cx, |this, cx| this.leave(cx));
67 }
68 }
69 })
70 .detach();
71
72 Self {
73 id,
74 status: RoomStatus::Online,
75 participant_user_ids: Default::default(),
76 local_participant: Default::default(),
77 remote_participants: Default::default(),
78 pending_participants: Default::default(),
79 pending_call_count: 0,
80 subscriptions: vec![client.add_message_handler(cx.handle(), Self::handle_room_updated)],
81 leave_when_empty: false,
82 pending_room_update: None,
83 client,
84 user_store,
85 }
86 }
87
88 pub(crate) fn create(
89 recipient_user_id: u64,
90 initial_project: Option<ModelHandle<Project>>,
91 client: Arc<Client>,
92 user_store: ModelHandle<UserStore>,
93 cx: &mut MutableAppContext,
94 ) -> Task<Result<ModelHandle<Self>>> {
95 cx.spawn(|mut cx| async move {
96 let response = client.request(proto::CreateRoom {}).await?;
97 let room = cx.add_model(|cx| Self::new(response.id, client, user_store, cx));
98
99 let initial_project_id = if let Some(initial_project) = initial_project {
100 let initial_project_id = room
101 .update(&mut cx, |room, cx| {
102 room.share_project(initial_project.clone(), cx)
103 })
104 .await?;
105 Some(initial_project_id)
106 } else {
107 None
108 };
109
110 match room
111 .update(&mut cx, |room, cx| {
112 room.leave_when_empty = true;
113 room.call(recipient_user_id, initial_project_id, cx)
114 })
115 .await
116 {
117 Ok(()) => Ok(room),
118 Err(error) => Err(anyhow!("room creation failed: {:?}", error)),
119 }
120 })
121 }
122
123 pub(crate) fn join(
124 call: &IncomingCall,
125 client: Arc<Client>,
126 user_store: ModelHandle<UserStore>,
127 cx: &mut MutableAppContext,
128 ) -> Task<Result<ModelHandle<Self>>> {
129 let room_id = call.room_id;
130 cx.spawn(|mut cx| async move {
131 let response = client.request(proto::JoinRoom { id: room_id }).await?;
132 let room_proto = response.room.ok_or_else(|| anyhow!("invalid room"))?;
133 let room = cx.add_model(|cx| Self::new(room_id, client, user_store, cx));
134 room.update(&mut cx, |room, cx| {
135 room.leave_when_empty = true;
136 room.apply_room_update(room_proto, cx)?;
137 anyhow::Ok(())
138 })?;
139 Ok(room)
140 })
141 }
142
143 fn should_leave(&self) -> bool {
144 self.leave_when_empty
145 && self.pending_room_update.is_none()
146 && self.pending_participants.is_empty()
147 && self.remote_participants.is_empty()
148 && self.pending_call_count == 0
149 }
150
151 pub(crate) fn leave(&mut self, cx: &mut ModelContext<Self>) -> Result<()> {
152 if self.status.is_offline() {
153 return Err(anyhow!("room is offline"));
154 }
155
156 cx.notify();
157 cx.emit(Event::Left);
158 self.status = RoomStatus::Offline;
159 self.remote_participants.clear();
160 self.pending_participants.clear();
161 self.participant_user_ids.clear();
162 self.subscriptions.clear();
163 self.client.send(proto::LeaveRoom { id: self.id })?;
164 Ok(())
165 }
166
167 pub fn id(&self) -> u64 {
168 self.id
169 }
170
171 pub fn status(&self) -> RoomStatus {
172 self.status
173 }
174
175 pub fn local_participant(&self) -> &LocalParticipant {
176 &self.local_participant
177 }
178
179 pub fn remote_participants(&self) -> &BTreeMap<PeerId, RemoteParticipant> {
180 &self.remote_participants
181 }
182
183 pub fn pending_participants(&self) -> &[Arc<User>] {
184 &self.pending_participants
185 }
186
187 pub fn contains_participant(&self, user_id: u64) -> bool {
188 self.participant_user_ids.contains(&user_id)
189 }
190
191 async fn handle_room_updated(
192 this: ModelHandle<Self>,
193 envelope: TypedEnvelope<proto::RoomUpdated>,
194 _: Arc<Client>,
195 mut cx: AsyncAppContext,
196 ) -> Result<()> {
197 let room = envelope
198 .payload
199 .room
200 .ok_or_else(|| anyhow!("invalid room"))?;
201 this.update(&mut cx, |this, cx| this.apply_room_update(room, cx))
202 }
203
204 fn apply_room_update(
205 &mut self,
206 mut room: proto::Room,
207 cx: &mut ModelContext<Self>,
208 ) -> Result<()> {
209 // Filter ourselves out from the room's participants.
210 let local_participant_ix = room
211 .participants
212 .iter()
213 .position(|participant| Some(participant.user_id) == self.client.user_id());
214 let local_participant = local_participant_ix.map(|ix| room.participants.swap_remove(ix));
215
216 let remote_participant_user_ids = room
217 .participants
218 .iter()
219 .map(|p| p.user_id)
220 .collect::<Vec<_>>();
221 let (remote_participants, pending_participants) =
222 self.user_store.update(cx, move |user_store, cx| {
223 (
224 user_store.get_users(remote_participant_user_ids, cx),
225 user_store.get_users(room.pending_participant_user_ids, cx),
226 )
227 });
228 self.pending_room_update = Some(cx.spawn(|this, mut cx| async move {
229 let (remote_participants, pending_participants) =
230 futures::join!(remote_participants, pending_participants);
231
232 this.update(&mut cx, |this, cx| {
233 this.participant_user_ids.clear();
234
235 if let Some(participant) = local_participant {
236 this.local_participant.projects = participant.projects;
237 } else {
238 this.local_participant.projects.clear();
239 }
240
241 if let Some(participants) = remote_participants.log_err() {
242 for (participant, user) in room.participants.into_iter().zip(participants) {
243 let peer_id = PeerId(participant.peer_id);
244 this.participant_user_ids.insert(participant.user_id);
245
246 let old_projects = this
247 .remote_participants
248 .get(&peer_id)
249 .into_iter()
250 .flat_map(|existing| &existing.projects)
251 .map(|project| project.id)
252 .collect::<HashSet<_>>();
253 let new_projects = participant
254 .projects
255 .iter()
256 .map(|project| project.id)
257 .collect::<HashSet<_>>();
258
259 for project in &participant.projects {
260 if !old_projects.contains(&project.id) {
261 cx.emit(Event::RemoteProjectShared {
262 owner: user.clone(),
263 project_id: project.id,
264 worktree_root_names: project.worktree_root_names.clone(),
265 });
266 }
267 }
268
269 for unshared_project_id in old_projects.difference(&new_projects) {
270 cx.emit(Event::RemoteProjectUnshared {
271 project_id: *unshared_project_id,
272 });
273 }
274
275 this.remote_participants.insert(
276 peer_id,
277 RemoteParticipant {
278 user: user.clone(),
279 projects: participant.projects,
280 location: ParticipantLocation::from_proto(participant.location)
281 .unwrap_or(ParticipantLocation::External),
282 },
283 );
284 }
285
286 this.remote_participants.retain(|_, participant| {
287 if this.participant_user_ids.contains(&participant.user.id) {
288 true
289 } else {
290 for project in &participant.projects {
291 cx.emit(Event::RemoteProjectUnshared {
292 project_id: project.id,
293 });
294 }
295 false
296 }
297 });
298 }
299
300 if let Some(pending_participants) = pending_participants.log_err() {
301 this.pending_participants = pending_participants;
302 for participant in &this.pending_participants {
303 this.participant_user_ids.insert(participant.id);
304 }
305 }
306
307 this.pending_room_update.take();
308 if this.should_leave() {
309 let _ = this.leave(cx);
310 }
311
312 this.check_invariants();
313 cx.notify();
314 });
315 }));
316
317 cx.notify();
318 Ok(())
319 }
320
321 fn check_invariants(&self) {
322 #[cfg(any(test, feature = "test-support"))]
323 {
324 for participant in self.remote_participants.values() {
325 assert!(self.participant_user_ids.contains(&participant.user.id));
326 }
327
328 for participant in &self.pending_participants {
329 assert!(self.participant_user_ids.contains(&participant.id));
330 }
331
332 assert_eq!(
333 self.participant_user_ids.len(),
334 self.remote_participants.len() + self.pending_participants.len()
335 );
336 }
337 }
338
339 pub(crate) fn call(
340 &mut self,
341 recipient_user_id: u64,
342 initial_project_id: Option<u64>,
343 cx: &mut ModelContext<Self>,
344 ) -> Task<Result<()>> {
345 if self.status.is_offline() {
346 return Task::ready(Err(anyhow!("room is offline")));
347 }
348
349 cx.notify();
350 let client = self.client.clone();
351 let room_id = self.id;
352 self.pending_call_count += 1;
353 cx.spawn(|this, mut cx| async move {
354 let result = client
355 .request(proto::Call {
356 room_id,
357 recipient_user_id,
358 initial_project_id,
359 })
360 .await;
361 this.update(&mut cx, |this, cx| {
362 this.pending_call_count -= 1;
363 if this.should_leave() {
364 this.leave(cx)?;
365 }
366 result
367 })?;
368 Ok(())
369 })
370 }
371
372 pub(crate) fn share_project(
373 &mut self,
374 project: ModelHandle<Project>,
375 cx: &mut ModelContext<Self>,
376 ) -> Task<Result<u64>> {
377 if let Some(project_id) = project.read(cx).remote_id() {
378 return Task::ready(Ok(project_id));
379 }
380
381 let request = self.client.request(proto::ShareProject {
382 room_id: self.id(),
383 worktrees: project
384 .read(cx)
385 .worktrees(cx)
386 .map(|worktree| {
387 let worktree = worktree.read(cx);
388 proto::WorktreeMetadata {
389 id: worktree.id().to_proto(),
390 root_name: worktree.root_name().into(),
391 visible: worktree.is_visible(),
392 }
393 })
394 .collect(),
395 });
396 cx.spawn(|this, mut cx| async move {
397 let response = request.await?;
398
399 project.update(&mut cx, |project, cx| {
400 project
401 .shared(response.project_id, cx)
402 .detach_and_log_err(cx)
403 });
404
405 // If the user's location is in this project, it changes from UnsharedProject to SharedProject.
406 this.update(&mut cx, |this, cx| {
407 let active_project = this.local_participant.active_project.as_ref();
408 if active_project.map_or(false, |location| *location == project) {
409 this.set_location(Some(&project), cx)
410 } else {
411 Task::ready(Ok(()))
412 }
413 })
414 .await?;
415
416 Ok(response.project_id)
417 })
418 }
419
420 pub fn set_location(
421 &mut self,
422 project: Option<&ModelHandle<Project>>,
423 cx: &mut ModelContext<Self>,
424 ) -> Task<Result<()>> {
425 if self.status.is_offline() {
426 return Task::ready(Err(anyhow!("room is offline")));
427 }
428
429 let client = self.client.clone();
430 let room_id = self.id;
431 let location = if let Some(project) = project {
432 self.local_participant.active_project = Some(project.downgrade());
433 if let Some(project_id) = project.read(cx).remote_id() {
434 proto::participant_location::Variant::SharedProject(
435 proto::participant_location::SharedProject { id: project_id },
436 )
437 } else {
438 proto::participant_location::Variant::UnsharedProject(
439 proto::participant_location::UnsharedProject {},
440 )
441 }
442 } else {
443 self.local_participant.active_project = None;
444 proto::participant_location::Variant::External(proto::participant_location::External {})
445 };
446
447 cx.notify();
448 cx.foreground().spawn(async move {
449 client
450 .request(proto::UpdateParticipantLocation {
451 room_id,
452 location: Some(proto::ParticipantLocation {
453 variant: Some(location),
454 }),
455 })
456 .await?;
457 Ok(())
458 })
459 }
460}
461
462#[derive(Copy, Clone, PartialEq, Eq)]
463pub enum RoomStatus {
464 Online,
465 Offline,
466}
467
468impl RoomStatus {
469 pub fn is_offline(&self) -> bool {
470 matches!(self, RoomStatus::Offline)
471 }
472}