room.rs

  1use crate::{
  2    participant::{ParticipantLocation, RemoteParticipant},
  3    IncomingCall,
  4};
  5use anyhow::{anyhow, Result};
  6use client::{proto, Client, PeerId, TypedEnvelope, User, UserStore};
  7use collections::{BTreeMap, HashSet};
  8use futures::StreamExt;
  9use gpui::{AsyncAppContext, Entity, ModelContext, ModelHandle, MutableAppContext, Task};
 10use project::Project;
 11use std::sync::Arc;
 12use util::ResultExt;
 13
/// Events emitted by a [`Room`] model to its observers.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum Event {
    /// Emitted while applying a room update when a remote participant lists a project
    /// whose id was not already recorded for that participant's peer.
    RemoteProjectShared {
        /// The user associated with the participant that shares the project.
        owner: Arc<User>,
        /// Id of the shared project, as reported in the room update.
        project_id: u64,
        /// Root names of the project's worktrees, as reported in the room update.
        worktree_root_names: Vec<String>,
    },
}
 22
/// Client-side model of a call room: tracks who is in the room (or has been invited to it)
/// and talks to the server through `client`.
pub struct Room {
    /// Room id; included in every room-related request this model sends.
    id: u64,
    /// `Online` from construction until `leave` switches it to `Offline`.
    status: RoomStatus,
    /// Participants other than ourselves, keyed by peer id, as of the last applied update.
    remote_participants: BTreeMap<PeerId, RemoteParticipant>,
    /// Users listed as pending in the last applied room update.
    pending_participants: Vec<Arc<User>>,
    /// User ids of everyone in `remote_participants` and `pending_participants`.
    participant_user_ids: HashSet<u64>,
    /// Number of `call` requests that have been sent but not yet answered.
    pending_call_count: usize,
    /// When set, the room leaves itself once nobody else is present, invited or being called
    /// (see `should_leave`).
    leave_when_empty: bool,
    client: Arc<Client>,
    user_store: ModelHandle<UserStore>,
    /// Message-handler subscriptions (room updates); cleared when leaving.
    subscriptions: Vec<client::Subscription>,
    /// Task applying the most recent room update; `None` when no update is in flight.
    pending_room_update: Option<Task<()>>,
}
 36
 37impl Entity for Room {
 38    type Event = Event;
 39
 40    fn release(&mut self, _: &mut MutableAppContext) {
 41        self.client.send(proto::LeaveRoom { id: self.id }).log_err();
 42    }
 43}
 44
 45impl Room {
 46    fn new(
 47        id: u64,
 48        client: Arc<Client>,
 49        user_store: ModelHandle<UserStore>,
 50        cx: &mut ModelContext<Self>,
 51    ) -> Self {
 52        let mut client_status = client.status();
 53        cx.spawn_weak(|this, mut cx| async move {
 54            let is_connected = client_status
 55                .next()
 56                .await
 57                .map_or(false, |s| s.is_connected());
 58            // Even if we're initially connected, any future change of the status means we momentarily disconnected.
 59            if !is_connected || client_status.next().await.is_some() {
 60                if let Some(this) = this.upgrade(&cx) {
 61                    let _ = this.update(&mut cx, |this, cx| this.leave(cx));
 62                }
 63            }
 64        })
 65        .detach();
 66
 67        Self {
 68            id,
 69            status: RoomStatus::Online,
 70            participant_user_ids: Default::default(),
 71            remote_participants: Default::default(),
 72            pending_participants: Default::default(),
 73            pending_call_count: 0,
 74            subscriptions: vec![client.add_message_handler(cx.handle(), Self::handle_room_updated)],
 75            leave_when_empty: false,
 76            pending_room_update: None,
 77            client,
 78            user_store,
 79        }
 80    }
 81
 82    pub(crate) fn create(
 83        recipient_user_id: u64,
 84        initial_project: Option<ModelHandle<Project>>,
 85        client: Arc<Client>,
 86        user_store: ModelHandle<UserStore>,
 87        cx: &mut MutableAppContext,
 88    ) -> Task<Result<ModelHandle<Self>>> {
 89        cx.spawn(|mut cx| async move {
 90            let response = client.request(proto::CreateRoom {}).await?;
 91            let room = cx.add_model(|cx| Self::new(response.id, client, user_store, cx));
 92
 93            let initial_project_id = if let Some(initial_project) = initial_project {
 94                let initial_project_id = room
 95                    .update(&mut cx, |room, cx| {
 96                        room.share_project(initial_project.clone(), cx)
 97                    })
 98                    .await?;
 99                Some(initial_project_id)
100            } else {
101                None
102            };
103
104            match room
105                .update(&mut cx, |room, cx| {
106                    room.leave_when_empty = true;
107                    room.call(recipient_user_id, initial_project_id, cx)
108                })
109                .await
110            {
111                Ok(()) => Ok(room),
112                Err(error) => Err(anyhow!("room creation failed: {:?}", error)),
113            }
114        })
115    }
116
117    pub(crate) fn join(
118        call: &IncomingCall,
119        client: Arc<Client>,
120        user_store: ModelHandle<UserStore>,
121        cx: &mut MutableAppContext,
122    ) -> Task<Result<ModelHandle<Self>>> {
123        let room_id = call.room_id;
124        cx.spawn(|mut cx| async move {
125            let response = client.request(proto::JoinRoom { id: room_id }).await?;
126            let room_proto = response.room.ok_or_else(|| anyhow!("invalid room"))?;
127            let room = cx.add_model(|cx| Self::new(room_id, client, user_store, cx));
128            room.update(&mut cx, |room, cx| {
129                room.leave_when_empty = true;
130                room.apply_room_update(room_proto, cx)?;
131                anyhow::Ok(())
132            })?;
133            Ok(room)
134        })
135    }
136
137    fn should_leave(&self) -> bool {
138        self.leave_when_empty
139            && self.pending_room_update.is_none()
140            && self.pending_participants.is_empty()
141            && self.remote_participants.is_empty()
142            && self.pending_call_count == 0
143    }
144
145    pub(crate) fn leave(&mut self, cx: &mut ModelContext<Self>) -> Result<()> {
146        if self.status.is_offline() {
147            return Err(anyhow!("room is offline"));
148        }
149
150        cx.notify();
151        self.status = RoomStatus::Offline;
152        self.remote_participants.clear();
153        self.pending_participants.clear();
154        self.participant_user_ids.clear();
155        self.subscriptions.clear();
156        self.client.send(proto::LeaveRoom { id: self.id })?;
157        Ok(())
158    }
159
    /// Returns the id of this room.
    pub fn id(&self) -> u64 {
        self.id
    }
163
    /// Returns whether the room is currently online or has been left.
    pub fn status(&self) -> RoomStatus {
        self.status
    }
167
    /// Returns the participants other than ourselves, keyed by peer id, as of the last
    /// applied room update.
    pub fn remote_participants(&self) -> &BTreeMap<PeerId, RemoteParticipant> {
        &self.remote_participants
    }
171
    /// Returns the users listed as pending in the last applied room update.
    pub fn pending_participants(&self) -> &[Arc<User>] {
        &self.pending_participants
    }
175
    /// Returns `true` if `user_id` belongs to a remote or pending participant of this room.
    pub fn contains_participant(&self, user_id: u64) -> bool {
        self.participant_user_ids.contains(&user_id)
    }
179
180    async fn handle_room_updated(
181        this: ModelHandle<Self>,
182        envelope: TypedEnvelope<proto::RoomUpdated>,
183        _: Arc<Client>,
184        mut cx: AsyncAppContext,
185    ) -> Result<()> {
186        let room = envelope
187            .payload
188            .room
189            .ok_or_else(|| anyhow!("invalid room"))?;
190        this.update(&mut cx, |this, cx| this.apply_room_update(room, cx))
191    }
192
    /// Reconciles local participant state with `room`, the state reported by the server.
    ///
    /// User records for the remote and pending participants are requested from the user
    /// store, and the actual state change happens in a spawned task once both lookups have
    /// finished. That task is stored in `pending_room_update`, replacing any previously
    /// stored one.
    ///
    /// This function itself always returns `Ok(())`; failed user lookups are logged inside
    /// the task and leave the corresponding part of the state as it was.
    fn apply_room_update(
        &mut self,
        mut room: proto::Room,
        cx: &mut ModelContext<Self>,
    ) -> Result<()> {
        // Filter ourselves out from the room's participants.
        room.participants
            .retain(|participant| Some(participant.user_id) != self.client.user_id());

        let remote_participant_user_ids = room
            .participants
            .iter()
            .map(|p| p.user_id)
            .collect::<Vec<_>>();
        // Kick off both user lookups now; they are awaited together inside the task below.
        let (remote_participants, pending_participants) =
            self.user_store.update(cx, move |user_store, cx| {
                (
                    user_store.get_users(remote_participant_user_ids, cx),
                    user_store.get_users(room.pending_participant_user_ids, cx),
                )
            });
        self.pending_room_update = Some(cx.spawn(|this, mut cx| async move {
            let (remote_participants, pending_participants) =
                futures::join!(remote_participants, pending_participants);

            this.update(&mut cx, |this, cx| {
                // The id set is rebuilt from scratch from the remote and pending lists.
                this.participant_user_ids.clear();

                if let Some(participants) = remote_participants.log_err() {
                    // Pairs each participant with a looked-up user by position.
                    // NOTE(review): assumes `get_users` returns users in request order — confirm.
                    for (participant, user) in room.participants.into_iter().zip(participants) {
                        let peer_id = PeerId(participant.peer_id);
                        this.participant_user_ids.insert(participant.user_id);

                        // Project ids already known for this peer (empty for a new peer), so
                        // that only newly listed projects produce an event.
                        let existing_projects = this
                            .remote_participants
                            .get(&peer_id)
                            .into_iter()
                            .flat_map(|existing| &existing.projects)
                            .map(|project| project.id)
                            .collect::<HashSet<_>>();
                        for project in &participant.projects {
                            if !existing_projects.contains(&project.id) {
                                cx.emit(Event::RemoteProjectShared {
                                    owner: user.clone(),
                                    project_id: project.id,
                                    worktree_root_names: project.worktree_root_names.clone(),
                                });
                            }
                        }

                        // Insert or overwrite the entry for this peer; a location that cannot
                        // be converted falls back to `External`.
                        this.remote_participants.insert(
                            peer_id,
                            RemoteParticipant {
                                user: user.clone(),
                                projects: participant.projects,
                                location: ParticipantLocation::from_proto(participant.location)
                                    .unwrap_or(ParticipantLocation::External),
                            },
                        );
                    }

                    // Drop participants whose user is no longer part of the room.
                    this.remote_participants.retain(|_, participant| {
                        this.participant_user_ids.contains(&participant.user.id)
                    });

                    cx.notify();
                }

                if let Some(pending_participants) = pending_participants.log_err() {
                    this.pending_participants = pending_participants;
                    for participant in &this.pending_participants {
                        this.participant_user_ids.insert(participant.id);
                    }
                    cx.notify();
                }

                // Clear the in-flight marker before evaluating `should_leave`, which requires
                // that no update is pending.
                this.pending_room_update.take();
                if this.should_leave() {
                    let _ = this.leave(cx);
                }

                this.check_invariants();
            });
        }));

        cx.notify();
        Ok(())
    }
281
    /// Asserts that `participant_user_ids` is consistent with the participant collections.
    ///
    /// The checks are compiled only for tests or with the `test-support` feature; otherwise
    /// this is a no-op.
    fn check_invariants(&self) {
        #[cfg(any(test, feature = "test-support"))]
        {
            // Every remote participant's user id must be tracked.
            for participant in self.remote_participants.values() {
                assert!(self.participant_user_ids.contains(&participant.user.id));
            }

            // Every pending participant's user id must be tracked.
            for participant in &self.pending_participants {
                assert!(self.participant_user_ids.contains(&participant.id));
            }

            // No extra ids are tracked, and no user is counted in both collections.
            assert_eq!(
                self.participant_user_ids.len(),
                self.remote_participants.len() + self.pending_participants.len()
            );
        }
    }
299
300    pub(crate) fn call(
301        &mut self,
302        recipient_user_id: u64,
303        initial_project_id: Option<u64>,
304        cx: &mut ModelContext<Self>,
305    ) -> Task<Result<()>> {
306        if self.status.is_offline() {
307            return Task::ready(Err(anyhow!("room is offline")));
308        }
309
310        cx.notify();
311        let client = self.client.clone();
312        let room_id = self.id;
313        self.pending_call_count += 1;
314        cx.spawn(|this, mut cx| async move {
315            let result = client
316                .request(proto::Call {
317                    room_id,
318                    recipient_user_id,
319                    initial_project_id,
320                })
321                .await;
322            this.update(&mut cx, |this, cx| {
323                this.pending_call_count -= 1;
324                if this.should_leave() {
325                    this.leave(cx)?;
326                }
327                result
328            })?;
329            Ok(())
330        })
331    }
332
333    pub(crate) fn share_project(
334        &mut self,
335        project: ModelHandle<Project>,
336        cx: &mut ModelContext<Self>,
337    ) -> Task<Result<u64>> {
338        if project.read(cx).is_remote() {
339            return Task::ready(Err(anyhow!("can't share remote project")));
340        } else if let Some(project_id) = project.read(cx).remote_id() {
341            return Task::ready(Ok(project_id));
342        }
343
344        let request = self.client.request(proto::ShareProject {
345            room_id: self.id(),
346            worktrees: project
347                .read(cx)
348                .worktrees(cx)
349                .map(|worktree| {
350                    let worktree = worktree.read(cx);
351                    proto::WorktreeMetadata {
352                        id: worktree.id().to_proto(),
353                        root_name: worktree.root_name().into(),
354                        visible: worktree.is_visible(),
355                    }
356                })
357                .collect(),
358        });
359        cx.spawn_weak(|_, mut cx| async move {
360            let response = request.await?;
361            project
362                .update(&mut cx, |project, cx| {
363                    project.shared(response.project_id, cx)
364                })
365                .await?;
366            Ok(response.project_id)
367        })
368    }
369
370    pub fn set_location(
371        &mut self,
372        project: Option<&ModelHandle<Project>>,
373        cx: &mut ModelContext<Self>,
374    ) -> Task<Result<()>> {
375        if self.status.is_offline() {
376            return Task::ready(Err(anyhow!("room is offline")));
377        }
378
379        let client = self.client.clone();
380        let room_id = self.id;
381        let location = if let Some(project) = project {
382            if let Some(project_id) = project.read(cx).remote_id() {
383                proto::participant_location::Variant::Project(
384                    proto::participant_location::Project { id: project_id },
385                )
386            } else {
387                return Task::ready(Err(anyhow!("project is not shared")));
388            }
389        } else {
390            proto::participant_location::Variant::External(proto::participant_location::External {})
391        };
392
393        cx.foreground().spawn(async move {
394            client
395                .request(proto::UpdateParticipantLocation {
396                    room_id,
397                    location: Some(proto::ParticipantLocation {
398                        variant: Some(location),
399                    }),
400                })
401                .await?;
402            Ok(())
403        })
404    }
405}
406
/// Connection state of a room model.
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
pub enum RoomStatus {
    /// The room is joined and usable.
    Online,
    /// The room has been left.
    Offline,
}

impl RoomStatus {
    /// Returns `true` if the status is [`RoomStatus::Offline`].
    pub fn is_offline(&self) -> bool {
        matches!(self, RoomStatus::Offline)
    }
}