1use crate::{
2 participant::{ParticipantLocation, RemoteParticipant},
3 IncomingCall,
4};
5use anyhow::{anyhow, Result};
6use client::{proto, Client, PeerId, TypedEnvelope, User, UserStore};
7use collections::{HashMap, HashSet};
8use futures::StreamExt;
9use gpui::{AsyncAppContext, Entity, ModelContext, ModelHandle, MutableAppContext, Task};
10use project::Project;
11use std::sync::Arc;
12use util::ResultExt;
13
14#[derive(Clone, Debug, PartialEq, Eq)]
15pub enum Event {
16 RemoteProjectShared { owner: Arc<User>, project_id: u64 },
17}
18
19pub struct Room {
20 id: u64,
21 status: RoomStatus,
22 remote_participants: HashMap<PeerId, RemoteParticipant>,
23 pending_users: Vec<Arc<User>>,
24 client: Arc<Client>,
25 user_store: ModelHandle<UserStore>,
26 _subscriptions: Vec<client::Subscription>,
27 _pending_room_update: Option<Task<()>>,
28}
29
30impl Entity for Room {
31 type Event = Event;
32
33 fn release(&mut self, _: &mut MutableAppContext) {
34 self.client.send(proto::LeaveRoom { id: self.id }).log_err();
35 }
36}
37
38impl Room {
39 fn new(
40 id: u64,
41 client: Arc<Client>,
42 user_store: ModelHandle<UserStore>,
43 cx: &mut ModelContext<Self>,
44 ) -> Self {
45 let mut client_status = client.status();
46 cx.spawn_weak(|this, mut cx| async move {
47 let is_connected = client_status
48 .next()
49 .await
50 .map_or(false, |s| s.is_connected());
51 // Even if we're initially connected, any future change of the status means we momentarily disconnected.
52 if !is_connected || client_status.next().await.is_some() {
53 if let Some(this) = this.upgrade(&cx) {
54 let _ = this.update(&mut cx, |this, cx| this.leave(cx));
55 }
56 }
57 })
58 .detach();
59
60 Self {
61 id,
62 status: RoomStatus::Online,
63 remote_participants: Default::default(),
64 pending_users: Default::default(),
65 _subscriptions: vec![client.add_message_handler(cx.handle(), Self::handle_room_updated)],
66 _pending_room_update: None,
67 client,
68 user_store,
69 }
70 }
71
72 pub(crate) fn create(
73 client: Arc<Client>,
74 user_store: ModelHandle<UserStore>,
75 cx: &mut MutableAppContext,
76 ) -> Task<Result<ModelHandle<Self>>> {
77 cx.spawn(|mut cx| async move {
78 let room = client.request(proto::CreateRoom {}).await?;
79 Ok(cx.add_model(|cx| Self::new(room.id, client, user_store, cx)))
80 })
81 }
82
83 pub(crate) fn join(
84 call: &IncomingCall,
85 client: Arc<Client>,
86 user_store: ModelHandle<UserStore>,
87 cx: &mut MutableAppContext,
88 ) -> Task<Result<ModelHandle<Self>>> {
89 let room_id = call.room_id;
90 cx.spawn(|mut cx| async move {
91 let response = client.request(proto::JoinRoom { id: room_id }).await?;
92 let room_proto = response.room.ok_or_else(|| anyhow!("invalid room"))?;
93 let room = cx.add_model(|cx| Self::new(room_id, client, user_store, cx));
94 room.update(&mut cx, |room, cx| room.apply_room_update(room_proto, cx))?;
95 Ok(room)
96 })
97 }
98
99 pub(crate) fn leave(&mut self, cx: &mut ModelContext<Self>) -> Result<()> {
100 if self.status.is_offline() {
101 return Err(anyhow!("room is offline"));
102 }
103
104 cx.notify();
105 self.status = RoomStatus::Offline;
106 self.remote_participants.clear();
107 self.client.send(proto::LeaveRoom { id: self.id })?;
108 Ok(())
109 }
110
111 pub fn id(&self) -> u64 {
112 self.id
113 }
114
115 pub fn status(&self) -> RoomStatus {
116 self.status
117 }
118
119 pub fn remote_participants(&self) -> &HashMap<PeerId, RemoteParticipant> {
120 &self.remote_participants
121 }
122
123 pub fn pending_users(&self) -> &[Arc<User>] {
124 &self.pending_users
125 }
126
127 async fn handle_room_updated(
128 this: ModelHandle<Self>,
129 envelope: TypedEnvelope<proto::RoomUpdated>,
130 _: Arc<Client>,
131 mut cx: AsyncAppContext,
132 ) -> Result<()> {
133 let room = envelope
134 .payload
135 .room
136 .ok_or_else(|| anyhow!("invalid room"))?;
137 this.update(&mut cx, |this, cx| this.apply_room_update(room, cx))?;
138 Ok(())
139 }
140
141 fn apply_room_update(
142 &mut self,
143 mut room: proto::Room,
144 cx: &mut ModelContext<Self>,
145 ) -> Result<()> {
146 // Filter ourselves out from the room's participants.
147 room.participants
148 .retain(|participant| Some(participant.user_id) != self.client.user_id());
149
150 let participant_user_ids = room
151 .participants
152 .iter()
153 .map(|p| p.user_id)
154 .collect::<Vec<_>>();
155 let (participants, pending_users) = self.user_store.update(cx, move |user_store, cx| {
156 (
157 user_store.get_users(participant_user_ids, cx),
158 user_store.get_users(room.pending_user_ids, cx),
159 )
160 });
161 self._pending_room_update = Some(cx.spawn(|this, mut cx| async move {
162 let (participants, pending_users) = futures::join!(participants, pending_users);
163
164 this.update(&mut cx, |this, cx| {
165 if let Some(participants) = participants.log_err() {
166 let mut seen_participants = HashSet::default();
167
168 for (participant, user) in room.participants.into_iter().zip(participants) {
169 let peer_id = PeerId(participant.peer_id);
170 seen_participants.insert(peer_id);
171
172 let existing_project_ids = this
173 .remote_participants
174 .get(&peer_id)
175 .map(|existing| existing.project_ids.clone())
176 .unwrap_or_default();
177 for project_id in &participant.project_ids {
178 if !existing_project_ids.contains(project_id) {
179 cx.emit(Event::RemoteProjectShared {
180 owner: user.clone(),
181 project_id: *project_id,
182 });
183 }
184 }
185
186 this.remote_participants.insert(
187 peer_id,
188 RemoteParticipant {
189 user: user.clone(),
190 project_ids: participant.project_ids,
191 location: ParticipantLocation::from_proto(participant.location)
192 .unwrap_or(ParticipantLocation::External),
193 },
194 );
195 }
196
197 for participant_peer_id in
198 this.remote_participants.keys().copied().collect::<Vec<_>>()
199 {
200 if !seen_participants.contains(&participant_peer_id) {
201 this.remote_participants.remove(&participant_peer_id);
202 }
203 }
204
205 cx.notify();
206 }
207
208 if let Some(pending_users) = pending_users.log_err() {
209 this.pending_users = pending_users;
210 cx.notify();
211 }
212 });
213 }));
214
215 cx.notify();
216 Ok(())
217 }
218
219 pub(crate) fn call(
220 &mut self,
221 recipient_user_id: u64,
222 initial_project_id: Option<u64>,
223 cx: &mut ModelContext<Self>,
224 ) -> Task<Result<()>> {
225 if self.status.is_offline() {
226 return Task::ready(Err(anyhow!("room is offline")));
227 }
228
229 let client = self.client.clone();
230 let room_id = self.id;
231 cx.foreground().spawn(async move {
232 client
233 .request(proto::Call {
234 room_id,
235 recipient_user_id,
236 initial_project_id,
237 })
238 .await?;
239 Ok(())
240 })
241 }
242
243 pub(crate) fn share_project(
244 &mut self,
245 project: ModelHandle<Project>,
246 cx: &mut ModelContext<Self>,
247 ) -> Task<Result<u64>> {
248 let request = self
249 .client
250 .request(proto::ShareProject { room_id: self.id() });
251 cx.spawn_weak(|_, mut cx| async move {
252 let response = request.await?;
253 project
254 .update(&mut cx, |project, cx| {
255 project.shared(response.project_id, cx)
256 })
257 .await?;
258 Ok(response.project_id)
259 })
260 }
261
262 pub fn set_location(
263 &mut self,
264 project: Option<&ModelHandle<Project>>,
265 cx: &mut ModelContext<Self>,
266 ) -> Task<Result<()>> {
267 if self.status.is_offline() {
268 return Task::ready(Err(anyhow!("room is offline")));
269 }
270
271 let client = self.client.clone();
272 let room_id = self.id;
273 let location = if let Some(project) = project {
274 if let Some(project_id) = project.read(cx).remote_id() {
275 proto::participant_location::Variant::Project(
276 proto::participant_location::Project { id: project_id },
277 )
278 } else {
279 return Task::ready(Err(anyhow!("project is not shared")));
280 }
281 } else {
282 proto::participant_location::Variant::External(proto::participant_location::External {})
283 };
284
285 cx.foreground().spawn(async move {
286 client
287 .request(proto::UpdateParticipantLocation {
288 room_id,
289 location: Some(proto::ParticipantLocation {
290 variant: Some(location),
291 }),
292 })
293 .await?;
294 Ok(())
295 })
296 }
297}
298
/// Connectivity state of a room as seen by the local client.
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
pub enum RoomStatus {
    /// The room is active.
    Online,
    /// The room has been left; it is not usable for further requests.
    Offline,
}

impl RoomStatus {
    /// Returns `true` when the status is [`RoomStatus::Offline`].
    pub fn is_offline(&self) -> bool {
        matches!(self, RoomStatus::Offline)
    }
}