1use crate::{
2 participant::{ParticipantLocation, RemoteParticipant},
3 IncomingCall,
4};
5use anyhow::{anyhow, Result};
6use client::{proto, Client, PeerId, TypedEnvelope, User, UserStore};
7use collections::{BTreeMap, HashSet};
8use futures::StreamExt;
9use gpui::{AsyncAppContext, Entity, ModelContext, ModelHandle, MutableAppContext, Task};
10use project::Project;
11use std::sync::Arc;
12use util::ResultExt;
13
14#[derive(Clone, Debug, PartialEq, Eq)]
15pub enum Event {
16 RemoteProjectShared {
17 owner: Arc<User>,
18 project_id: u64,
19 worktree_root_names: Vec<String>,
20 },
21}
22
23pub struct Room {
24 id: u64,
25 status: RoomStatus,
26 remote_participants: BTreeMap<PeerId, RemoteParticipant>,
27 pending_participants: Vec<Arc<User>>,
28 participant_user_ids: HashSet<u64>,
29 pending_call_count: usize,
30 leave_when_empty: bool,
31 client: Arc<Client>,
32 user_store: ModelHandle<UserStore>,
33 subscriptions: Vec<client::Subscription>,
34 pending_room_update: Option<Task<()>>,
35}
36
37impl Entity for Room {
38 type Event = Event;
39
40 fn release(&mut self, _: &mut MutableAppContext) {
41 self.client.send(proto::LeaveRoom { id: self.id }).log_err();
42 }
43}
44
45impl Room {
46 fn new(
47 id: u64,
48 client: Arc<Client>,
49 user_store: ModelHandle<UserStore>,
50 cx: &mut ModelContext<Self>,
51 ) -> Self {
52 let mut client_status = client.status();
53 cx.spawn_weak(|this, mut cx| async move {
54 let is_connected = client_status
55 .next()
56 .await
57 .map_or(false, |s| s.is_connected());
58 // Even if we're initially connected, any future change of the status means we momentarily disconnected.
59 if !is_connected || client_status.next().await.is_some() {
60 if let Some(this) = this.upgrade(&cx) {
61 let _ = this.update(&mut cx, |this, cx| this.leave(cx));
62 }
63 }
64 })
65 .detach();
66
67 Self {
68 id,
69 status: RoomStatus::Online,
70 participant_user_ids: Default::default(),
71 remote_participants: Default::default(),
72 pending_participants: Default::default(),
73 pending_call_count: 0,
74 subscriptions: vec![client.add_message_handler(cx.handle(), Self::handle_room_updated)],
75 leave_when_empty: false,
76 pending_room_update: None,
77 client,
78 user_store,
79 }
80 }
81
82 pub(crate) fn create(
83 recipient_user_id: u64,
84 initial_project: Option<ModelHandle<Project>>,
85 client: Arc<Client>,
86 user_store: ModelHandle<UserStore>,
87 cx: &mut MutableAppContext,
88 ) -> Task<Result<ModelHandle<Self>>> {
89 cx.spawn(|mut cx| async move {
90 let response = client.request(proto::CreateRoom {}).await?;
91 let room = cx.add_model(|cx| Self::new(response.id, client, user_store, cx));
92
93 let initial_project_id = if let Some(initial_project) = initial_project {
94 let initial_project_id = room
95 .update(&mut cx, |room, cx| {
96 room.share_project(initial_project.clone(), cx)
97 })
98 .await?;
99 Some(initial_project_id)
100 } else {
101 None
102 };
103
104 match room
105 .update(&mut cx, |room, cx| {
106 room.leave_when_empty = true;
107 room.call(recipient_user_id, initial_project_id, cx)
108 })
109 .await
110 {
111 Ok(()) => Ok(room),
112 Err(error) => Err(anyhow!("room creation failed: {:?}", error)),
113 }
114 })
115 }
116
117 pub(crate) fn join(
118 call: &IncomingCall,
119 client: Arc<Client>,
120 user_store: ModelHandle<UserStore>,
121 cx: &mut MutableAppContext,
122 ) -> Task<Result<ModelHandle<Self>>> {
123 let room_id = call.room_id;
124 cx.spawn(|mut cx| async move {
125 let response = client.request(proto::JoinRoom { id: room_id }).await?;
126 let room_proto = response.room.ok_or_else(|| anyhow!("invalid room"))?;
127 let room = cx.add_model(|cx| Self::new(room_id, client, user_store, cx));
128 room.update(&mut cx, |room, cx| {
129 room.leave_when_empty = true;
130 room.apply_room_update(room_proto, cx)?;
131 anyhow::Ok(())
132 })?;
133 Ok(room)
134 })
135 }
136
137 fn should_leave(&self) -> bool {
138 self.leave_when_empty
139 && self.pending_room_update.is_none()
140 && self.pending_participants.is_empty()
141 && self.remote_participants.is_empty()
142 && self.pending_call_count == 0
143 }
144
145 pub(crate) fn leave(&mut self, cx: &mut ModelContext<Self>) -> Result<()> {
146 if self.status.is_offline() {
147 return Err(anyhow!("room is offline"));
148 }
149
150 cx.notify();
151 self.status = RoomStatus::Offline;
152 self.remote_participants.clear();
153 self.pending_participants.clear();
154 self.participant_user_ids.clear();
155 self.subscriptions.clear();
156 self.client.send(proto::LeaveRoom { id: self.id })?;
157 Ok(())
158 }
159
160 pub fn id(&self) -> u64 {
161 self.id
162 }
163
164 pub fn status(&self) -> RoomStatus {
165 self.status
166 }
167
168 pub fn remote_participants(&self) -> &BTreeMap<PeerId, RemoteParticipant> {
169 &self.remote_participants
170 }
171
172 pub fn pending_participants(&self) -> &[Arc<User>] {
173 &self.pending_participants
174 }
175
176 pub fn contains_participant(&self, user_id: u64) -> bool {
177 self.participant_user_ids.contains(&user_id)
178 }
179
180 async fn handle_room_updated(
181 this: ModelHandle<Self>,
182 envelope: TypedEnvelope<proto::RoomUpdated>,
183 _: Arc<Client>,
184 mut cx: AsyncAppContext,
185 ) -> Result<()> {
186 let room = envelope
187 .payload
188 .room
189 .ok_or_else(|| anyhow!("invalid room"))?;
190 this.update(&mut cx, |this, cx| this.apply_room_update(room, cx))
191 }
192
193 fn apply_room_update(
194 &mut self,
195 mut room: proto::Room,
196 cx: &mut ModelContext<Self>,
197 ) -> Result<()> {
198 // Filter ourselves out from the room's participants.
199 room.participants
200 .retain(|participant| Some(participant.user_id) != self.client.user_id());
201
202 let remote_participant_user_ids = room
203 .participants
204 .iter()
205 .map(|p| p.user_id)
206 .collect::<Vec<_>>();
207 let (remote_participants, pending_participants) =
208 self.user_store.update(cx, move |user_store, cx| {
209 (
210 user_store.get_users(remote_participant_user_ids, cx),
211 user_store.get_users(room.pending_participant_user_ids, cx),
212 )
213 });
214 self.pending_room_update = Some(cx.spawn(|this, mut cx| async move {
215 let (remote_participants, pending_participants) =
216 futures::join!(remote_participants, pending_participants);
217
218 this.update(&mut cx, |this, cx| {
219 this.participant_user_ids.clear();
220
221 if let Some(participants) = remote_participants.log_err() {
222 for (participant, user) in room.participants.into_iter().zip(participants) {
223 let peer_id = PeerId(participant.peer_id);
224 this.participant_user_ids.insert(participant.user_id);
225
226 let existing_projects = this
227 .remote_participants
228 .get(&peer_id)
229 .into_iter()
230 .flat_map(|existing| &existing.projects)
231 .map(|project| project.id)
232 .collect::<HashSet<_>>();
233 for project in &participant.projects {
234 if !existing_projects.contains(&project.id) {
235 cx.emit(Event::RemoteProjectShared {
236 owner: user.clone(),
237 project_id: project.id,
238 worktree_root_names: project.worktree_root_names.clone(),
239 });
240 }
241 }
242
243 this.remote_participants.insert(
244 peer_id,
245 RemoteParticipant {
246 user: user.clone(),
247 projects: participant.projects,
248 location: ParticipantLocation::from_proto(participant.location)
249 .unwrap_or(ParticipantLocation::External),
250 },
251 );
252 }
253
254 this.remote_participants.retain(|_, participant| {
255 this.participant_user_ids.contains(&participant.user.id)
256 });
257
258 cx.notify();
259 }
260
261 if let Some(pending_participants) = pending_participants.log_err() {
262 this.pending_participants = pending_participants;
263 for participant in &this.pending_participants {
264 this.participant_user_ids.insert(participant.id);
265 }
266 cx.notify();
267 }
268
269 this.pending_room_update.take();
270 if this.should_leave() {
271 let _ = this.leave(cx);
272 }
273
274 this.check_invariants();
275 });
276 }));
277
278 cx.notify();
279 Ok(())
280 }
281
282 fn check_invariants(&self) {
283 #[cfg(any(test, feature = "test-support"))]
284 {
285 for participant in self.remote_participants.values() {
286 assert!(self.participant_user_ids.contains(&participant.user.id));
287 }
288
289 for participant in &self.pending_participants {
290 assert!(self.participant_user_ids.contains(&participant.id));
291 }
292
293 assert_eq!(
294 self.participant_user_ids.len(),
295 self.remote_participants.len() + self.pending_participants.len()
296 );
297 }
298 }
299
300 pub(crate) fn call(
301 &mut self,
302 recipient_user_id: u64,
303 initial_project_id: Option<u64>,
304 cx: &mut ModelContext<Self>,
305 ) -> Task<Result<()>> {
306 if self.status.is_offline() {
307 return Task::ready(Err(anyhow!("room is offline")));
308 }
309
310 cx.notify();
311 let client = self.client.clone();
312 let room_id = self.id;
313 self.pending_call_count += 1;
314 cx.spawn(|this, mut cx| async move {
315 let result = client
316 .request(proto::Call {
317 room_id,
318 recipient_user_id,
319 initial_project_id,
320 })
321 .await;
322 this.update(&mut cx, |this, cx| {
323 this.pending_call_count -= 1;
324 if this.should_leave() {
325 this.leave(cx)?;
326 }
327 result
328 })?;
329 Ok(())
330 })
331 }
332
333 pub(crate) fn share_project(
334 &mut self,
335 project: ModelHandle<Project>,
336 cx: &mut ModelContext<Self>,
337 ) -> Task<Result<u64>> {
338 if project.read(cx).is_remote() {
339 return Task::ready(Err(anyhow!("can't share remote project")));
340 } else if let Some(project_id) = project.read(cx).remote_id() {
341 return Task::ready(Ok(project_id));
342 }
343
344 let request = self.client.request(proto::ShareProject {
345 room_id: self.id(),
346 worktrees: project
347 .read(cx)
348 .worktrees(cx)
349 .map(|worktree| {
350 let worktree = worktree.read(cx);
351 proto::WorktreeMetadata {
352 id: worktree.id().to_proto(),
353 root_name: worktree.root_name().into(),
354 visible: worktree.is_visible(),
355 }
356 })
357 .collect(),
358 });
359 cx.spawn_weak(|_, mut cx| async move {
360 let response = request.await?;
361 project
362 .update(&mut cx, |project, cx| {
363 project.shared(response.project_id, cx)
364 })
365 .await?;
366 Ok(response.project_id)
367 })
368 }
369
370 pub fn set_location(
371 &mut self,
372 project: Option<&ModelHandle<Project>>,
373 cx: &mut ModelContext<Self>,
374 ) -> Task<Result<()>> {
375 if self.status.is_offline() {
376 return Task::ready(Err(anyhow!("room is offline")));
377 }
378
379 let client = self.client.clone();
380 let room_id = self.id;
381 let location = if let Some(project) = project {
382 if let Some(project_id) = project.read(cx).remote_id() {
383 proto::participant_location::Variant::Project(
384 proto::participant_location::Project { id: project_id },
385 )
386 } else {
387 return Task::ready(Err(anyhow!("project is not shared")));
388 }
389 } else {
390 proto::participant_location::Variant::External(proto::participant_location::External {})
391 };
392
393 cx.foreground().spawn(async move {
394 client
395 .request(proto::UpdateParticipantLocation {
396 room_id,
397 location: Some(proto::ParticipantLocation {
398 variant: Some(location),
399 }),
400 })
401 .await?;
402 Ok(())
403 })
404 }
405}
406
407#[derive(Copy, Clone, PartialEq, Eq)]
408pub enum RoomStatus {
409 Online,
410 Offline,
411}
412
413impl RoomStatus {
414 pub fn is_offline(&self) -> bool {
415 matches!(self, RoomStatus::Offline)
416 }
417}