room.rs

  1use crate::{
  2    participant::{ParticipantLocation, RemoteParticipant},
  3    IncomingCall,
  4};
  5use anyhow::{anyhow, Result};
  6use client::{proto, Client, PeerId, TypedEnvelope, User, UserStore};
  7use collections::{BTreeMap, HashSet};
  8use futures::StreamExt;
  9use gpui::{AsyncAppContext, Entity, ModelContext, ModelHandle, MutableAppContext, Task};
 10use project::Project;
 11use std::sync::Arc;
 12use util::ResultExt;
 13
 14#[derive(Clone, Debug, PartialEq, Eq)]
 15pub enum Event {
 16    RemoteProjectShared { owner: Arc<User>, project_id: u64 },
 17}
 18
 19pub struct Room {
 20    id: u64,
 21    status: RoomStatus,
 22    remote_participants: BTreeMap<PeerId, RemoteParticipant>,
 23    pending_participants: Vec<Arc<User>>,
 24    participant_user_ids: HashSet<u64>,
 25    pending_call_count: usize,
 26    leave_when_empty: bool,
 27    client: Arc<Client>,
 28    user_store: ModelHandle<UserStore>,
 29    subscriptions: Vec<client::Subscription>,
 30    pending_room_update: Option<Task<()>>,
 31}
 32
 33impl Entity for Room {
 34    type Event = Event;
 35
 36    fn release(&mut self, _: &mut MutableAppContext) {
 37        self.client.send(proto::LeaveRoom { id: self.id }).log_err();
 38    }
 39}
 40
 41impl Room {
 42    fn new(
 43        id: u64,
 44        client: Arc<Client>,
 45        user_store: ModelHandle<UserStore>,
 46        cx: &mut ModelContext<Self>,
 47    ) -> Self {
 48        let mut client_status = client.status();
 49        cx.spawn_weak(|this, mut cx| async move {
 50            let is_connected = client_status
 51                .next()
 52                .await
 53                .map_or(false, |s| s.is_connected());
 54            // Even if we're initially connected, any future change of the status means we momentarily disconnected.
 55            if !is_connected || client_status.next().await.is_some() {
 56                if let Some(this) = this.upgrade(&cx) {
 57                    let _ = this.update(&mut cx, |this, cx| this.leave(cx));
 58                }
 59            }
 60        })
 61        .detach();
 62
 63        Self {
 64            id,
 65            status: RoomStatus::Online,
 66            participant_user_ids: Default::default(),
 67            remote_participants: Default::default(),
 68            pending_participants: Default::default(),
 69            pending_call_count: 0,
 70            subscriptions: vec![client.add_message_handler(cx.handle(), Self::handle_room_updated)],
 71            leave_when_empty: false,
 72            pending_room_update: None,
 73            client,
 74            user_store,
 75        }
 76    }
 77
 78    pub(crate) fn create(
 79        recipient_user_id: u64,
 80        initial_project: Option<ModelHandle<Project>>,
 81        client: Arc<Client>,
 82        user_store: ModelHandle<UserStore>,
 83        cx: &mut MutableAppContext,
 84    ) -> Task<Result<ModelHandle<Self>>> {
 85        cx.spawn(|mut cx| async move {
 86            let response = client.request(proto::CreateRoom {}).await?;
 87            let room = cx.add_model(|cx| Self::new(response.id, client, user_store, cx));
 88
 89            let initial_project_id = if let Some(initial_project) = initial_project {
 90                let initial_project_id = room
 91                    .update(&mut cx, |room, cx| {
 92                        room.share_project(initial_project.clone(), cx)
 93                    })
 94                    .await?;
 95                Some(initial_project_id)
 96            } else {
 97                None
 98            };
 99
100            match room
101                .update(&mut cx, |room, cx| {
102                    room.leave_when_empty = true;
103                    room.call(recipient_user_id, initial_project_id, cx)
104                })
105                .await
106            {
107                Ok(()) => Ok(room),
108                Err(error) => Err(anyhow!("room creation failed: {:?}", error)),
109            }
110        })
111    }
112
113    pub(crate) fn join(
114        call: &IncomingCall,
115        client: Arc<Client>,
116        user_store: ModelHandle<UserStore>,
117        cx: &mut MutableAppContext,
118    ) -> Task<Result<ModelHandle<Self>>> {
119        let room_id = call.room_id;
120        cx.spawn(|mut cx| async move {
121            let response = client.request(proto::JoinRoom { id: room_id }).await?;
122            let room_proto = response.room.ok_or_else(|| anyhow!("invalid room"))?;
123            let room = cx.add_model(|cx| Self::new(room_id, client, user_store, cx));
124            room.update(&mut cx, |room, cx| {
125                room.leave_when_empty = true;
126                room.apply_room_update(room_proto, cx)?;
127                anyhow::Ok(())
128            })?;
129            Ok(room)
130        })
131    }
132
133    fn should_leave(&self) -> bool {
134        self.leave_when_empty
135            && self.pending_room_update.is_none()
136            && self.pending_participants.is_empty()
137            && self.remote_participants.is_empty()
138            && self.pending_call_count == 0
139    }
140
141    pub(crate) fn leave(&mut self, cx: &mut ModelContext<Self>) -> Result<()> {
142        if self.status.is_offline() {
143            return Err(anyhow!("room is offline"));
144        }
145
146        cx.notify();
147        self.status = RoomStatus::Offline;
148        self.remote_participants.clear();
149        self.pending_participants.clear();
150        self.participant_user_ids.clear();
151        self.subscriptions.clear();
152        self.client.send(proto::LeaveRoom { id: self.id })?;
153        Ok(())
154    }
155
156    pub fn id(&self) -> u64 {
157        self.id
158    }
159
160    pub fn status(&self) -> RoomStatus {
161        self.status
162    }
163
164    pub fn remote_participants(&self) -> &BTreeMap<PeerId, RemoteParticipant> {
165        &self.remote_participants
166    }
167
168    pub fn pending_participants(&self) -> &[Arc<User>] {
169        &self.pending_participants
170    }
171
172    pub fn contains_participant(&self, user_id: u64) -> bool {
173        self.participant_user_ids.contains(&user_id)
174    }
175
176    async fn handle_room_updated(
177        this: ModelHandle<Self>,
178        envelope: TypedEnvelope<proto::RoomUpdated>,
179        _: Arc<Client>,
180        mut cx: AsyncAppContext,
181    ) -> Result<()> {
182        let room = envelope
183            .payload
184            .room
185            .ok_or_else(|| anyhow!("invalid room"))?;
186        this.update(&mut cx, |this, cx| this.apply_room_update(room, cx))
187    }
188
189    fn apply_room_update(
190        &mut self,
191        mut room: proto::Room,
192        cx: &mut ModelContext<Self>,
193    ) -> Result<()> {
194        // Filter ourselves out from the room's participants.
195        room.participants
196            .retain(|participant| Some(participant.user_id) != self.client.user_id());
197
198        let remote_participant_user_ids = room
199            .participants
200            .iter()
201            .map(|p| p.user_id)
202            .collect::<Vec<_>>();
203        let (remote_participants, pending_participants) =
204            self.user_store.update(cx, move |user_store, cx| {
205                (
206                    user_store.get_users(remote_participant_user_ids, cx),
207                    user_store.get_users(room.pending_participant_user_ids, cx),
208                )
209            });
210        self.pending_room_update = Some(cx.spawn(|this, mut cx| async move {
211            let (remote_participants, pending_participants) =
212                futures::join!(remote_participants, pending_participants);
213
214            this.update(&mut cx, |this, cx| {
215                this.participant_user_ids.clear();
216
217                if let Some(participants) = remote_participants.log_err() {
218                    for (participant, user) in room.participants.into_iter().zip(participants) {
219                        let peer_id = PeerId(participant.peer_id);
220                        this.participant_user_ids.insert(participant.user_id);
221
222                        let existing_project_ids = this
223                            .remote_participants
224                            .get(&peer_id)
225                            .map(|existing| existing.project_ids.clone())
226                            .unwrap_or_default();
227                        for project_id in &participant.project_ids {
228                            if !existing_project_ids.contains(project_id) {
229                                cx.emit(Event::RemoteProjectShared {
230                                    owner: user.clone(),
231                                    project_id: *project_id,
232                                });
233                            }
234                        }
235
236                        this.remote_participants.insert(
237                            peer_id,
238                            RemoteParticipant {
239                                user: user.clone(),
240                                project_ids: participant.project_ids,
241                                location: ParticipantLocation::from_proto(participant.location)
242                                    .unwrap_or(ParticipantLocation::External),
243                            },
244                        );
245                    }
246
247                    this.remote_participants.retain(|_, participant| {
248                        this.participant_user_ids.contains(&participant.user.id)
249                    });
250
251                    cx.notify();
252                }
253
254                if let Some(pending_participants) = pending_participants.log_err() {
255                    this.pending_participants = pending_participants;
256                    for participant in &this.pending_participants {
257                        this.participant_user_ids.insert(participant.id);
258                    }
259                    cx.notify();
260                }
261
262                this.pending_room_update.take();
263                if this.should_leave() {
264                    let _ = this.leave(cx);
265                }
266
267                this.check_invariants();
268            });
269        }));
270
271        cx.notify();
272        Ok(())
273    }
274
275    fn check_invariants(&self) {
276        #[cfg(any(test, feature = "test-support"))]
277        {
278            for participant in self.remote_participants.values() {
279                assert!(self.participant_user_ids.contains(&participant.user.id));
280            }
281
282            for participant in &self.pending_participants {
283                assert!(self.participant_user_ids.contains(&participant.id));
284            }
285
286            assert_eq!(
287                self.participant_user_ids.len(),
288                self.remote_participants.len() + self.pending_participants.len()
289            );
290        }
291    }
292
293    pub(crate) fn call(
294        &mut self,
295        recipient_user_id: u64,
296        initial_project_id: Option<u64>,
297        cx: &mut ModelContext<Self>,
298    ) -> Task<Result<()>> {
299        if self.status.is_offline() {
300            return Task::ready(Err(anyhow!("room is offline")));
301        }
302
303        cx.notify();
304        let client = self.client.clone();
305        let room_id = self.id;
306        self.pending_call_count += 1;
307        cx.spawn(|this, mut cx| async move {
308            let result = client
309                .request(proto::Call {
310                    room_id,
311                    recipient_user_id,
312                    initial_project_id,
313                })
314                .await;
315            this.update(&mut cx, |this, cx| {
316                this.pending_call_count -= 1;
317                if this.should_leave() {
318                    this.leave(cx)?;
319                }
320                result
321            })?;
322            Ok(())
323        })
324    }
325
326    pub(crate) fn share_project(
327        &mut self,
328        project: ModelHandle<Project>,
329        cx: &mut ModelContext<Self>,
330    ) -> Task<Result<u64>> {
331        if project.read(cx).is_remote() {
332            return Task::ready(Err(anyhow!("can't share remote project")));
333        } else if let Some(project_id) = project.read(cx).remote_id() {
334            return Task::ready(Ok(project_id));
335        }
336
337        let request = self
338            .client
339            .request(proto::ShareProject { room_id: self.id() });
340        cx.spawn_weak(|_, mut cx| async move {
341            let response = request.await?;
342            project
343                .update(&mut cx, |project, cx| {
344                    project.shared(response.project_id, cx)
345                })
346                .await?;
347            Ok(response.project_id)
348        })
349    }
350
351    pub fn set_location(
352        &mut self,
353        project: Option<&ModelHandle<Project>>,
354        cx: &mut ModelContext<Self>,
355    ) -> Task<Result<()>> {
356        if self.status.is_offline() {
357            return Task::ready(Err(anyhow!("room is offline")));
358        }
359
360        let client = self.client.clone();
361        let room_id = self.id;
362        let location = if let Some(project) = project {
363            if let Some(project_id) = project.read(cx).remote_id() {
364                proto::participant_location::Variant::Project(
365                    proto::participant_location::Project { id: project_id },
366                )
367            } else {
368                return Task::ready(Err(anyhow!("project is not shared")));
369            }
370        } else {
371            proto::participant_location::Variant::External(proto::participant_location::External {})
372        };
373
374        cx.foreground().spawn(async move {
375            client
376                .request(proto::UpdateParticipantLocation {
377                    room_id,
378                    location: Some(proto::ParticipantLocation {
379                        variant: Some(location),
380                    }),
381                })
382                .await?;
383            Ok(())
384        })
385    }
386}
387
388#[derive(Copy, Clone, PartialEq, Eq)]
389pub enum RoomStatus {
390    Online,
391    Offline,
392}
393
394impl RoomStatus {
395    pub fn is_offline(&self) -> bool {
396        matches!(self, RoomStatus::Offline)
397    }
398}