room.rs

  1use crate::{
  2    participant::{ParticipantLocation, RemoteParticipant},
  3    IncomingCall,
  4};
  5use anyhow::{anyhow, Result};
  6use client::{proto, Client, PeerId, TypedEnvelope, User, UserStore};
  7use collections::{HashMap, HashSet};
  8use futures::StreamExt;
  9use gpui::{AsyncAppContext, Entity, ModelContext, ModelHandle, MutableAppContext, Task};
 10use project::Project;
 11use std::sync::Arc;
 12use util::ResultExt;
 13
 14#[derive(Clone, Debug, PartialEq, Eq)]
 15pub enum Event {
 16    RemoteProjectShared { owner: Arc<User>, project_id: u64 },
 17}
 18
 19pub struct Room {
 20    id: u64,
 21    status: RoomStatus,
 22    remote_participants: HashMap<PeerId, RemoteParticipant>,
 23    pending_users: Vec<Arc<User>>,
 24    client: Arc<Client>,
 25    user_store: ModelHandle<UserStore>,
 26    _subscriptions: Vec<client::Subscription>,
 27    _pending_room_update: Option<Task<()>>,
 28}
 29
 30impl Entity for Room {
 31    type Event = Event;
 32
 33    fn release(&mut self, _: &mut MutableAppContext) {
 34        self.client.send(proto::LeaveRoom { id: self.id }).log_err();
 35    }
 36}
 37
 38impl Room {
 39    fn new(
 40        id: u64,
 41        client: Arc<Client>,
 42        user_store: ModelHandle<UserStore>,
 43        cx: &mut ModelContext<Self>,
 44    ) -> Self {
 45        let mut client_status = client.status();
 46        cx.spawn_weak(|this, mut cx| async move {
 47            let is_connected = client_status
 48                .next()
 49                .await
 50                .map_or(false, |s| s.is_connected());
 51            // Even if we're initially connected, any future change of the status means we momentarily disconnected.
 52            if !is_connected || client_status.next().await.is_some() {
 53                if let Some(this) = this.upgrade(&cx) {
 54                    let _ = this.update(&mut cx, |this, cx| this.leave(cx));
 55                }
 56            }
 57        })
 58        .detach();
 59
 60        Self {
 61            id,
 62            status: RoomStatus::Online,
 63            remote_participants: Default::default(),
 64            pending_users: Default::default(),
 65            _subscriptions: vec![client.add_message_handler(cx.handle(), Self::handle_room_updated)],
 66            _pending_room_update: None,
 67            client,
 68            user_store,
 69        }
 70    }
 71
 72    pub(crate) fn create(
 73        client: Arc<Client>,
 74        user_store: ModelHandle<UserStore>,
 75        cx: &mut MutableAppContext,
 76    ) -> Task<Result<ModelHandle<Self>>> {
 77        cx.spawn(|mut cx| async move {
 78            let room = client.request(proto::CreateRoom {}).await?;
 79            Ok(cx.add_model(|cx| Self::new(room.id, client, user_store, cx)))
 80        })
 81    }
 82
 83    pub(crate) fn join(
 84        call: &IncomingCall,
 85        client: Arc<Client>,
 86        user_store: ModelHandle<UserStore>,
 87        cx: &mut MutableAppContext,
 88    ) -> Task<Result<ModelHandle<Self>>> {
 89        let room_id = call.room_id;
 90        cx.spawn(|mut cx| async move {
 91            let response = client.request(proto::JoinRoom { id: room_id }).await?;
 92            let room_proto = response.room.ok_or_else(|| anyhow!("invalid room"))?;
 93            let room = cx.add_model(|cx| Self::new(room_id, client, user_store, cx));
 94            room.update(&mut cx, |room, cx| room.apply_room_update(room_proto, cx))?;
 95            Ok(room)
 96        })
 97    }
 98
 99    pub(crate) fn leave(&mut self, cx: &mut ModelContext<Self>) -> Result<()> {
100        if self.status.is_offline() {
101            return Err(anyhow!("room is offline"));
102        }
103
104        cx.notify();
105        self.status = RoomStatus::Offline;
106        self.remote_participants.clear();
107        self.client.send(proto::LeaveRoom { id: self.id })?;
108        Ok(())
109    }
110
111    pub fn id(&self) -> u64 {
112        self.id
113    }
114
115    pub fn status(&self) -> RoomStatus {
116        self.status
117    }
118
119    pub fn remote_participants(&self) -> &HashMap<PeerId, RemoteParticipant> {
120        &self.remote_participants
121    }
122
123    pub fn pending_users(&self) -> &[Arc<User>] {
124        &self.pending_users
125    }
126
127    async fn handle_room_updated(
128        this: ModelHandle<Self>,
129        envelope: TypedEnvelope<proto::RoomUpdated>,
130        _: Arc<Client>,
131        mut cx: AsyncAppContext,
132    ) -> Result<()> {
133        let room = envelope
134            .payload
135            .room
136            .ok_or_else(|| anyhow!("invalid room"))?;
137        this.update(&mut cx, |this, cx| this.apply_room_update(room, cx))?;
138        Ok(())
139    }
140
141    fn apply_room_update(
142        &mut self,
143        mut room: proto::Room,
144        cx: &mut ModelContext<Self>,
145    ) -> Result<()> {
146        // Filter ourselves out from the room's participants.
147        room.participants
148            .retain(|participant| Some(participant.user_id) != self.client.user_id());
149
150        let participant_user_ids = room
151            .participants
152            .iter()
153            .map(|p| p.user_id)
154            .collect::<Vec<_>>();
155        let (participants, pending_users) = self.user_store.update(cx, move |user_store, cx| {
156            (
157                user_store.get_users(participant_user_ids, cx),
158                user_store.get_users(room.pending_user_ids, cx),
159            )
160        });
161        self._pending_room_update = Some(cx.spawn(|this, mut cx| async move {
162            let (participants, pending_users) = futures::join!(participants, pending_users);
163
164            this.update(&mut cx, |this, cx| {
165                if let Some(participants) = participants.log_err() {
166                    let mut seen_participants = HashSet::default();
167
168                    for (participant, user) in room.participants.into_iter().zip(participants) {
169                        let peer_id = PeerId(participant.peer_id);
170                        seen_participants.insert(peer_id);
171
172                        let existing_project_ids = this
173                            .remote_participants
174                            .get(&peer_id)
175                            .map(|existing| existing.project_ids.clone())
176                            .unwrap_or_default();
177                        for project_id in &participant.project_ids {
178                            if !existing_project_ids.contains(project_id) {
179                                cx.emit(Event::RemoteProjectShared {
180                                    owner: user.clone(),
181                                    project_id: *project_id,
182                                });
183                            }
184                        }
185
186                        this.remote_participants.insert(
187                            peer_id,
188                            RemoteParticipant {
189                                user: user.clone(),
190                                project_ids: participant.project_ids,
191                                location: ParticipantLocation::from_proto(participant.location)
192                                    .unwrap_or(ParticipantLocation::External),
193                            },
194                        );
195                    }
196
197                    for participant_peer_id in
198                        this.remote_participants.keys().copied().collect::<Vec<_>>()
199                    {
200                        if !seen_participants.contains(&participant_peer_id) {
201                            this.remote_participants.remove(&participant_peer_id);
202                        }
203                    }
204
205                    cx.notify();
206                }
207
208                if let Some(pending_users) = pending_users.log_err() {
209                    this.pending_users = pending_users;
210                    cx.notify();
211                }
212            });
213        }));
214
215        cx.notify();
216        Ok(())
217    }
218
219    pub(crate) fn call(
220        &mut self,
221        recipient_user_id: u64,
222        initial_project_id: Option<u64>,
223        cx: &mut ModelContext<Self>,
224    ) -> Task<Result<()>> {
225        if self.status.is_offline() {
226            return Task::ready(Err(anyhow!("room is offline")));
227        }
228
229        let client = self.client.clone();
230        let room_id = self.id;
231        cx.foreground().spawn(async move {
232            client
233                .request(proto::Call {
234                    room_id,
235                    recipient_user_id,
236                    initial_project_id,
237                })
238                .await?;
239            Ok(())
240        })
241    }
242
243    pub(crate) fn share_project(
244        &mut self,
245        project: ModelHandle<Project>,
246        cx: &mut ModelContext<Self>,
247    ) -> Task<Result<u64>> {
248        let request = self
249            .client
250            .request(proto::ShareProject { room_id: self.id() });
251        cx.spawn_weak(|_, mut cx| async move {
252            let response = request.await?;
253            project
254                .update(&mut cx, |project, cx| {
255                    project.shared(response.project_id, cx)
256                })
257                .await?;
258            Ok(response.project_id)
259        })
260    }
261
262    pub fn set_location(
263        &mut self,
264        project: Option<&ModelHandle<Project>>,
265        cx: &mut ModelContext<Self>,
266    ) -> Task<Result<()>> {
267        if self.status.is_offline() {
268            return Task::ready(Err(anyhow!("room is offline")));
269        }
270
271        let client = self.client.clone();
272        let room_id = self.id;
273        let location = if let Some(project) = project {
274            if let Some(project_id) = project.read(cx).remote_id() {
275                proto::participant_location::Variant::Project(
276                    proto::participant_location::Project { id: project_id },
277                )
278            } else {
279                return Task::ready(Err(anyhow!("project is not shared")));
280            }
281        } else {
282            proto::participant_location::Variant::External(proto::participant_location::External {})
283        };
284
285        cx.foreground().spawn(async move {
286            client
287                .request(proto::UpdateParticipantLocation {
288                    room_id,
289                    location: Some(proto::ParticipantLocation {
290                        variant: Some(location),
291                    }),
292                })
293                .await?;
294            Ok(())
295        })
296    }
297}
298
299#[derive(Copy, Clone, PartialEq, Eq)]
300pub enum RoomStatus {
301    Online,
302    Offline,
303}
304
305impl RoomStatus {
306    pub fn is_offline(&self) -> bool {
307        matches!(self, RoomStatus::Offline)
308    }
309}