room.rs

  1use crate::{
  2    participant::{ParticipantLocation, RemoteParticipant},
  3    IncomingCall,
  4};
  5use anyhow::{anyhow, Result};
  6use client::{proto, Client, PeerId, TypedEnvelope, User, UserStore};
  7use collections::{HashMap, HashSet};
  8use futures::StreamExt;
  9use gpui::{AsyncAppContext, Entity, ModelContext, ModelHandle, MutableAppContext, Task};
 10use project::Project;
 11use std::sync::Arc;
 12use util::ResultExt;
 13
 14#[derive(Clone, Debug, PartialEq, Eq)]
 15pub enum Event {
 16    RemoteProjectShared { owner: Arc<User>, project_id: u64 },
 17}
 18
 19pub struct Room {
 20    id: u64,
 21    status: RoomStatus,
 22    remote_participants: HashMap<PeerId, RemoteParticipant>,
 23    pending_users: Vec<Arc<User>>,
 24    pending_call_count: usize,
 25    leave_when_empty: bool,
 26    client: Arc<Client>,
 27    user_store: ModelHandle<UserStore>,
 28    subscriptions: Vec<client::Subscription>,
 29    pending_room_update: Option<Task<()>>,
 30}
 31
 32impl Entity for Room {
 33    type Event = Event;
 34
 35    fn release(&mut self, _: &mut MutableAppContext) {
 36        self.client.send(proto::LeaveRoom { id: self.id }).log_err();
 37    }
 38}
 39
 40impl Room {
 41    fn new(
 42        id: u64,
 43        client: Arc<Client>,
 44        user_store: ModelHandle<UserStore>,
 45        cx: &mut ModelContext<Self>,
 46    ) -> Self {
 47        let mut client_status = client.status();
 48        cx.spawn_weak(|this, mut cx| async move {
 49            let is_connected = client_status
 50                .next()
 51                .await
 52                .map_or(false, |s| s.is_connected());
 53            // Even if we're initially connected, any future change of the status means we momentarily disconnected.
 54            if !is_connected || client_status.next().await.is_some() {
 55                if let Some(this) = this.upgrade(&cx) {
 56                    let _ = this.update(&mut cx, |this, cx| this.leave(cx));
 57                }
 58            }
 59        })
 60        .detach();
 61
 62        Self {
 63            id,
 64            status: RoomStatus::Online,
 65            remote_participants: Default::default(),
 66            pending_users: Default::default(),
 67            pending_call_count: 0,
 68            subscriptions: vec![client.add_message_handler(cx.handle(), Self::handle_room_updated)],
 69            leave_when_empty: false,
 70            pending_room_update: None,
 71            client,
 72            user_store,
 73        }
 74    }
 75
 76    pub(crate) fn create(
 77        recipient_user_id: u64,
 78        initial_project: Option<ModelHandle<Project>>,
 79        client: Arc<Client>,
 80        user_store: ModelHandle<UserStore>,
 81        cx: &mut MutableAppContext,
 82    ) -> Task<Result<ModelHandle<Self>>> {
 83        cx.spawn(|mut cx| async move {
 84            let response = client.request(proto::CreateRoom {}).await?;
 85            let room = cx.add_model(|cx| Self::new(response.id, client, user_store, cx));
 86
 87            let initial_project_id = if let Some(initial_project) = initial_project {
 88                let initial_project_id = room
 89                    .update(&mut cx, |room, cx| {
 90                        room.share_project(initial_project.clone(), cx)
 91                    })
 92                    .await?;
 93                Some(initial_project_id)
 94            } else {
 95                None
 96            };
 97
 98            match room
 99                .update(&mut cx, |room, cx| {
100                    room.leave_when_empty = true;
101                    room.call(recipient_user_id, initial_project_id, cx)
102                })
103                .await
104            {
105                Ok(()) => Ok(room),
106                Err(error) => Err(anyhow!("room creation failed: {:?}", error)),
107            }
108        })
109    }
110
111    pub(crate) fn join(
112        call: &IncomingCall,
113        client: Arc<Client>,
114        user_store: ModelHandle<UserStore>,
115        cx: &mut MutableAppContext,
116    ) -> Task<Result<ModelHandle<Self>>> {
117        let room_id = call.room_id;
118        cx.spawn(|mut cx| async move {
119            let response = client.request(proto::JoinRoom { id: room_id }).await?;
120            let room_proto = response.room.ok_or_else(|| anyhow!("invalid room"))?;
121            let room = cx.add_model(|cx| Self::new(room_id, client, user_store, cx));
122            room.update(&mut cx, |room, cx| {
123                room.leave_when_empty = true;
124                room.apply_room_update(room_proto, cx)?;
125                anyhow::Ok(())
126            })?;
127            Ok(room)
128        })
129    }
130
131    fn should_leave(&self) -> bool {
132        self.leave_when_empty
133            && self.pending_room_update.is_none()
134            && self.pending_users.is_empty()
135            && self.remote_participants.is_empty()
136            && self.pending_call_count == 0
137    }
138
139    pub(crate) fn leave(&mut self, cx: &mut ModelContext<Self>) -> Result<()> {
140        if self.status.is_offline() {
141            return Err(anyhow!("room is offline"));
142        }
143
144        cx.notify();
145        self.status = RoomStatus::Offline;
146        self.remote_participants.clear();
147        self.subscriptions.clear();
148        self.client.send(proto::LeaveRoom { id: self.id })?;
149        Ok(())
150    }
151
152    pub fn id(&self) -> u64 {
153        self.id
154    }
155
156    pub fn status(&self) -> RoomStatus {
157        self.status
158    }
159
160    pub fn remote_participants(&self) -> &HashMap<PeerId, RemoteParticipant> {
161        &self.remote_participants
162    }
163
164    pub fn pending_users(&self) -> &[Arc<User>] {
165        &self.pending_users
166    }
167
168    async fn handle_room_updated(
169        this: ModelHandle<Self>,
170        envelope: TypedEnvelope<proto::RoomUpdated>,
171        _: Arc<Client>,
172        mut cx: AsyncAppContext,
173    ) -> Result<()> {
174        let room = envelope
175            .payload
176            .room
177            .ok_or_else(|| anyhow!("invalid room"))?;
178        this.update(&mut cx, |this, cx| this.apply_room_update(room, cx))
179    }
180
181    fn apply_room_update(
182        &mut self,
183        mut room: proto::Room,
184        cx: &mut ModelContext<Self>,
185    ) -> Result<()> {
186        // Filter ourselves out from the room's participants.
187        room.participants
188            .retain(|participant| Some(participant.user_id) != self.client.user_id());
189
190        let participant_user_ids = room
191            .participants
192            .iter()
193            .map(|p| p.user_id)
194            .collect::<Vec<_>>();
195        let (participants, pending_users) = self.user_store.update(cx, move |user_store, cx| {
196            (
197                user_store.get_users(participant_user_ids, cx),
198                user_store.get_users(room.pending_user_ids, cx),
199            )
200        });
201        self.pending_room_update = Some(cx.spawn(|this, mut cx| async move {
202            let (participants, pending_users) = futures::join!(participants, pending_users);
203
204            this.update(&mut cx, |this, cx| {
205                if let Some(participants) = participants.log_err() {
206                    let mut seen_participants = HashSet::default();
207
208                    for (participant, user) in room.participants.into_iter().zip(participants) {
209                        let peer_id = PeerId(participant.peer_id);
210                        seen_participants.insert(peer_id);
211
212                        let existing_project_ids = this
213                            .remote_participants
214                            .get(&peer_id)
215                            .map(|existing| existing.project_ids.clone())
216                            .unwrap_or_default();
217                        for project_id in &participant.project_ids {
218                            if !existing_project_ids.contains(project_id) {
219                                cx.emit(Event::RemoteProjectShared {
220                                    owner: user.clone(),
221                                    project_id: *project_id,
222                                });
223                            }
224                        }
225
226                        this.remote_participants.insert(
227                            peer_id,
228                            RemoteParticipant {
229                                user: user.clone(),
230                                project_ids: participant.project_ids,
231                                location: ParticipantLocation::from_proto(participant.location)
232                                    .unwrap_or(ParticipantLocation::External),
233                            },
234                        );
235                    }
236
237                    for participant_peer_id in
238                        this.remote_participants.keys().copied().collect::<Vec<_>>()
239                    {
240                        if !seen_participants.contains(&participant_peer_id) {
241                            this.remote_participants.remove(&participant_peer_id);
242                        }
243                    }
244
245                    cx.notify();
246                }
247
248                if let Some(pending_users) = pending_users.log_err() {
249                    this.pending_users = pending_users;
250                    cx.notify();
251                }
252
253                this.pending_room_update.take();
254                if this.should_leave() {
255                    let _ = this.leave(cx);
256                }
257            });
258        }));
259
260        cx.notify();
261        Ok(())
262    }
263
264    pub(crate) fn call(
265        &mut self,
266        recipient_user_id: u64,
267        initial_project_id: Option<u64>,
268        cx: &mut ModelContext<Self>,
269    ) -> Task<Result<()>> {
270        if self.status.is_offline() {
271            return Task::ready(Err(anyhow!("room is offline")));
272        }
273
274        cx.notify();
275        let client = self.client.clone();
276        let room_id = self.id;
277        self.pending_call_count += 1;
278        cx.spawn(|this, mut cx| async move {
279            let result = client
280                .request(proto::Call {
281                    room_id,
282                    recipient_user_id,
283                    initial_project_id,
284                })
285                .await;
286            this.update(&mut cx, |this, cx| {
287                this.pending_call_count -= 1;
288                if this.should_leave() {
289                    this.leave(cx)?;
290                }
291                result
292            })?;
293            Ok(())
294        })
295    }
296
297    pub(crate) fn share_project(
298        &mut self,
299        project: ModelHandle<Project>,
300        cx: &mut ModelContext<Self>,
301    ) -> Task<Result<u64>> {
302        if project.read(cx).is_remote() {
303            return Task::ready(Err(anyhow!("can't share remote project")));
304        } else if let Some(project_id) = project.read(cx).remote_id() {
305            return Task::ready(Ok(project_id));
306        }
307
308        let request = self
309            .client
310            .request(proto::ShareProject { room_id: self.id() });
311        cx.spawn_weak(|_, mut cx| async move {
312            let response = request.await?;
313            project
314                .update(&mut cx, |project, cx| {
315                    project.shared(response.project_id, cx)
316                })
317                .await?;
318            Ok(response.project_id)
319        })
320    }
321
322    pub fn set_location(
323        &mut self,
324        project: Option<&ModelHandle<Project>>,
325        cx: &mut ModelContext<Self>,
326    ) -> Task<Result<()>> {
327        if self.status.is_offline() {
328            return Task::ready(Err(anyhow!("room is offline")));
329        }
330
331        let client = self.client.clone();
332        let room_id = self.id;
333        let location = if let Some(project) = project {
334            if let Some(project_id) = project.read(cx).remote_id() {
335                proto::participant_location::Variant::Project(
336                    proto::participant_location::Project { id: project_id },
337                )
338            } else {
339                return Task::ready(Err(anyhow!("project is not shared")));
340            }
341        } else {
342            proto::participant_location::Variant::External(proto::participant_location::External {})
343        };
344
345        cx.foreground().spawn(async move {
346            client
347                .request(proto::UpdateParticipantLocation {
348                    room_id,
349                    location: Some(proto::ParticipantLocation {
350                        variant: Some(location),
351                    }),
352                })
353                .await?;
354            Ok(())
355        })
356    }
357}
358
/// Whether the local client is still part of a room.
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
pub enum RoomStatus {
    /// The room is live.
    Online,
    /// The room has been left.
    Offline,
}

impl RoomStatus {
    /// Returns `true` for [`RoomStatus::Offline`].
    pub fn is_offline(&self) -> bool {
        matches!(self, RoomStatus::Offline)
    }

    /// Returns `true` for [`RoomStatus::Online`].
    pub fn is_online(&self) -> bool {
        matches!(self, RoomStatus::Online)
    }
}