room.rs

  1use crate::participant::{ParticipantLocation, RemoteParticipant};
  2use anyhow::{anyhow, Result};
  3use client::{incoming_call::IncomingCall, proto, Client, PeerId, TypedEnvelope, User, UserStore};
  4use collections::{HashMap, HashSet};
  5use futures::StreamExt;
  6use gpui::{AsyncAppContext, Entity, ModelContext, ModelHandle, MutableAppContext, Task};
  7use project::Project;
  8use std::sync::Arc;
  9use util::ResultExt;
 10
 11#[derive(Clone, Debug, PartialEq, Eq)]
 12pub enum Event {
 13    RemoteProjectShared { owner: Arc<User>, project_id: u64 },
 14}
 15
 16pub struct Room {
 17    id: u64,
 18    status: RoomStatus,
 19    remote_participants: HashMap<PeerId, RemoteParticipant>,
 20    pending_users: Vec<Arc<User>>,
 21    client: Arc<Client>,
 22    user_store: ModelHandle<UserStore>,
 23    _subscriptions: Vec<client::Subscription>,
 24    _pending_room_update: Option<Task<()>>,
 25}
 26
 27impl Entity for Room {
 28    type Event = Event;
 29
 30    fn release(&mut self, _: &mut MutableAppContext) {
 31        self.client.send(proto::LeaveRoom { id: self.id }).log_err();
 32    }
 33}
 34
 35impl Room {
 36    fn new(
 37        id: u64,
 38        client: Arc<Client>,
 39        user_store: ModelHandle<UserStore>,
 40        cx: &mut ModelContext<Self>,
 41    ) -> Self {
 42        let mut client_status = client.status();
 43        cx.spawn_weak(|this, mut cx| async move {
 44            let is_connected = client_status
 45                .next()
 46                .await
 47                .map_or(false, |s| s.is_connected());
 48            // Even if we're initially connected, any future change of the status means we momentarily disconnected.
 49            if !is_connected || client_status.next().await.is_some() {
 50                if let Some(this) = this.upgrade(&cx) {
 51                    let _ = this.update(&mut cx, |this, cx| this.leave(cx));
 52                }
 53            }
 54        })
 55        .detach();
 56
 57        Self {
 58            id,
 59            status: RoomStatus::Online,
 60            remote_participants: Default::default(),
 61            pending_users: Default::default(),
 62            _subscriptions: vec![client.add_message_handler(cx.handle(), Self::handle_room_updated)],
 63            _pending_room_update: None,
 64            client,
 65            user_store,
 66        }
 67    }
 68
 69    pub fn create(
 70        client: Arc<Client>,
 71        user_store: ModelHandle<UserStore>,
 72        cx: &mut MutableAppContext,
 73    ) -> Task<Result<ModelHandle<Self>>> {
 74        cx.spawn(|mut cx| async move {
 75            let room = client.request(proto::CreateRoom {}).await?;
 76            Ok(cx.add_model(|cx| Self::new(room.id, client, user_store, cx)))
 77        })
 78    }
 79
 80    pub fn join(
 81        call: &IncomingCall,
 82        client: Arc<Client>,
 83        user_store: ModelHandle<UserStore>,
 84        cx: &mut MutableAppContext,
 85    ) -> Task<Result<ModelHandle<Self>>> {
 86        let room_id = call.room_id;
 87        cx.spawn(|mut cx| async move {
 88            let response = client.request(proto::JoinRoom { id: room_id }).await?;
 89            let room_proto = response.room.ok_or_else(|| anyhow!("invalid room"))?;
 90            let room = cx.add_model(|cx| Self::new(room_id, client, user_store, cx));
 91            room.update(&mut cx, |room, cx| room.apply_room_update(room_proto, cx))?;
 92            Ok(room)
 93        })
 94    }
 95
 96    pub fn leave(&mut self, cx: &mut ModelContext<Self>) -> Result<()> {
 97        if self.status.is_offline() {
 98            return Err(anyhow!("room is offline"));
 99        }
100
101        cx.notify();
102        self.status = RoomStatus::Offline;
103        self.remote_participants.clear();
104        self.client.send(proto::LeaveRoom { id: self.id })?;
105        Ok(())
106    }
107
108    pub fn id(&self) -> u64 {
109        self.id
110    }
111
112    pub fn status(&self) -> RoomStatus {
113        self.status
114    }
115
116    pub fn remote_participants(&self) -> &HashMap<PeerId, RemoteParticipant> {
117        &self.remote_participants
118    }
119
120    pub fn pending_users(&self) -> &[Arc<User>] {
121        &self.pending_users
122    }
123
124    async fn handle_room_updated(
125        this: ModelHandle<Self>,
126        envelope: TypedEnvelope<proto::RoomUpdated>,
127        _: Arc<Client>,
128        mut cx: AsyncAppContext,
129    ) -> Result<()> {
130        let room = envelope
131            .payload
132            .room
133            .ok_or_else(|| anyhow!("invalid room"))?;
134        this.update(&mut cx, |this, cx| this.apply_room_update(room, cx))?;
135        Ok(())
136    }
137
138    fn apply_room_update(
139        &mut self,
140        mut room: proto::Room,
141        cx: &mut ModelContext<Self>,
142    ) -> Result<()> {
143        // Filter ourselves out from the room's participants.
144        room.participants
145            .retain(|participant| Some(participant.user_id) != self.client.user_id());
146
147        let participant_user_ids = room
148            .participants
149            .iter()
150            .map(|p| p.user_id)
151            .collect::<Vec<_>>();
152        let (participants, pending_users) = self.user_store.update(cx, move |user_store, cx| {
153            (
154                user_store.get_users(participant_user_ids, cx),
155                user_store.get_users(room.pending_user_ids, cx),
156            )
157        });
158        self._pending_room_update = Some(cx.spawn(|this, mut cx| async move {
159            let (participants, pending_users) = futures::join!(participants, pending_users);
160
161            this.update(&mut cx, |this, cx| {
162                if let Some(participants) = participants.log_err() {
163                    let mut seen_participants = HashSet::default();
164
165                    for (participant, user) in room.participants.into_iter().zip(participants) {
166                        let peer_id = PeerId(participant.peer_id);
167                        seen_participants.insert(peer_id);
168
169                        let existing_project_ids = this
170                            .remote_participants
171                            .get(&peer_id)
172                            .map(|existing| existing.project_ids.clone())
173                            .unwrap_or_default();
174                        for project_id in &participant.project_ids {
175                            if !existing_project_ids.contains(project_id) {
176                                cx.emit(Event::RemoteProjectShared {
177                                    owner: user.clone(),
178                                    project_id: *project_id,
179                                });
180                            }
181                        }
182
183                        this.remote_participants.insert(
184                            peer_id,
185                            RemoteParticipant {
186                                user: user.clone(),
187                                project_ids: participant.project_ids,
188                                location: ParticipantLocation::from_proto(participant.location)
189                                    .unwrap_or(ParticipantLocation::External),
190                            },
191                        );
192                    }
193
194                    for participant_peer_id in
195                        this.remote_participants.keys().copied().collect::<Vec<_>>()
196                    {
197                        if !seen_participants.contains(&participant_peer_id) {
198                            this.remote_participants.remove(&participant_peer_id);
199                        }
200                    }
201
202                    cx.notify();
203                }
204
205                if let Some(pending_users) = pending_users.log_err() {
206                    this.pending_users = pending_users;
207                    cx.notify();
208                }
209            });
210        }));
211
212        cx.notify();
213        Ok(())
214    }
215
216    pub fn call(
217        &mut self,
218        recipient_user_id: u64,
219        initial_project_id: Option<u64>,
220        cx: &mut ModelContext<Self>,
221    ) -> Task<Result<()>> {
222        if self.status.is_offline() {
223            return Task::ready(Err(anyhow!("room is offline")));
224        }
225
226        let client = self.client.clone();
227        let room_id = self.id;
228        cx.foreground().spawn(async move {
229            client
230                .request(proto::Call {
231                    room_id,
232                    recipient_user_id,
233                    initial_project_id,
234                })
235                .await?;
236            Ok(())
237        })
238    }
239
240    pub fn set_location(
241        &mut self,
242        project: Option<&ModelHandle<Project>>,
243        cx: &mut ModelContext<Self>,
244    ) -> Task<Result<()>> {
245        if self.status.is_offline() {
246            return Task::ready(Err(anyhow!("room is offline")));
247        }
248
249        let client = self.client.clone();
250        let room_id = self.id;
251        let location = if let Some(project) = project {
252            if let Some(project_id) = project.read(cx).remote_id() {
253                proto::participant_location::Variant::Project(
254                    proto::participant_location::Project { id: project_id },
255                )
256            } else {
257                return Task::ready(Err(anyhow!("project is not shared")));
258            }
259        } else {
260            proto::participant_location::Variant::External(proto::participant_location::External {})
261        };
262
263        cx.foreground().spawn(async move {
264            client
265                .request(proto::UpdateParticipantLocation {
266                    room_id,
267                    location: Some(proto::ParticipantLocation {
268                        variant: Some(location),
269                    }),
270                })
271                .await?;
272            Ok(())
273        })
274    }
275}
276
277#[derive(Copy, Clone, PartialEq, Eq)]
278pub enum RoomStatus {
279    Online,
280    Offline,
281}
282
283impl RoomStatus {
284    pub fn is_offline(&self) -> bool {
285        matches!(self, RoomStatus::Offline)
286    }
287}