1use crate::{
2 participant::{ParticipantLocation, RemoteParticipant},
3 IncomingCall,
4};
5use anyhow::{anyhow, Result};
6use client::{proto, Client, PeerId, TypedEnvelope, User, UserStore};
7use collections::{HashMap, HashSet};
8use futures::StreamExt;
9use gpui::{AsyncAppContext, Entity, ModelContext, ModelHandle, MutableAppContext, Task};
10use project::Project;
11use std::sync::Arc;
12use util::ResultExt;
13
14#[derive(Clone, Debug, PartialEq, Eq)]
15pub enum Event {
16 RemoteProjectShared { owner: Arc<User>, project_id: u64 },
17}
18
19pub struct Room {
20 id: u64,
21 status: RoomStatus,
22 remote_participants: HashMap<PeerId, RemoteParticipant>,
23 pending_users: Vec<Arc<User>>,
24 pending_call_count: usize,
25 leave_when_empty: bool,
26 client: Arc<Client>,
27 user_store: ModelHandle<UserStore>,
28 subscriptions: Vec<client::Subscription>,
29 pending_room_update: Option<Task<()>>,
30}
31
32impl Entity for Room {
33 type Event = Event;
34
35 fn release(&mut self, _: &mut MutableAppContext) {
36 self.client.send(proto::LeaveRoom { id: self.id }).log_err();
37 }
38}
39
40impl Room {
41 fn new(
42 id: u64,
43 client: Arc<Client>,
44 user_store: ModelHandle<UserStore>,
45 cx: &mut ModelContext<Self>,
46 ) -> Self {
47 let mut client_status = client.status();
48 cx.spawn_weak(|this, mut cx| async move {
49 let is_connected = client_status
50 .next()
51 .await
52 .map_or(false, |s| s.is_connected());
53 // Even if we're initially connected, any future change of the status means we momentarily disconnected.
54 if !is_connected || client_status.next().await.is_some() {
55 if let Some(this) = this.upgrade(&cx) {
56 let _ = this.update(&mut cx, |this, cx| this.leave(cx));
57 }
58 }
59 })
60 .detach();
61
62 Self {
63 id,
64 status: RoomStatus::Online,
65 remote_participants: Default::default(),
66 pending_users: Default::default(),
67 pending_call_count: 0,
68 subscriptions: vec![client.add_message_handler(cx.handle(), Self::handle_room_updated)],
69 leave_when_empty: false,
70 pending_room_update: None,
71 client,
72 user_store,
73 }
74 }
75
76 pub(crate) fn create(
77 recipient_user_id: u64,
78 initial_project: Option<ModelHandle<Project>>,
79 client: Arc<Client>,
80 user_store: ModelHandle<UserStore>,
81 cx: &mut MutableAppContext,
82 ) -> Task<Result<ModelHandle<Self>>> {
83 cx.spawn(|mut cx| async move {
84 let response = client.request(proto::CreateRoom {}).await?;
85 let room = cx.add_model(|cx| Self::new(response.id, client, user_store, cx));
86
87 let initial_project_id = if let Some(initial_project) = initial_project {
88 let initial_project_id = room
89 .update(&mut cx, |room, cx| {
90 room.share_project(initial_project.clone(), cx)
91 })
92 .await?;
93 Some(initial_project_id)
94 } else {
95 None
96 };
97
98 match room
99 .update(&mut cx, |room, cx| {
100 room.leave_when_empty = true;
101 room.call(recipient_user_id, initial_project_id, cx)
102 })
103 .await
104 {
105 Ok(()) => Ok(room),
106 Err(error) => Err(anyhow!("room creation failed: {:?}", error)),
107 }
108 })
109 }
110
111 pub(crate) fn join(
112 call: &IncomingCall,
113 client: Arc<Client>,
114 user_store: ModelHandle<UserStore>,
115 cx: &mut MutableAppContext,
116 ) -> Task<Result<ModelHandle<Self>>> {
117 let room_id = call.room_id;
118 cx.spawn(|mut cx| async move {
119 let response = client.request(proto::JoinRoom { id: room_id }).await?;
120 let room_proto = response.room.ok_or_else(|| anyhow!("invalid room"))?;
121 let room = cx.add_model(|cx| Self::new(room_id, client, user_store, cx));
122 room.update(&mut cx, |room, cx| {
123 room.leave_when_empty = true;
124 room.apply_room_update(room_proto, cx)?;
125 anyhow::Ok(())
126 })?;
127 Ok(room)
128 })
129 }
130
131 fn should_leave(&self) -> bool {
132 self.leave_when_empty
133 && self.pending_room_update.is_none()
134 && self.pending_users.is_empty()
135 && self.remote_participants.is_empty()
136 && self.pending_call_count == 0
137 }
138
139 pub(crate) fn leave(&mut self, cx: &mut ModelContext<Self>) -> Result<()> {
140 if self.status.is_offline() {
141 return Err(anyhow!("room is offline"));
142 }
143
144 cx.notify();
145 self.status = RoomStatus::Offline;
146 self.remote_participants.clear();
147 self.subscriptions.clear();
148 self.client.send(proto::LeaveRoom { id: self.id })?;
149 Ok(())
150 }
151
152 pub fn id(&self) -> u64 {
153 self.id
154 }
155
156 pub fn status(&self) -> RoomStatus {
157 self.status
158 }
159
160 pub fn remote_participants(&self) -> &HashMap<PeerId, RemoteParticipant> {
161 &self.remote_participants
162 }
163
164 pub fn pending_users(&self) -> &[Arc<User>] {
165 &self.pending_users
166 }
167
168 async fn handle_room_updated(
169 this: ModelHandle<Self>,
170 envelope: TypedEnvelope<proto::RoomUpdated>,
171 _: Arc<Client>,
172 mut cx: AsyncAppContext,
173 ) -> Result<()> {
174 let room = envelope
175 .payload
176 .room
177 .ok_or_else(|| anyhow!("invalid room"))?;
178 this.update(&mut cx, |this, cx| this.apply_room_update(room, cx))
179 }
180
181 fn apply_room_update(
182 &mut self,
183 mut room: proto::Room,
184 cx: &mut ModelContext<Self>,
185 ) -> Result<()> {
186 // Filter ourselves out from the room's participants.
187 room.participants
188 .retain(|participant| Some(participant.user_id) != self.client.user_id());
189
190 let participant_user_ids = room
191 .participants
192 .iter()
193 .map(|p| p.user_id)
194 .collect::<Vec<_>>();
195 let (participants, pending_users) = self.user_store.update(cx, move |user_store, cx| {
196 (
197 user_store.get_users(participant_user_ids, cx),
198 user_store.get_users(room.pending_user_ids, cx),
199 )
200 });
201 self.pending_room_update = Some(cx.spawn(|this, mut cx| async move {
202 let (participants, pending_users) = futures::join!(participants, pending_users);
203
204 this.update(&mut cx, |this, cx| {
205 if let Some(participants) = participants.log_err() {
206 let mut seen_participants = HashSet::default();
207
208 for (participant, user) in room.participants.into_iter().zip(participants) {
209 let peer_id = PeerId(participant.peer_id);
210 seen_participants.insert(peer_id);
211
212 let existing_project_ids = this
213 .remote_participants
214 .get(&peer_id)
215 .map(|existing| existing.project_ids.clone())
216 .unwrap_or_default();
217 for project_id in &participant.project_ids {
218 if !existing_project_ids.contains(project_id) {
219 cx.emit(Event::RemoteProjectShared {
220 owner: user.clone(),
221 project_id: *project_id,
222 });
223 }
224 }
225
226 this.remote_participants.insert(
227 peer_id,
228 RemoteParticipant {
229 user: user.clone(),
230 project_ids: participant.project_ids,
231 location: ParticipantLocation::from_proto(participant.location)
232 .unwrap_or(ParticipantLocation::External),
233 },
234 );
235 }
236
237 for participant_peer_id in
238 this.remote_participants.keys().copied().collect::<Vec<_>>()
239 {
240 if !seen_participants.contains(&participant_peer_id) {
241 this.remote_participants.remove(&participant_peer_id);
242 }
243 }
244
245 cx.notify();
246 }
247
248 if let Some(pending_users) = pending_users.log_err() {
249 this.pending_users = pending_users;
250 cx.notify();
251 }
252
253 this.pending_room_update.take();
254 if this.should_leave() {
255 let _ = this.leave(cx);
256 }
257 });
258 }));
259
260 cx.notify();
261 Ok(())
262 }
263
264 pub(crate) fn call(
265 &mut self,
266 recipient_user_id: u64,
267 initial_project_id: Option<u64>,
268 cx: &mut ModelContext<Self>,
269 ) -> Task<Result<()>> {
270 if self.status.is_offline() {
271 return Task::ready(Err(anyhow!("room is offline")));
272 }
273
274 cx.notify();
275 let client = self.client.clone();
276 let room_id = self.id;
277 self.pending_call_count += 1;
278 cx.spawn(|this, mut cx| async move {
279 let result = client
280 .request(proto::Call {
281 room_id,
282 recipient_user_id,
283 initial_project_id,
284 })
285 .await;
286 this.update(&mut cx, |this, cx| {
287 this.pending_call_count -= 1;
288 if this.should_leave() {
289 this.leave(cx)?;
290 }
291 result
292 })?;
293 Ok(())
294 })
295 }
296
297 pub(crate) fn share_project(
298 &mut self,
299 project: ModelHandle<Project>,
300 cx: &mut ModelContext<Self>,
301 ) -> Task<Result<u64>> {
302 if project.read(cx).is_remote() {
303 return Task::ready(Err(anyhow!("can't share remote project")));
304 } else if let Some(project_id) = project.read(cx).remote_id() {
305 return Task::ready(Ok(project_id));
306 }
307
308 let request = self
309 .client
310 .request(proto::ShareProject { room_id: self.id() });
311 cx.spawn_weak(|_, mut cx| async move {
312 let response = request.await?;
313 project
314 .update(&mut cx, |project, cx| {
315 project.shared(response.project_id, cx)
316 })
317 .await?;
318 Ok(response.project_id)
319 })
320 }
321
322 pub fn set_location(
323 &mut self,
324 project: Option<&ModelHandle<Project>>,
325 cx: &mut ModelContext<Self>,
326 ) -> Task<Result<()>> {
327 if self.status.is_offline() {
328 return Task::ready(Err(anyhow!("room is offline")));
329 }
330
331 let client = self.client.clone();
332 let room_id = self.id;
333 let location = if let Some(project) = project {
334 if let Some(project_id) = project.read(cx).remote_id() {
335 proto::participant_location::Variant::Project(
336 proto::participant_location::Project { id: project_id },
337 )
338 } else {
339 return Task::ready(Err(anyhow!("project is not shared")));
340 }
341 } else {
342 proto::participant_location::Variant::External(proto::participant_location::External {})
343 };
344
345 cx.foreground().spawn(async move {
346 client
347 .request(proto::UpdateParticipantLocation {
348 room_id,
349 location: Some(proto::ParticipantLocation {
350 variant: Some(location),
351 }),
352 })
353 .await?;
354 Ok(())
355 })
356 }
357}
358
/// Connection state of a room from the local user's point of view.
///
/// Derives `Debug` so the status can be logged and used in assertion
/// messages, like the other public types in this file.
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
pub enum RoomStatus {
    /// We are in the room.
    Online,
    /// We have left the room.
    Offline,
}
364
365impl RoomStatus {
366 pub fn is_offline(&self) -> bool {
367 matches!(self, RoomStatus::Offline)
368 }
369}