room.rs

  1use crate::{
  2    participant::{LocalParticipant, ParticipantLocation, RemoteParticipant},
  3    IncomingCall,
  4};
  5use anyhow::{anyhow, Result};
  6use client::{proto, Client, PeerId, TypedEnvelope, User, UserStore};
  7use collections::{BTreeMap, HashSet};
  8use futures::StreamExt;
  9use gpui::{AsyncAppContext, Entity, ModelContext, ModelHandle, MutableAppContext, Task};
 10use project::Project;
 11use std::sync::Arc;
 12use util::ResultExt;
 13
 14#[derive(Clone, Debug, PartialEq, Eq)]
 15pub enum Event {
 16    RemoteProjectShared {
 17        owner: Arc<User>,
 18        project_id: u64,
 19        worktree_root_names: Vec<String>,
 20    },
 21    RemoteProjectUnshared {
 22        project_id: u64,
 23    },
 24    Left,
 25}
 26
 27pub struct Room {
 28    id: u64,
 29    status: RoomStatus,
 30    local_participant: LocalParticipant,
 31    remote_participants: BTreeMap<PeerId, RemoteParticipant>,
 32    pending_participants: Vec<Arc<User>>,
 33    participant_user_ids: HashSet<u64>,
 34    pending_call_count: usize,
 35    leave_when_empty: bool,
 36    client: Arc<Client>,
 37    user_store: ModelHandle<UserStore>,
 38    subscriptions: Vec<client::Subscription>,
 39    pending_room_update: Option<Task<()>>,
 40}
 41
 42impl Entity for Room {
 43    type Event = Event;
 44
 45    fn release(&mut self, _: &mut MutableAppContext) {
 46        self.client.send(proto::LeaveRoom { id: self.id }).log_err();
 47    }
 48}
 49
 50impl Room {
 51    fn new(
 52        id: u64,
 53        client: Arc<Client>,
 54        user_store: ModelHandle<UserStore>,
 55        cx: &mut ModelContext<Self>,
 56    ) -> Self {
 57        let mut client_status = client.status();
 58        cx.spawn_weak(|this, mut cx| async move {
 59            let is_connected = client_status
 60                .next()
 61                .await
 62                .map_or(false, |s| s.is_connected());
 63            // Even if we're initially connected, any future change of the status means we momentarily disconnected.
 64            if !is_connected || client_status.next().await.is_some() {
 65                if let Some(this) = this.upgrade(&cx) {
 66                    let _ = this.update(&mut cx, |this, cx| this.leave(cx));
 67                }
 68            }
 69        })
 70        .detach();
 71
 72        Self {
 73            id,
 74            status: RoomStatus::Online,
 75            participant_user_ids: Default::default(),
 76            local_participant: Default::default(),
 77            remote_participants: Default::default(),
 78            pending_participants: Default::default(),
 79            pending_call_count: 0,
 80            subscriptions: vec![client.add_message_handler(cx.handle(), Self::handle_room_updated)],
 81            leave_when_empty: false,
 82            pending_room_update: None,
 83            client,
 84            user_store,
 85        }
 86    }
 87
 88    pub(crate) fn create(
 89        recipient_user_id: u64,
 90        initial_project: Option<ModelHandle<Project>>,
 91        client: Arc<Client>,
 92        user_store: ModelHandle<UserStore>,
 93        cx: &mut MutableAppContext,
 94    ) -> Task<Result<ModelHandle<Self>>> {
 95        cx.spawn(|mut cx| async move {
 96            let response = client.request(proto::CreateRoom {}).await?;
 97            let room = cx.add_model(|cx| Self::new(response.id, client, user_store, cx));
 98
 99            let initial_project_id = if let Some(initial_project) = initial_project {
100                let initial_project_id = room
101                    .update(&mut cx, |room, cx| {
102                        room.share_project(initial_project.clone(), cx)
103                    })
104                    .await?;
105                Some(initial_project_id)
106            } else {
107                None
108            };
109
110            match room
111                .update(&mut cx, |room, cx| {
112                    room.leave_when_empty = true;
113                    room.call(recipient_user_id, initial_project_id, cx)
114                })
115                .await
116            {
117                Ok(()) => Ok(room),
118                Err(error) => Err(anyhow!("room creation failed: {:?}", error)),
119            }
120        })
121    }
122
123    pub(crate) fn join(
124        call: &IncomingCall,
125        client: Arc<Client>,
126        user_store: ModelHandle<UserStore>,
127        cx: &mut MutableAppContext,
128    ) -> Task<Result<ModelHandle<Self>>> {
129        let room_id = call.room_id;
130        cx.spawn(|mut cx| async move {
131            let response = client.request(proto::JoinRoom { id: room_id }).await?;
132            let room_proto = response.room.ok_or_else(|| anyhow!("invalid room"))?;
133            let room = cx.add_model(|cx| Self::new(room_id, client, user_store, cx));
134            room.update(&mut cx, |room, cx| {
135                room.leave_when_empty = true;
136                room.apply_room_update(room_proto, cx)?;
137                anyhow::Ok(())
138            })?;
139            Ok(room)
140        })
141    }
142
143    fn should_leave(&self) -> bool {
144        self.leave_when_empty
145            && self.pending_room_update.is_none()
146            && self.pending_participants.is_empty()
147            && self.remote_participants.is_empty()
148            && self.pending_call_count == 0
149    }
150
151    pub(crate) fn leave(&mut self, cx: &mut ModelContext<Self>) -> Result<()> {
152        if self.status.is_offline() {
153            return Err(anyhow!("room is offline"));
154        }
155
156        cx.notify();
157        cx.emit(Event::Left);
158        self.status = RoomStatus::Offline;
159        self.remote_participants.clear();
160        self.pending_participants.clear();
161        self.participant_user_ids.clear();
162        self.subscriptions.clear();
163        self.client.send(proto::LeaveRoom { id: self.id })?;
164        Ok(())
165    }
166
167    pub fn id(&self) -> u64 {
168        self.id
169    }
170
171    pub fn status(&self) -> RoomStatus {
172        self.status
173    }
174
175    pub fn local_participant(&self) -> &LocalParticipant {
176        &self.local_participant
177    }
178
179    pub fn remote_participants(&self) -> &BTreeMap<PeerId, RemoteParticipant> {
180        &self.remote_participants
181    }
182
183    pub fn pending_participants(&self) -> &[Arc<User>] {
184        &self.pending_participants
185    }
186
187    pub fn contains_participant(&self, user_id: u64) -> bool {
188        self.participant_user_ids.contains(&user_id)
189    }
190
191    async fn handle_room_updated(
192        this: ModelHandle<Self>,
193        envelope: TypedEnvelope<proto::RoomUpdated>,
194        _: Arc<Client>,
195        mut cx: AsyncAppContext,
196    ) -> Result<()> {
197        let room = envelope
198            .payload
199            .room
200            .ok_or_else(|| anyhow!("invalid room"))?;
201        this.update(&mut cx, |this, cx| this.apply_room_update(room, cx))
202    }
203
204    fn apply_room_update(
205        &mut self,
206        mut room: proto::Room,
207        cx: &mut ModelContext<Self>,
208    ) -> Result<()> {
209        // Filter ourselves out from the room's participants.
210        let local_participant_ix = room
211            .participants
212            .iter()
213            .position(|participant| Some(participant.user_id) == self.client.user_id());
214        let local_participant = local_participant_ix.map(|ix| room.participants.swap_remove(ix));
215
216        let remote_participant_user_ids = room
217            .participants
218            .iter()
219            .map(|p| p.user_id)
220            .collect::<Vec<_>>();
221        let (remote_participants, pending_participants) =
222            self.user_store.update(cx, move |user_store, cx| {
223                (
224                    user_store.get_users(remote_participant_user_ids, cx),
225                    user_store.get_users(room.pending_participant_user_ids, cx),
226                )
227            });
228        self.pending_room_update = Some(cx.spawn(|this, mut cx| async move {
229            let (remote_participants, pending_participants) =
230                futures::join!(remote_participants, pending_participants);
231
232            this.update(&mut cx, |this, cx| {
233                this.participant_user_ids.clear();
234
235                if let Some(participant) = local_participant {
236                    this.local_participant.projects = participant.projects;
237                } else {
238                    this.local_participant.projects.clear();
239                }
240
241                if let Some(participants) = remote_participants.log_err() {
242                    for (participant, user) in room.participants.into_iter().zip(participants) {
243                        let peer_id = PeerId(participant.peer_id);
244                        this.participant_user_ids.insert(participant.user_id);
245
246                        let old_projects = this
247                            .remote_participants
248                            .get(&peer_id)
249                            .into_iter()
250                            .flat_map(|existing| &existing.projects)
251                            .map(|project| project.id)
252                            .collect::<HashSet<_>>();
253                        let new_projects = participant
254                            .projects
255                            .iter()
256                            .map(|project| project.id)
257                            .collect::<HashSet<_>>();
258
259                        for project in &participant.projects {
260                            if !old_projects.contains(&project.id) {
261                                cx.emit(Event::RemoteProjectShared {
262                                    owner: user.clone(),
263                                    project_id: project.id,
264                                    worktree_root_names: project.worktree_root_names.clone(),
265                                });
266                            }
267                        }
268
269                        for unshared_project_id in old_projects.difference(&new_projects) {
270                            cx.emit(Event::RemoteProjectUnshared {
271                                project_id: *unshared_project_id,
272                            });
273                        }
274
275                        this.remote_participants.insert(
276                            peer_id,
277                            RemoteParticipant {
278                                user: user.clone(),
279                                projects: participant.projects,
280                                location: ParticipantLocation::from_proto(participant.location)
281                                    .unwrap_or(ParticipantLocation::External),
282                            },
283                        );
284                    }
285
286                    this.remote_participants.retain(|_, participant| {
287                        if this.participant_user_ids.contains(&participant.user.id) {
288                            true
289                        } else {
290                            for project in &participant.projects {
291                                cx.emit(Event::RemoteProjectUnshared {
292                                    project_id: project.id,
293                                });
294                            }
295                            false
296                        }
297                    });
298                }
299
300                if let Some(pending_participants) = pending_participants.log_err() {
301                    this.pending_participants = pending_participants;
302                    for participant in &this.pending_participants {
303                        this.participant_user_ids.insert(participant.id);
304                    }
305                }
306
307                this.pending_room_update.take();
308                if this.should_leave() {
309                    let _ = this.leave(cx);
310                }
311
312                this.check_invariants();
313                cx.notify();
314            });
315        }));
316
317        cx.notify();
318        Ok(())
319    }
320
321    fn check_invariants(&self) {
322        #[cfg(any(test, feature = "test-support"))]
323        {
324            for participant in self.remote_participants.values() {
325                assert!(self.participant_user_ids.contains(&participant.user.id));
326            }
327
328            for participant in &self.pending_participants {
329                assert!(self.participant_user_ids.contains(&participant.id));
330            }
331
332            assert_eq!(
333                self.participant_user_ids.len(),
334                self.remote_participants.len() + self.pending_participants.len()
335            );
336        }
337    }
338
339    pub(crate) fn call(
340        &mut self,
341        recipient_user_id: u64,
342        initial_project_id: Option<u64>,
343        cx: &mut ModelContext<Self>,
344    ) -> Task<Result<()>> {
345        if self.status.is_offline() {
346            return Task::ready(Err(anyhow!("room is offline")));
347        }
348
349        cx.notify();
350        let client = self.client.clone();
351        let room_id = self.id;
352        self.pending_call_count += 1;
353        cx.spawn(|this, mut cx| async move {
354            let result = client
355                .request(proto::Call {
356                    room_id,
357                    recipient_user_id,
358                    initial_project_id,
359                })
360                .await;
361            this.update(&mut cx, |this, cx| {
362                this.pending_call_count -= 1;
363                if this.should_leave() {
364                    this.leave(cx)?;
365                }
366                result
367            })?;
368            Ok(())
369        })
370    }
371
372    pub(crate) fn share_project(
373        &mut self,
374        project: ModelHandle<Project>,
375        cx: &mut ModelContext<Self>,
376    ) -> Task<Result<u64>> {
377        if let Some(project_id) = project.read(cx).remote_id() {
378            return Task::ready(Ok(project_id));
379        }
380
381        let request = self.client.request(proto::ShareProject {
382            room_id: self.id(),
383            worktrees: project
384                .read(cx)
385                .worktrees(cx)
386                .map(|worktree| {
387                    let worktree = worktree.read(cx);
388                    proto::WorktreeMetadata {
389                        id: worktree.id().to_proto(),
390                        root_name: worktree.root_name().into(),
391                        visible: worktree.is_visible(),
392                    }
393                })
394                .collect(),
395        });
396        cx.spawn(|this, mut cx| async move {
397            let response = request.await?;
398
399            project.update(&mut cx, |project, cx| {
400                project
401                    .shared(response.project_id, cx)
402                    .detach_and_log_err(cx)
403            });
404
405            // If the user's location is in this project, it changes from UnsharedProject to SharedProject.
406            this.update(&mut cx, |this, cx| {
407                let active_project = this.local_participant.active_project.as_ref();
408                if active_project.map_or(false, |location| *location == project) {
409                    this.set_location(Some(&project), cx)
410                } else {
411                    Task::ready(Ok(()))
412                }
413            })
414            .await?;
415
416            Ok(response.project_id)
417        })
418    }
419
420    pub fn set_location(
421        &mut self,
422        project: Option<&ModelHandle<Project>>,
423        cx: &mut ModelContext<Self>,
424    ) -> Task<Result<()>> {
425        if self.status.is_offline() {
426            return Task::ready(Err(anyhow!("room is offline")));
427        }
428
429        let client = self.client.clone();
430        let room_id = self.id;
431        let location = if let Some(project) = project {
432            self.local_participant.active_project = Some(project.downgrade());
433            if let Some(project_id) = project.read(cx).remote_id() {
434                proto::participant_location::Variant::SharedProject(
435                    proto::participant_location::SharedProject { id: project_id },
436                )
437            } else {
438                proto::participant_location::Variant::UnsharedProject(
439                    proto::participant_location::UnsharedProject {},
440                )
441            }
442        } else {
443            self.local_participant.active_project = None;
444            proto::participant_location::Variant::External(proto::participant_location::External {})
445        };
446
447        cx.notify();
448        cx.foreground().spawn(async move {
449            client
450                .request(proto::UpdateParticipantLocation {
451                    room_id,
452                    location: Some(proto::ParticipantLocation {
453                        variant: Some(location),
454                    }),
455                })
456                .await?;
457            Ok(())
458        })
459    }
460}
461
462#[derive(Copy, Clone, PartialEq, Eq)]
463pub enum RoomStatus {
464    Online,
465    Offline,
466}
467
468impl RoomStatus {
469    pub fn is_offline(&self) -> bool {
470        matches!(self, RoomStatus::Offline)
471    }
472}